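[Posted]: 2022-08-08 13:06:55
[Question]:
My Kafka consumer client keeps rebalancing between frequent polls, even when no source records are being processed.
I have also implemented consumer.pause() and consumer.resume() in what I believe are the right places, to hold the consumer back from polling until the batch of records returned by the previous poll has been processed separately through a backend API. I don't know what is causing this issue.
Consumer logs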
2022-07-21 10:27:08; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Group coordinator brokerHost:9093 (id: 2147444144 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response.isDisconnected: false. Rediscovery will be attempted."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Discovered group coordinator brokerHost:9093 (id: 2147444144 rack: null)"
2022-07-21 10:27:25; LOG_LEVEL="ERROR"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Offset commit failed on partition MySourceTopic-0 at offset 38572: The coordinator is not aware of this member."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] OffsetCommit failed with Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'}: The coordinator is not aware of this member."
2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Asynchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Failing OffsetCommit request since the consumer is not part of an active group"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Attempt to heartbeat with stale Generation{generationId=18661, memberId='MyKAFKAConsumerClientId1-d410deb4-d6f9-4f7e-b789-f866b89817e6', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error"
2022-07-21 10:27:25; LOG_LEVEL="WARN"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Synchronous auto-commit of offsets {MySourceTopic-0=OffsetAndMetadata{offset=38572, leaderEpoch=132, metadata=''}, MySourceTopic-2=OffsetAndMetadata{offset=38566, leaderEpoch=122, metadata=''}, MySourceTopic-1=OffsetAndMetadata{offset=38779, leaderEpoch=118, metadata=''}, MySourceTopic-3=OffsetAndMetadata{offset=38585, leaderEpoch=121, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group."
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Lost previously assigned partitions MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"
2022-07-21 10:27:25; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] (Re-)joining group"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully joined group with generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.AbstractCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Successfully synced group in generation Generation{generationId=18663, memberId='MyKAFKAConsumerClientId1-f55c7204-c932-40cd-a9b3-0351f9904026', protocol='range'}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Notifying assignor about the new Assignment(partitions=[MySourceTopic-0, MySourceTopic-1, MySourceTopic-2, MySourceTopic-3])"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Adding newly assigned partitions: MySourceTopic-0, MySourceTopic-2, MySourceTopic-1, MySourceTopic-3"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-0 to the committed offset FetchPosition{offset=38572, offsetEpoch=Optional[132], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 33708 rack: null)], epoch=132}}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-2 to the committed offset FetchPosition{offset=38566, offsetEpoch=Optional[122], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 39217 rack: null)], epoch=122}}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-1 to the committed offset FetchPosition{offset=38779, offsetEpoch=Optional[118], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 30678 rack: null)], epoch=118}}"
2022-07-21 10:27:26; LOG_LEVEL="INFO"; SOURCE="org.apache.kafka.clients.consumer.internals.ConsumerCoordinator"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="[Consumer clientId=MyKAFKAConsumerClientId1, groupId=MyKAFKAConsumeGroupId1] Setting offset for partition MySourceTopic-3 to the committed offset FetchPosition{offset=38585, offsetEpoch=Optional[121], currentLeader=LeaderAndEpoch{leader=Optional[brokerHost:9093 (id: 38300 rack: null)], epoch=121}}"
2022-07-21 10:27:30; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"
2022-07-21 10:27:35; LOG_LEVEL="INFO"; SOURCE="MyKAFKAConsumer"; PLATFORM="Retail_Products"; SERVICE="MyKAFKAConsumer"; EVENT_MESSAGE="**Polling...**"
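As a quick sanity check on the log, the gap between the first two "Polling..." entries can be compared with the max.poll.interval.ms value from the consumer configuration in this post. The snippet below is only a rough sketch: the class and method names are hypothetical, and the two timestamps and the 300000 ms value are copied from the post.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the original consumer.
final class PollGap {
    // Timestamp layout used at the start of each log line in the post.
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Time elapsed between two log timestamps.
    static Duration between(String earlier, String later) {
        return Duration.between(LocalDateTime.parse(earlier, TS),
                                LocalDateTime.parse(later, TS));
    }

    public static void main(String[] args) {
        // The first two "Polling..." entries in the log.
        Duration gap = between("2022-07-21 10:27:08", "2022-07-21 10:27:25");
        long maxPollIntervalMs = 300_000L; // max.poll.interval.ms from the post

        System.out.println("gap between polls = " + gap.getSeconds() + " s");
        System.out.println("exceeds max.poll.interval.ms: "
                + (gap.toMillis() > maxPollIntervalMs));
        // prints:
        // gap between polls = 17 s
        // exceeds max.poll.interval.ms: false
    }
}
```

The 17-second gap is far below 300000 ms. That suggests, without proving it, that this particular rebalance is tied to the "session timed out without receiving a heartbeat response" entry rather than to the max.poll.interval.ms explanation in the commit warning.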
Consumer configuration
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
enable.auto.commit = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
max.poll.interval.ms = 300000
max.poll.records = 100
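The list above does not include session.timeout.ms, the setting the heartbeat session timeout in the log depends on. Its default is 10000 ms in clients before 3.0 and 45000 ms from 3.0 onward, and the Kafka documentation recommends keeping heartbeat.interval.ms at no more than one third of it. The sketch below checks the posted heartbeat value against both defaults. The helper class is hypothetical, and the session.timeout.ms values are assumptions because the post does not show the effective setting.

```java
import java.util.Properties;

// Hypothetical helper; only the property names and the listed values
// come from Kafka and from the post.
final class HeartbeatCheck {
    // True when heartbeat.interval.ms is at most one third of session.timeout.ms.
    static boolean withinOneThird(Properties props) {
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        return heartbeatMs * 3 <= sessionMs;
    }

    // Timing-related values from the post plus an assumed session timeout.
    static Properties postedConfig(long assumedSessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("max.poll.interval.ms", "300000");
        props.setProperty("max.poll.records", "100");
        // ASSUMPTION: session.timeout.ms is not shown in the post.
        props.setProperty("session.timeout.ms", Long.toString(assumedSessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // 10 s: default before client 3.0; 45 s: default from 3.0 onward.
        for (long sessionMs : new long[] {10_000L, 45_000L}) {
            System.out.println("session.timeout.ms=" + sessionMs
                    + " -> heartbeat within 1/3: "
                    + withinOneThird(postedConfig(sessionMs)));
        }
    }
}
```

With either default the 3000 ms heartbeat interval satisfies the one-third guideline, so this relationship alone would not explain the timeout. The effective session.timeout.ms and the client version would still be worth confirming.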
Consumer poll() implementation
while (true) {
    try {
        ConsumerRecords<String, GenericRecord> records = null;
        LOGGER.info("Polling...{}");
        records = consumer.poll(Duration.ofSeconds(5));
        if (records != null && records.count() > 0) {
            consumer.pause(consumer.assignment());
            recordExecutor.execute(records);
        }
        consumer.resume(consumer.assignment());
        consumer.commitSync();
    } catch (SerializationException e) {
        continue;
    } catch (Exception e) {
        continue;
    }
}
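The post does not say whether recordExecutor.execute(records) blocks. If it returns immediately, the loop above resumes the partitions right after pausing them. If it blocks, poll() is not called at all while the batch is being processed. A pattern often used with pause() and resume() is to keep calling poll() while the paused batch is processed on another thread, and to commit and resume only once that work has finished, so that the time between poll() calls stays short. The following is a minimal sketch of that control flow under those assumptions, not a confirmed fix for the rebalancing. PausableSource, PauseResumeLoop and FakeSource are hypothetical names, and the interface only stands in for the KafkaConsumer calls named in its comments so that the example runs without a broker.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// HYPOTHETICAL stand-in for the KafkaConsumer calls used in the loop above.
interface PausableSource {
    List<String> poll();  // like consumer.poll(...): returns nothing while paused
    void pause();         // like consumer.pause(consumer.assignment())
    void resume();        // like consumer.resume(consumer.paused())
    void commit();        // like consumer.commitSync()
}

final class PauseResumeLoop {
    // Hands `batches` non-empty batches to `worker` and keeps polling while
    // each one is processed. Returns the number of records handed over.
    static int run(PausableSource source, ExecutorService worker,
                   Consumer<String> handler, int batches) {
        int handedOver = 0;
        int completed = 0;
        Future<?> inFlight = null;
        while (completed < batches) {
            List<String> records = source.poll(); // called every iteration, paused or not
            if (inFlight == null && !records.isEmpty()) {
                source.pause(); // later polls return nothing until resume()
                inFlight = worker.submit(() -> records.forEach(handler));
                handedOver += records.size();
            }
            if (inFlight != null && inFlight.isDone()) {
                try {
                    inFlight.get(); // rethrows a processing failure instead of hiding it
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException("batch processing failed", e);
                }
                source.commit(); // commit only after the batch has been processed
                source.resume();
                inFlight = null;
                completed++;
            }
        }
        return handedOver;
    }
}

// In-memory fake used only to exercise the loop; it returns two records per poll.
final class FakeSource implements PausableSource {
    private boolean paused;
    private int next;
    int commits;
    int pollsWhilePaused;

    public List<String> poll() {
        if (paused) {
            pollsWhilePaused++;
            return Collections.emptyList();
        }
        String first = "record-" + next++;
        String second = "record-" + next++;
        return Arrays.asList(first, second);
    }
    public void pause() { paused = true; }
    public void resume() { paused = false; }
    public void commit() { commits++; }

    public static void main(String[] args) {
        FakeSource source = new FakeSource();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        int handedOver = PauseResumeLoop.run(
                source, worker, r -> { /* call the backend API here */ }, 3);
        worker.shutdown();
        System.out.println(handedOver + " records, " + source.commits + " commits");
        // prints: 6 records, 3 commits
    }
}
```

In a real consumer the poll() call would block for up to its timeout instead of spinning, and partitions assigned during a rebalance start out unpaused, so a production loop would need to handle that case as well.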
Thanks in advance.
-
Can you mention which Kafka client version you are using?
Tags: apache-kafka kafka-consumer-api