【问题标题】:Kafka - CommitFailedException - No timeout poll(max.poll.interval.ms)Kafka - CommitFailedException - 没有超时轮询(max.poll.interval.ms)
【发布时间】:2022-01-11 18:10:38
【问题描述】:

我有一个 Spring Kafka 应用程序,当消费者尝试提交偏移量时出现错误。这是我的 Kafka 消费者配置:

Kafka消费者:

@Bean
public ConsumerFactory<String, Item> consumerFactory() {
        log.info("Configuring Kafka Consumer properties - consumerFactory");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cpo-executor-groupid");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Item.class, false));
}

卡夫卡管理员:

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic processTopic() {
    return TopicBuilder.name(topicName).partitions(2).build();
}

我知道如果进程从 kafka 花费的时间超过 max.poll.interval.ms 或 session.timeout.ms 就会发生这种情况,但我的情况不是这样。我的应用程序花费不到 1 秒的时间来消费和处理消息:

Time: 11:00:32.773 Configuring Kafka Consumer properties - consumerFactory
Time: 11:00:39.433  INFO [cpo-executor,,] 55630 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig
Time: 11:00:57.293 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator: 
[Consumer clientId=consumer-cpo-executor-groupid-1, groupId=cpo-executor-groupid] Offset commit failed on partition process-topic-1 at offset 95: 
The coordinator is not aware of this member.
Time: 11:00:57.299 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] c.c.cpoexecutor.config.KafkaErrHandler   : Error in process with Exception org.apache.kafka.clients.consumer.CommitFailedException: 
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. 
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. and the records are []

由于我没有更改任何 Kafka 配置并且 max.poll.interval.ms 的默认值为 5 分钟,发生了什么?

kakfa:2.13-2.8.0
春天卡夫卡:2.7.6
春天:2.4.2

【问题讨论】:

  • 同一个消费组中可能还有其他消费者。我的场景是这样的,消费者通过 commitAsync 获取消息并发送提交请求。在这个过程的中间,其他消费者加入或离开组并发生组再平衡。
  • 如何防止这种情况发生?你能提出你的解决方案吗? @J.Song
  • 您可以在主题和消费者组上使用ACL(访问控制列表)。如果您应用 ACL 功能,则只有允许的消费者才能加入消费者组并阅读主题。 ACL 是 apache kafka 的默认功能。参考官方文档。
  • 但就我而言,我不想使用用户来控制谁可以阅读或不阅读该主题。我正在测试从一个主题读取的过程,可能其他用户也在做同样的事情。至少现在我明白了这个问题,但我不知道如何解决:(。
  • 我认为,您应该完全使用consumer group

标签: spring-boot kafka-consumer-api spring-kafka java-11


【解决方案1】:

我已经根据环境更改了groupId名称,所以我不会在同一个kafka上有相同的groupId。

Kafka消费者:

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offSetReset;

@Bean
public ConsumerFactory<String, Item> consumerFactory() {
   log.info("Configuring Kafka Consumer properties - consumerFactory");
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offSetReset);
   props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

应用程序-local.yml:

spring:
  kafka:
    bootstrap-servers: #####:9092
    consumer:
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-local

应用程序-dev.yml:

spring:
  kafka:
    bootstrap-servers: #####:9092
    consumer:
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-dev

应用程序-prd.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: #####:9092
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-prd

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-09-30
    • 1970-01-01
    • 2020-07-10
    • 1970-01-01
    • 1970-01-01
    • 2022-10-15
    • 1970-01-01
    • 2019-10-25
    相关资源
    最近更新 更多