【问题标题】:Consuming again messages from kafka log compaction topic再次消费来自 kafka 日志压缩主题的消息
【发布时间】:2020-03-05 06:39:27
【问题描述】:

我有一个使用 @KafkaListerner 注释的带有 Kafka 消费者的 Spring 应用程序。正在消费的主题是日志压缩的,我们可能会遇到必须再次消费主题消息的场景。以编程方式实现这一目标的最佳方法是什么?我们不控制 Kafka 主题配置。

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:
        @KafkaListener(...)
        public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
            System.out.println(in);
            if (this.resetNeeded) {
                consumer.seekToBeginning(consumer.assignment());
                this.resetNeeded = false;
            }
        }
    

    如果您想在侦听器空闲(无记录)时重置,您可以启用空闲事件并通过在 ApplicationListener@EventListener 方法中侦听 ListenerContainerIdleEvent 来执行搜索。

    事件有对消费者的引用。

    编辑

    @SpringBootApplication
    public class So58769796Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So58769796Application.class, args);
        }
    
        @KafkaListener(id = "so58769796", topics = "so58769796")
        public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
            System.out.println("One:" + key + ":" + value);
        }
    
        @KafkaListener(id = "so58769796a", topics = "so58769796")
        public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
            System.out.println("Two:" + key + ":" + value);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so58769796")
                    .compact()
                    .partitions(1)
                    .replicas(1)
                    .build();
        }
    
        boolean reset;
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so58769796", "foo", "bar");
                System.out.println("Hit enter to rewind");
                System.in.read();
                this.reset = true;
            };
        }
    
        @EventListener
        public void listen(ListenerContainerIdleEvent event) {
            System.out.println(event);
            if (this.reset && event.getListenerId().startsWith("so58769796-")) {
                event.getConsumer().seekToBeginning(event.getConsumer().assignment());
            }
        }
    
    }
    

    spring.kafka.listener.idle-event-interval=5000
    

    EDIT2

    这是另一种技术 - 在这种情况下,我们会在每次应用启动时(并按需)回退...

    @SpringBootApplication
    public class So58769796Application implements ConsumerSeekAware {
    
        public static void main(String[] args) {
            SpringApplication.run(So58769796Application.class, args);
        }
    
        @KafkaListener(id = "so58769796", topics = "so58769796")
        public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
            System.out.println(key + ":" + value);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so58769796")
                    .compact()
                    .partitions(1)
                    .replicas(1)
                    .build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template,
                KafkaListenerEndpointRegistry registry) {
    
            return args -> {
                template.send("so58769796", "foo", "bar");
                System.out.println("Hit enter to rewind");
                System.in.read();
                registry.getListenerContainer("so58769796").stop();
                registry.getListenerContainer("so58769796").start();
            };
    
        }
    
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    
    }
    

    【讨论】:

    • 感谢您的及时回复。这个想法更多的是按需重置(当我们明确需要时)。有可能吗?
    • 查看对我的答案的编辑以获取示例。第二个编辑展示了另一种技术,通过实现ConsumerSeekAware
    • 这正是我所需要的。谢谢!
    • 它正在工作,但正在发生的事情是从应用程序中的不同主题重置我的所有消费者,而不仅仅是我使用这个标志的那个。这种行为可能是因为 groupid 吗?
    • 如果您在同一个ConsumerSeekAware 类中有多个@KafkaListeners,它将适用于它们;你应该把你想倒带的那个放在它自己的类中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-07-09
    • 2019-09-20
    • 2018-07-21
    • 2022-10-23
    • 2020-12-12
    • 1970-01-01
    • 2021-12-12
    相关资源
    最近更新 更多