【问题标题】:Kafka consumer, very long rebalances卡夫卡消费者,很长的重新平衡
【发布时间】:2018-12-21 15:56:41
【问题描述】:

我们正在运行 3 个代理 Kafka 0.10.0.1 集群。我们有一个 java 应用程序,它产生了许多从不同主题消费的消费者线程。对于每个主题,我们都指定了不同的消费者组。

我经常看到,每当重新启动此应用程序时,一个或多个 CG 需要超过 5 分钟才能接收分区分配。在那之前,该主题的消费者不会消费任何东西。如果我去 Kafka 代理并运行 consumer-groups.sh 并描述那个特定的 CG,我会发现它正在重新平衡。 在 server.log 我看到这样的行

准备稳定组 otp-sms-consumer 稳定组otp-sms-consumer

这两个日志之间通常有大约 5 分钟或更长时间的间隔。 在消费者方面,当我打开跟踪级别日志时,在此暂停时间内实际上没有任何活动。几分钟后,大量活动开始。 该主题中存储了时间关键数据,例如 otp-sms,我们不能容忍如此长的延迟。如此长时间的重新平衡可能是什么原因。

这是我们的消费者配置

auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /x/x/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

请帮忙。

【问题讨论】:

  • 在您的应用程序中处理一条消息花费了多少时间?
  • 对于卡住的 CG 主题,每条消息 10 毫秒到 50 毫秒
  • 你可以尝试分配而不是订阅吗?

标签: apache-kafka


【解决方案1】:

您的消费者配置似乎合理。我建议尝试三件事:

  • 尝试生成单个消费者线程,并仅为其分配您尝试从中消费的主题之一。该单个线程应该为该主题分配所有分区,并且应该立即开始接收所有数据。您可以尝试打印出分区和消息偏移量以及内容,以验证它是否正在获取所有数据。
  • 一旦您验证它工作正常,就生成一个消费者线程,并将您尝试从中消费的主题分配给它所有。执行相同的验证打印出消息。
  • 最后,如果一切正常,开始一一添加消费者线程,看看消费时是否开始出现暂停。

这应该可以让您查明问题所在。如果您能够使用单个线程而不是多个线程来使用所有内容,那么您的线程机制/池可​​能存在问题。

【讨论】:

  • 我做不到。如果我减少线程数,输出将急剧下降。我的线程机制非常简单。我只是在创建一个 java 固定线程池并在其中添加 5 或 10 个线程。还有其他想法吗?
  • 这些天我面临着太多的问题。现在我们的一个消费者群体时不时地重新平衡。并且重新平衡的时间非常短,超过 5-10 分钟。即使在重新平衡之后,我也看到只有一半的消费者处于活动状态/接收分配。一切都乱套了
【解决方案2】:

检查磁盘上的__consumer_offsets 分区大小。由于压缩错误,我们遇到了类似的问题。这会导致非常长的重新平衡。 有关更多详细信息,请参阅https://issues.apache.org/jira/browse/KAFKA-5413(自 kafka 0.10.2.2 / 0.11 起已解决) 另一种选择是您的代理配置不正确,并且压缩已关闭,如果为 false,则 log.cleaner.enable__consumer_offsets 是一个 compacted 主题,所以如果 log.cleaner 被禁用,它将不会被压缩并导致相同的症状。

【讨论】:

  • 好的,我会检查一下。但是,我可以看到 log.cleaner.enable 属性默认为 true。我们还没有在我们的配置中修改它。
  • 可能是这个 bug 对你造成了打击……但彻底彻底也没什么不好
  • 我指的是issues.apache.org/jira/browse/KAFKA-5413中报告的错误(日志清理器的配置只是检查列表中的另一件事)。查看日志清理器的日志文件,或检查 __consumer_offsets 主题的磁盘大小。我很确定这是问题所在,因为它与您使用的版本匹配
【解决方案3】:

我怀疑您的集群版本至少为 0.10.1.0,因为我在 version 中引入的使用者配置中看到了 max.poll.interval.ms

Kafka 0.10.1.0 集成了KIP-62,它引入了重新平衡超时设置为max.poll.interval.ms,其默认值为5 分钟。

我猜如果您不想在重新平衡期间等待超时到期,您的消费者需要通过调用close() 方法干净地离开消费者组。

【讨论】:

    【解决方案4】:

    重新平衡超时等于max.poll.interval.ms(在您的情况下为 5 分钟)当重新平衡在一个组中开始时,Kafka 撤销该组中的所有消费者。然后等待所有活着的消费者(发送心跳的消费者)到 poll() 并发送 JoinGroupRequest。

    这个等待过程会以重新平衡超时或所有活着的消费者 poll() 和 Kafka 为这些消费者分配分区结束。

    因此,在您的情况下,您的一个消费者中可能有一个长时间运行的进程,Kafka 等待此进程完成以分配分区。

    有关更多信息,您可以查看以下内容:

    消费者群体是 Kafka 的重要机制。他们允许 消费者通过动态分配来共享负载和弹性扩展 主题的分区给消费者。在我们目前的模型中 消费者群体,每当重新平衡发生时,每个消费者都来自该群体 小组经历停机 - 他们的 poll() 调用阻塞,直到每个 组中的其他消费者调用 poll()。这是因为 每个消费者都需要在重新平衡场景中调用 JoinGroup 为了确认它还在群里。

    今天,如果客户端已将max.poll.interval.ms配置为大 value, group coordinator broker 将不限数量 的加入组​​请求和重新平衡因此可以继续 无限的时间。 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit)

    -

    由于我们给客户端最多 max.poll.interval.ms 来处理 批量记录,这也是消费者可以使用的最长时间 预计在最坏的情况下会重新加入该小组。因此我们 建议将Java客户端中的rebalance timeout设置为相同 使用 max.poll.interval.ms 配置的值。当重新平衡开始时, 后台线程将继续发送心跳。消费者 在处理完成并且用户之前不会重新加入组 调用 poll()。从协调者的角度来看,消费者将 在 1) 他们的会话超时之前不会从组中删除 没有收到心跳就过期了,或者 2) 重新平衡超时 过期了。

    (https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread)

    【讨论】:

      猜你喜欢
      • 2019-10-31
      • 2022-08-08
      • 1970-01-01
      • 1970-01-01
      • 2019-01-08
      • 2016-12-17
      • 2019-07-03
      • 2018-05-05
      • 2021-08-22
      相关资源
      最近更新 更多