【问题标题】:Ways to manually commit offset in kafka consumers utilizing spring kafka使用 spring kafka 在 kafka 消费者中手动提交偏移的方法
【发布时间】:2018-11-23 09:43:06
【问题描述】:

我正在尝试使用 Spring-Kafka (1.1.0.RELEASE) 找出在 Kafka 消费者中手动提交偏移量的方法。我知道,最好为稳健的客户端实现提交这些偏移量,这样其他消费者就不会处理重复事件,这些事件可能最初由现已死亡的消费者处理,或者因为触发了重新平衡。

我看到有两种方法可以解决这个问题 -

  • 将 ACK_MODE 设置为 MANUAL_IMMEDIATE 并在侦听器实现中调用 ack.acknowledge() API

    ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    
    @KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory")
    public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception {
        logger.info("===*** batch size : " + batch.size() + "***===");
        batch.forEach(System.out::println);
        acknowledgment.acknowledge();
    }
    
  • 实现一个 ConsumerRebalanceListener。

    ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> collection) {
    
    }
    
    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> collection) {
    
    }
    });
    

但是,使用这种方法,我不知道如何获取对消费者的引用,以调用 consumer.commitSync()consumer.commitASync() API。由于一些技术限制,我无法移动到支持ConsumerAwareRebalanceListener 并参考Consumer 的最新版本的spring-kafka。

那么如何利用ConsumerRebalanceListener 向Kafka 提交偏移量?

另外,我正在使用ConcurrentKafkaListenerContainerFactory.setConcurrency() API 启动多个使用者侦听器线程,因此如果特定使用者线程死亡,它是否有自己的 ConsumerRebalanceListener 实例?

【问题讨论】:

  • 尽量避免使用内部 Kafka 提交。我会选择使用外部偏移处理,这样更安全。例如,我使用 Zookeeper。在这篇 Cloudera 文章中查看更多详细信息:blog.cloudera.com/blog/2017/06/…

标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

消费者再平衡监听器不能用于提交偏移量;与 1.x;唯一的方法是通过Acknowledgment 参数。

1.1.0 已经很老了;建议所有 1.x 用户至少升级到 1.3.5;由于 KIP-62,它有一个更简单的线程模型 - 请参阅 the project page

当前版本 2.1.6(2.1.7 将于今天发布)有更多选项,包括ConsumerAwareMessageListener,您可以在其中获得对消费者的完全访问权限。

您不能从版本

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-08-22
  • 2021-11-10
  • 2022-11-13
  • 2020-08-08
  • 2018-12-17
  • 2022-11-11
  • 2023-03-29
相关资源
最近更新 更多