【问题标题】:Kafka python Consumer group session timed outKafka python消费者组会话超时
【发布时间】:2021-01-28 01:43:46
【问题描述】:

我使用 confluent-kafka v1.3.0,但我遇到了消费者组会话超时的以下问题。 我的配置如下:

c['KAFKA'] = {
    'bootstrap.servers': 'host.docker.internal:9104',
    'consumer': {
        'group.id': 'consumer',
        'enable.auto.commit': True,
        'default.topic.config': {
            'auto.offset.reset': 'earliest
        },
        'heartbeat.interval.ms': 100000,
        'max.poll.interval.ms': 300000,
        'session.timeout.ms': 100000
    },
}

代码中的逻辑如下:

consumer.subscribe('database_changes')

with ThreadPoolExecutor(max_workers=500) as executor:
    while True:
        msg = consumer.poll(100)
        if msg is not None:
            executor.submit(process_message, msg)

函数处理消息中的代码等待几毫秒,因为它是非常简单的逻辑。一切正常,但每时每刻我都收到此错误:

{"asctime":"2020-04-27 08:42:25,759","levelname":"WARNING","name":"services.kafka","message":"SESSTMOUT [rdkafka#consumer-2] [thrd:main]: Consumer group session timed out (in join-state started) after 30131 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group"}

这些重新平衡极大地阻碍了整个过程。

有没有人知道什么是错误的设置?我怀疑心跳不正常,但我不知道它如何验证或更好地修复。

谢谢

【问题讨论】:

  • 只是为了分享信息,我使用 heart-beat 和 session.timeout (6000) 的默认设置确实有同样的问题,我使用的是 confluent-kafka 1.4.2。
  • 你能解决这个问题吗?

标签: python apache-kafka kafka-consumer-api producer-consumer confluent-platform


【解决方案1】:

根据 confluent kafka 文档,heartbeat.interval.ms 应设置为不高于 session.timeout.ms 的 1/3。由于在 session.timeout.ms 值之后,消费者被假定为死亡,建议等待至少三个心跳来假定它。

https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#heartbeat.interval.ms

【讨论】:

    猜你喜欢
    • 2015-02-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-01
    • 1970-01-01
    • 2020-09-19
    • 2017-01-04
    • 1970-01-01
    相关资源
    最近更新 更多