【问题标题】:kafka new producer is not able to update metadata after one of the broker is downkafka 新生产者在其中一个代理关闭后无法更新元数据
【发布时间】:2016-06-12 19:19:18
【问题描述】:

我有一个 kafka 环境,其中有 2 个经纪人和 1 个动物园管理员。

当我尝试向 kafka 生成消息时,如果我停止代理 1(它是领导者),客户端将停止生成消息并给我以下错误,尽管代理 2 被选为该主题的新领导者并且分区。

org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后更新元数据失败。

10 分钟后,由于代理 2 是新的领导者,我希望生产者向代理 2 发送数据,但它继续失败并给出上述异常。 lastRefreshMs 和 lastSuccessfullRefreshMs 仍然相同,但生产者的 metadataExpireMs 为 300000。

我在生产者端使用 kafka 新的生产者实现。

似乎在启动生产者时,它会绑定到一个代理,如果该代理发生故障,它甚至不会尝试连接到集群中的另一个代理。

但我的期望是,如果一个代理出现故障,它应该直接检查其他可用代理的元数据并将数据发送给他们。

顺便说一句,我的主题是 4 个分区,复制因子为 2。提供此信息以防万一。

配置参数。

{request.timeout.ms=30000, retry.backoff.ms=100, buffer.memory=33554432, ssl.truststore.password=null, batch.size=16384, ssl.keymanager.algorithm=SunX509, receive.buffer.bytes=32768, ssl.cipher.suites=null, ssl.key.password=null, sasl.kerberos.ticket.renew.jitter=0.05, ssl.provider=null, sasl.kerberos.service.name=null, max.in.flight.requests.per.connection=5, sasl.kerberos.ticket.renew.window.factor=0.8, bootstrap.servers=[10.201.83.166:9500, 10.201.83.167:9500], client.id=rest-interface, max.request.size=1048576, acks=1, linger.ms=0, sasl.kerberos.kinit.cmd=/usr/bin/kinit, ssl.enabled.protocols=[TLSv1.2, TLSv1.1, TLSv1], metadata.fetch.timeout.ms=60000, ssl.endpoint.identification.algorithm=null, ssl.keystore.location=null, value.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, ssl.truststore.location=null, ssl.keystore.password=null, key.serializer=class org.apache.kafka.common.serialization.ByteArraySerializer, block.on.buffer.full=false, metrics.sample.window.ms=30000, metadata.max.age.ms=300000, security.protocol=PLAINTEXT, ssl.protocol=TLS, sasl.kerberos.min.time.before.relogin=60000, timeout.ms=30000, connections.max.idle.ms=540000, ssl.trustmanager.algorithm=PKIX, metric.reporters=[], compression.type=none, ssl.truststore.type=JKS, max.block.ms=60000, retries=0, send.buffer.bytes=131072, partitioner.class=class org.apache.kafka.clients.producer.internals.DefaultPartitioner, reconnect.backoff.ms=50, metrics.num.samples=2, ssl.keystore.type=JKS}

用例:

1-启动BR1和BR2产生数据(Leader是BR1)

2- 停止 BR2 生成数据(精细)

3- 停止BR1(这意味着此时集群中没有活跃的工作代理)然后启动BR2并产生数据(虽然领导者是BR2但失败)

4- 开始BR1生产数据(leader仍然是BR2,但生产数据很好)

5- 停止 BR2(现在 BR1 是领导者)

6-停止BR1(BR1仍然是领导者)

7-启动BR1产生数据(消息再次产生正常)

如果生产者向 BR1 发送了最新的成功数据,然后所有的 broker 都挂了,生产者希望 BR1 重新起床,尽管 BR2 已经起床并且是新的领导者。这是预期的行为吗?

【问题讨论】:

  • 您可以发布您在生产者中使用的配置吗?
  • 我所有的生产者配置都是默认的 kafka 生产者配置。
  • 由于缺乏提供的配置,我只能给出一个有根据的猜测:您只在属性中列出了一个代理:metadata.broker.list
  • 您提供的 @TobiSH 配置是旧的生产者配置。我正在使用 bootstrap.servers 并在那里写了我的两个代理主机。
  • 在我的帖子中查找配置参数

标签: apache-kafka kafka-producer-api


【解决方案1】:

在最新的 kafka 版本中,当一个代理关闭并且有一个由生产者使用的领导分区时。生产者将重试直到捕获可重试异常,然后生产者需要更新元数据。可以从 minimumLoadNode 获取新的元数据。所以新的领导者将被更新,生产者可以在那里写。

【讨论】:

    【解决方案2】:

    您需要增加重试次数。 在您的情况下,您需要将其设置为 >=5。

    这是您的生产者知道您的集群有新领导者的唯一方法。

    除此之外,请确保您的所有代理都拥有您的分区的副本。否则你不会得到一个新的领导者。

    【讨论】:

      【解决方案3】:

      花了几个小时后,我弄清楚了卡夫卡在我的情况下的行为。可能这是一个错误,或者可能需要以这种方式完成,原因在于幕后但实际上如果我会这样做,我不会这样做:)

      当所有的broker都宕机时,如果你只能启动一个broker,那么这个broker一定是最后宕机的broker才能成功产生消息。

      假设您有 5 个经纪人; BR1、BR2、BR3、BR4 和 BR5。如果一切都崩溃了,如果最后死掉的broker是BR3(它是最后一个leader),尽管你启动了所有的broker BR1、BR2、BR4和BR5,除非你启动BR3,否则它没有任何意义。

      【讨论】:

      • 我相信行为取决于unclean.leader.election.enablemin.insync.replicas 的代理设置。如果允许不干净的领导者选举,那么其他代理之一可以成为新的领导者,即使最后一个代理仍然关闭,您也应该能够发布。请参阅此演示文稿以了解所有权衡slideshare.net/gwenshap/…
      • 嗨,尽管我尝试了@HansJespersen 的建议,但我遇到的情况和你一样。你找到解决办法了吗?或者这就是卡夫卡的工作原理?
      • @John Hans 是对的,但这次你要权衡一致性和可用性。你确定你正确地传递了汉斯提到的参数。此参数名称也可能因不同的 kafka 版本而异。为您的版本寻找正确的版本。
      • 测试配置为:3 个代理 1 个主题 1 个分区 3 个复制因子。 unclean.leader.election.enable=true 和 min.insync.replicas 1. 一对一关闭代理。使用 Kafka 2.12-2.1.1。不工作。仍在尝试连接最新的连接代理。这可能是什么原因造成的?
      • @John 在生产者端有一个名为 metadata.max.age.ms 的配置参数。将此值设置得尽可能低,以检测新选择的领导者。
      猜你喜欢
      • 2016-10-03
      • 2016-02-13
      • 1970-01-01
      • 2018-04-06
      • 2019-03-18
      • 2018-09-15
      • 1970-01-01
      • 1970-01-01
      • 2016-03-19
      相关资源
      最近更新 更多