【Question Title】: Kafka Consumer keeps rebalancing
【Posted】: 2022-08-08 13:06:55
【Question Description】:

My Kafka consumer client keeps rebalancing between frequent polls, even when it is not processing any source records.

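I have also put consumer.pause() and consumer.resume() in the right places, so that the consumer stops polling until the polled consumer records have been processed separately by a backend API. I can't tell what is causing this issue.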

Consumer logs:

    2022-07-21 10:27:08; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 2147444144 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted."
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"
    2022-07-21 10:27:25; LOG_LEVEL="ERROR"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Offset commit failed on partition MySourceTopic-0 at offset 38572: The coordinator is not aware of this member."
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] OffsetCommit failed with Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'}: The coordinator is not aware of this member."
    2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Asynchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Failing OffsetCommit request since the consumer is not part of an active group"
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Attempt to heartbeat with stale Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error"
    2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Synchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group."
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group"
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Lost previously assigned partitions MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"
    2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully joined group with generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully synced group in generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Notifying assignor about the new Assignment(partitions=[MySourceTopic-0, MySourceTopic-1, MySourceTopic-2, MySourceTopic-3])"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Adding newly assigned partitions: MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-0 to the committed offset FetchPosition{offset=38572, offsetEpoch=Optional[132], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 33708 rack: null)], epoch=132}}"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-2 to the committed offset FetchPosition{offset=38566, offsetEpoch=Optional[122], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 39217 rack: null)], epoch=122}}"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-1 to the committed offset FetchPosition{offset=38779, offsetEpoch=Optional[118], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 30678 rack: null)], epoch=118}}"
    2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-3 to the committed offset FetchPosition{offset=38585, offsetEpoch=Optional[121], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 38300 rack: null)], epoch=121}}"
    2022-07-21 10:27:30; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."
    2022-07-21 10:27:35; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="Polling..."

Consumer configuration:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    enable.auto.commit = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    heartbeat.interval.ms = 3000
    max.poll.interval.ms = 300000
    max.poll.records = 100
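For reference, two of the settings above bound how long one batch may take: the next poll() must start within max.poll.interval.ms, and a single poll() can return up to max.poll.records records. A minimal sketch of that arithmetic, using a hypothetical helper class (it ignores the time poll() itself spends waiting and any per-batch overhead):

```java
/** Hypothetical helper: rough per-record time budget implied by two consumer settings. */
final class PollBudget {
    private PollBudget() {}

    /**
     * Average milliseconds each record may take if a full batch of
     * maxPollRecords must be processed before the next poll() call is due.
     */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        if (maxPollIntervalMs <= 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("both settings must be positive");
        }
        return maxPollIntervalMs / maxPollRecords; // integer division rounds down
    }
}
```

With max.poll.interval.ms = 300000 and max.poll.records = 100, perRecordBudgetMs(300000, 100) gives 3000, so a full batch leaves roughly 3 seconds per record on average before the limit is exceeded.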

Consumer poll() implementation:

        while (true) {
            try {
                ConsumerRecords<String, GenericRecord> records = null;
                LOGGER.info("Polling...");
                // Blocks for up to 5 seconds when no records are available
                records = consumer.poll(Duration.ofSeconds(5));

                if (records != null && records.count() > 0) {
                    // Stop fetching from all assigned partitions while the batch is handed off
                    consumer.pause(consumer.assignment());
                    recordExecutor.execute(records);
                }

                consumer.resume(consumer.assignment());
                consumer.commitSync();

            } catch (SerializationException e) {
                continue;
            } catch (Exception e) {
                continue;
            }
        }

Thanks in advance.

  • Could you mention which Kafka client version you are using?

Tags: apache-kafka kafka-consumer-api


【Solution 1】:

There are a few points I'd like to mention here.

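  1. Ideally, you should not need consumer.pause() and consumer.resume() for your use case. When no records are available, consumer.poll() blocks for up to the duration passed as its argument, so an idle poll loop should not cause high CPU usage: poll() is a blocking call, not a busy-wait.

    From the consumer.poll() documentation:

    This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned.

  2. max.poll.interval.ms is not the same thing as the timeout you pass to consumer.poll(). If the two were meant to be the same, poll() would not take that argument in the first place. max.poll.interval.ms is the maximum allowed delay between successive poll() calls.

    So if the poll() timeout is set to max.poll.interval.ms and no records arrive, poll() waits that whole interval before returning, the next poll() is not made within the limit, and the consumer is considered failed.

    From the max.poll.interval.ms documentation:

    If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

  3. Also, you seem to be calling commitSync() while your configuration sets enable.auto.commit=true. Ideally, you should use one or the other, not both.

  4. If you don't want to rethrow exceptions, at least log them in the catch blocks instead of silently continuing.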
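Point 3 can be sketched as consumer configuration. Since the loop in the question already calls commitSync(), one consistent choice is to switch auto-commit off. The sketch below only builds a java.util.Properties object with standard Kafka consumer property names; the class name ManualCommitConfig is hypothetical, and deserializer and security settings are deliberately left out.

```java
import java.util.Properties;

/**
 * Hypothetical sketch: consumer settings for a loop that commits manually
 * with commitSync(). Deserializers and security settings are omitted.
 */
final class ManualCommitConfig {
    private ManualCommitConfig() {}

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Use ONE commit strategy: auto-commit is off because the loop calls commitSync().
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        // Same batch size and poll interval as in the question.
        props.setProperty("max.poll.records", "100");
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }
}
```

The resulting object would be passed to the KafkaConsumer constructor together with the omitted deserializer settings. Keeping auto-commit on and removing the commitSync() call instead would be the other consistent choice.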

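【Discussion】:

  • Thanks for your reply. I have edited my question to records = consumer.poll(Duration.ofSeconds(5)); earlier I had given the wrong information. So inside poll() I pass 5 seconds, and max.poll.interval.ms = 300000 (5 minutes). A rebalance shouldn't happen, because, as my consumer logs show, the consumer polls every 5 seconds. Yet a rebalance still happens every five minutes, and that is the problem here.
  • @ChristDist Check whether some other consumer in the group is causing the rebalance.

【Solution 2】: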

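After extensive analysis, we identified the root cause of these frequent rebalances: session timeouts between the consumers/Kafka Streams applications and the group coordinator (leader broker). The session timeouts kept occurring because of a capacity problem on the servers hosting these consumers/Streams applications (100% CPU utilization). When the CPU is saturated at 100%, all of these Kafka clients become unresponsive, and eventually their sessions time out.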

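Event log:

    group-MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 21474 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted.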
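The fix described here is capacity: the clients need CPU headroom to keep sending heartbeats. If some extra tolerance for short stalls is also wanted, session.timeout.ms can be raised; the Kafka consumer documentation says heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than one third of it. A small hypothetical check for that pair of values (the broker additionally enforces group.min.session.timeout.ms and group.max.session.timeout.ms, which this sketch does not check):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sanity check for the consumer liveness settings (values in ms). */
final class HeartbeatCheck {
    private HeartbeatCheck() {}

    /** Returns human-readable problems; an empty list means the pair looks reasonable. */
    static List<String> problems(long sessionTimeoutMs, long heartbeatIntervalMs) {
        List<String> issues = new ArrayList<>();
        if (sessionTimeoutMs <= 0 || heartbeatIntervalMs <= 0) {
            issues.add("both values must be positive");
        } else if (heartbeatIntervalMs >= sessionTimeoutMs) {
            // Hard requirement stated in the consumer configuration docs.
            issues.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            // Documented recommendation: no higher than 1/3 of the session timeout.
            issues.add("heartbeat.interval.ms is higher than 1/3 of session.timeout.ms");
        }
        return issues;
    }
}
```

For example, the question's heartbeat.interval.ms = 3000 passes this check for any session.timeout.ms of at least 9000 ms.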

【Discussion】:
