【发布时间】:2018-09-30 13:53:34
【问题描述】:
我正在使用 Kafka 流 1.0 和 kafka 代理 1.0.1 的无状态处理器
问题是,CustomProcessor 每隔几秒就会关闭一次,这会导致重新平衡信号,我正在使用以下配置:
session.timeout.ms=15000
heartbeat.interval.ms=3000 // 设置为 1/3 session.timeout
max.poll.interval.ms=Integer.MAX_VALUE // 让它变得这么大,因为我正在执行密集的计算操作,处理 1 条 kafka 消息(NLP 操作)可能需要长达 10 分钟的时间
max.poll.records=1
尽管有这种配置以及我对 kafka 超时配置如何工作的理解,但我看到消费者每隔几秒就会重新平衡一次。
我已经阅读了以下文章和其他 stackoverflow 问题。关于如何调整长时间操作并避免非常长的会话超时,这将使故障检测到这么晚,但是我仍然看到意外的行为,除非我误解了什么。
Diff between session.timeout.ms and max.poll.interval
Kafka kstreams processing timeout
对于消费者环境设置,我有 8 台机器,每台 16 个代码,并使用 1 个主题和 100 个分区,我正在遵循这个 confluent doc here 推荐的做法。
任何指针?
【问题讨论】:
-
再平衡的根本原因是什么?你能澄清关闭处理器和重新平衡之间的依赖关系吗?似乎,一个首先发生(根本原因)触发另一个。你检查过日志吗?
-
当我看到
processor.close()时,我立即看到几秒钟的暂停(大致是session.timeout.ms duration),我看到重新平衡发生了,应用程序日志很好,没有异常或任何会退出process() 方法不干净 -
据我了解@MatthiasJ.Sax 不应在应用程序的正常流程中调用 close() ,换句话说,不会在每条消息中调用 .close() ,例如init() 方法,对吗?
-
是的,init() 和 close() 只应在分配/撤销分区时调用。如果抛出异常并且任务失败,也可能会调用 close()。因此,您描述的案例实际发生了什么让我感到困惑。如果
session.timeout.ms命中,则在承载消费者组的组协调器的代理上应该有一个日志条目。也许 DEBUG 日志会显示更多信息? -
谢谢@MatthiasJ.Sax 我将尝试检查代理日志并启用调试日志并发布更新。
标签: apache-kafka apache-kafka-streams