【发布时间】:2021-01-28 01:43:46
【问题描述】:
我使用 confluent-kafka v1.3.0,但我遇到了消费者组会话超时的以下问题。 我的配置如下:
c['KAFKA'] = {
'bootstrap.servers': 'host.docker.internal:9104',
'consumer': {
'group.id': 'consumer',
'enable.auto.commit': True,
'default.topic.config': {
'auto.offset.reset': 'earliest
},
'heartbeat.interval.ms': 100000,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 100000
},
}
代码中的逻辑如下:
consumer.subscribe('database_changes')
with ThreadPoolExecutor(max_workers=500) as executor:
while True:
msg = consumer.poll(100)
if msg is not None:
executor.submit(process_message, msg)
函数处理消息中的代码等待几毫秒,因为它是非常简单的逻辑。一切正常,但每时每刻我都收到此错误:
{"asctime":"2020-04-27 08:42:25,759","levelname":"WARNING","name":"services.kafka","message":"SESSTMOUT [rdkafka#consumer-2] [thrd:main]: Consumer group session timed out (in join-state started) after 30131 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group"}
这些重新平衡极大地阻碍了整个过程。
有没有人知道什么是错误的设置?我怀疑心跳不正常,但我不知道它如何验证或更好地修复。
谢谢
【问题讨论】:
-
只是为了分享信息,我使用 heart-beat 和 session.timeout (6000) 的默认设置确实有同样的问题,我使用的是 confluent-kafka 1.4.2。
-
你能解决这个问题吗?
标签: python apache-kafka kafka-consumer-api producer-consumer confluent-platform