【问题标题】:Kafka streams 1.0: processing timeout with high max.poll.interval.ms and session.timeout.msKafka 流 1.0:处理超时,max.poll.interval.ms 和 session.timeout.ms 很高
【发布时间】:2018-09-30 13:53:34
【问题描述】:

我正在使用 Kafka 流 1.0 和 kafka 代理 1.0.1 的无状态处理器

问题是,CustomProcessor 每隔几秒就会关闭一次,这会导致重新平衡信号,我正在使用以下配置:

session.timeout.ms=15000

heartbeat.interval.ms=3000 // 设置为 1/3 session.timeout

max.poll.interval.ms=Integer.MAX_VALUE // 让它变得这么大,因为我正在执行密集的计算操作,处理 1 条 kafka 消息(NLP 操作)可能需要长达 10 分钟的时间

max.poll.records=1

尽管有这种配置以及我对 kafka 超时配置如何工作的理解,但我看到消费者每隔几秒就会重新平衡一次。

我已经阅读了以下文章和其他 stackoverflow 问题。关于如何调整长时间操作并避免非常长的会话超时,这将使故障检测到这么晚,但是我仍然看到意外的行为,除非我误解了什么。

KIP-62

Diff between session.timeout.ms and max.poll.interval

Kafka kstreams processing timeout

对于消费者环境设置,我有 8 台机器,每台 16 个代码,并使用 1 个主题和 100 个分区,我正在遵循这个 confluent doc here 推荐的做法。

任何指针?

【问题讨论】:

  • 再平衡的根本原因是什么?你能澄清关闭处理器和重新平衡之间的依赖关系吗?似乎,一个首先发生(根本原因)触发另一个。你检查过日志吗?
  • 当我看到 processor.close() 时,我立即看到几秒钟的暂停(大致是 session.timeout.ms duration),我看到重新平衡发生了,应用程序日志很好,没有异常或任何会退出process() 方法不干净
  • 据我了解@MatthiasJ.Sax 不应在应用程序的正常流程中调用 close() ,换句话说,不会在每条消息中调用 .close() ,例如init() 方法,对吗?
  • 是的,init() 和 close() 只应在分配/撤销分区时调用。如果抛出异常并且任务失败,也可能会调用 close()。因此,您描述的案例实际发生了什么让我感到困惑。如果session.timeout.ms 命中,则在承载消费者组的组协调器的代理上应该有一个日志条目。也许 DEBUG 日志会显示更多信息?
  • 谢谢@MatthiasJ.Sax 我将尝试检查代理日志并启用调试日志并发布更新。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

我想通了。经过大量调试并为 kafka 流客户端和代理启用详细日志记录后,结果是两件事:

  1. 流 1.0.0 (HERE) 中存在严重错误,因此我将客户端版本从 1.0.0 升级到 1.0.1
  2. 我将消费者属性default.deserialization.exception.handler 的值从org.apache.kafka.streams.errors.LogAndFailExceptionHandler 更新为org.apache.kafka.streams.errors.LogAndContinueExceptionHandler

经过上述 2 次更改后,一切都非常完美,没有重启,我正在使用 grafana 监控重启,过去 48 小时内,没有发生单次重启。

我可能会进行更多故障排除,以确保上述 2 项中的哪一项能够真正解决,但我急于部署到生产环境,所以如果有人有兴趣从那里开始,请继续,否则,一旦我有时间会做进一步分析并更新答案!

很高兴能解决这个问题!!!

【讨论】:

    猜你喜欢
    • 2020-05-19
    • 2017-10-14
    • 2017-02-05
    • 2020-02-18
    • 1970-01-01
    • 2022-01-11
    • 2017-01-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多