【发布时间】:2016-12-17 22:04:42
【问题描述】:
如果 kafka(0.10 版)消费者尝试重新加入消费者组,它的默认行为是什么。 我正在为一个消费者组使用一个消费者,但它似乎在重新加入时受到打击。 每 10 分钟后,它会在消费者日志中打印以下行。
2016-08-11 13:54:53,803 INFO o.a.k.c.c.i.ConsumerCoordinator [pool-5-thread-1] ****撤销以前分配的分区**** [] 为组 image-consumer-group
2016-08-11 13:54:53,803 INFO OakcciAbstractCoordinator [pool-5-thread-1] (重新)加入群组 image-consumer-group
2016-08-11 14:04:53,992 INFO o.a.k.c.c.i.AbstractCoordinator [pool-5-thread-1] 为组 image-consumer-group 标记协调器死亡
2016-08-11 14:04:54,095 INFO o.a.k.c.c.i.AbstractCoordinator [pool-5-thread-1] 发现组 image-consumer-group 的协调器。
2016-08-11 14:04:54,096 INFO OakcciAbstractCoordinator [pool-5-thread-1] (重新)加入群组 image-consumer-group
重启消费者应用程序没有帮助。
【问题讨论】:
-
在我将 session.timeout.ms 设置为一个非常大的值以允许完成复杂的长时间运行管道后,我面临着完全相同的问题。你找到解决办法了吗?
-
是的,1.您必须在获取一定数量的记录后对消费者调用暂停。 2.然后创建一个后台线程,通过循环调用poll(0)重复向kafka发送心跳。同时,您的主要消费者线程正在处理。 3. 处理后告诉后台线程停止主消费者线程 4. 现在您可以在主题分区上调用 resume。
-
@BharatBhagat。即使在添加 session.timeout 之后我也遇到了同样的问题。你能告诉我你是如何添加你在 Kafka Stream 中提到的逻辑的吗?非常感谢!
-
@BharatBhagat 如果消费者线程真正失败(网络中断或处理消息时出现异常),消费者将无法停止心跳线程。并且代理将继续接收心跳,并且永远不会启动重新平衡。
-
对不起,伙计们,我不再从事那个项目了。所以没有太多信息。您可以尝试 Kafka Dev 电子邮件组或发布另一个问题。
标签: apache-kafka kafka-consumer-api