【发布时间】:2018-11-23 09:43:06
【问题描述】:
我正在尝试使用 Spring-Kafka (1.1.0.RELEASE) 找出在 Kafka 消费者中手动提交偏移量的方法。我知道,最好为稳健的客户端实现提交这些偏移量,这样其他消费者就不会处理重复事件,这些事件可能最初由现已死亡的消费者处理,或者因为触发了重新平衡。
我看到有两种方法可以解决这个问题 -
-
将 ACK_MODE 设置为 MANUAL_IMMEDIATE 并在侦听器实现中调用 ack.acknowledge() API
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); @KafkaListener(topics = "anotherBatchTopic", containerFactory = "batchContainerFactory") public void listen(List<ConsumerRecord<byte[], String>> batch, Acknowledgment acknowledgment) throws Exception { logger.info("===*** batch size : " + batch.size() + "***==="); batch.forEach(System.out::println); acknowledgment.acknowledge(); } -
实现一个 ConsumerRebalanceListener。
ConcurrentKafkaListenerContainerFactory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(final Collection<TopicPartition> collection) { } @Override public void onPartitionsAssigned(final Collection<TopicPartition> collection) { } });
但是,使用这种方法,我不知道如何获取对消费者的引用,以调用 consumer.commitSync() 或 consumer.commitASync() API。由于一些技术限制,我无法移动到支持ConsumerAwareRebalanceListener 并参考Consumer 的最新版本的spring-kafka。
那么如何利用ConsumerRebalanceListener 向Kafka 提交偏移量?
另外,我正在使用ConcurrentKafkaListenerContainerFactory.setConcurrency() API 启动多个使用者侦听器线程,因此如果特定使用者线程死亡,它是否有自己的 ConsumerRebalanceListener 实例?
【问题讨论】:
-
尽量避免使用内部 Kafka 提交。我会选择使用外部偏移处理,这样更安全。例如,我使用 Zookeeper。在这篇 Cloudera 文章中查看更多详细信息:blog.cloudera.com/blog/2017/06/…
标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka