【发布时间】:2016-06-12 19:19:18
【问题描述】:
我有一个 kafka 环境,其中有 2 个经纪人和 1 个动物园管理员。
当我尝试向 kafka 生成消息时,如果我停止代理 1(它是领导者),客户端将停止生成消息并给我以下错误,尽管代理 2 被选为该主题的新领导者并且分区。
org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后更新元数据失败。
10 分钟后,由于代理 2 是新的领导者,我希望生产者向代理 2 发送数据,但它继续失败并给出上述异常。 lastRefreshMs 和 lastSuccessfullRefreshMs 仍然相同,但生产者的 metadataExpireMs 为 300000。
我在生产者端使用 kafka 新的生产者实现。
似乎在启动生产者时,它会绑定到一个代理,如果该代理发生故障,它甚至不会尝试连接到集群中的另一个代理。
但我的期望是,如果一个代理出现故障,它应该直接检查其他可用代理的元数据并将数据发送给他们。
顺便说一句,我的主题是 4 个分区,复制因子为 2。提供此信息以防万一。
配置参数。
{request.timeout.ms=30000, retry.backoff.ms=100, buffer.memory=33554432, ssl.truststore.password=null, batch.size=16384, ssl.keymanager.algorithm=SunX509, receive.buffer.bytes=32768, ssl.cipher.suites=null, ssl.key.password=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.provider=null, sasl.kerberos.service.name=null, max.in.flight.requests.per.connection=5, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[10.201.83.166:9500, 10.201.83.167:9500], client.id=rest-interface, max.request.size=1048576, acks=1, linger.ms=0, sasl.kerberos.kinit.cmd=/usr/bin/kinit, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], metadata.fetch.timeout.ms=60000, ssl.endpoint.identification.algorithm=null, ssl.keystore.location=null, value.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, ssl.truststore.location=null, ssl.keystore.password=null, key.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, block.on.buffer.full=false, metrics.sample.window.ms=30000, metadata.max.age.ms=300000, security.protocol=PLAINTEXT, ssl.protocol=TLS, sasl.kerberos.min.time.before.relogin=60000, timeout.ms=30000, connections.max.idle.ms=540000, ssl.trustmanager.algorithm=PKIX, metric.reporters=[], compression.type=none, ssl.truststore.type=JKS, max.block.ms=60000, retries=0, send.buffer.bytes=131072, partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner, reconnect.backoff.ms=50, metrics.num.samples=2, ssl.keystore.type=JKS}
用例:
1-启动BR1和BR2产生数据(Leader是BR1)
2- 停止 BR2 生成数据(精细)
3- 停止BR1(这意味着此时集群中没有活跃的工作代理)然后启动BR2并产生数据(虽然领导者是BR2但失败)
4- 开始BR1生产数据(leader仍然是BR2,但生产数据很好)
5- 停止 BR2(现在 BR1 是领导者)
6-停止BR1(BR1仍然是领导者)
7-启动BR1产生数据(消息再次产生正常)
如果生产者向 BR1 发送了最新的成功数据,然后所有的 broker 都挂了,生产者希望 BR1 重新起床,尽管 BR2 已经起床并且是新的领导者。这是预期的行为吗?
【问题讨论】:
-
您可以发布您在生产者中使用的配置吗?
-
我所有的生产者配置都是默认的 kafka 生产者配置。
-
由于缺乏提供的配置,我只能给出一个有根据的猜测:您只在属性中列出了一个代理:metadata.broker.list
-
您提供的 @TobiSH 配置是旧的生产者配置。我正在使用 bootstrap.servers 并在那里写了我的两个代理主机。
-
在我的帖子中查找配置参数
标签: apache-kafka kafka-producer-api