【问题标题】:@KafkaListener Graceful Shutdown with Batch Kakfa listener not working@KafkaListener 优雅关闭,批处理 Kakfa 侦听器不起作用
【发布时间】:2020-08-31 03:03:36
【问题描述】:

2020-05-14 19:32:11.238 INFO 19880 --- [on(4)-127.0.0.1] oscsupport.DefaultLifecycleProcessor:未能在 30000 超时内关闭 1 个相位值为 2147483547 的 bean:[org .springframework.kafka.config.internalKafkaListenerEndpointRegistry]

【问题讨论】:

    标签: java apache-kafka spring-kafka


    【解决方案1】:

    默认情况下,Spring要求每个SmartLifecycle阶段的bean在30秒内停止;您可以通过添加以下 bean 来更改该行为:

    @Bean
    public DefaultLifecycleProcessor lifecycleProcessor() {
        DefaultLifecycleProcessor lp = new DefaultLifecycleProcessor();
        lp.setTimeoutPerShutdownPhase(120_000);
        return lp;
    }
    

    编辑

    在其侦听器处理批处理时停止容器不会影响批处理的处理;侦听器线程不会被容器杀死;但是,默认情况下,容器会在 10 秒后发布容器停止事件(容器属性 shutDownTimeout),即使侦听器实际上还没有完成。

    如果您担心生命周期处理器会在批处理中杀死线程并且不想增加其超时时间,您可以通过暂停消费者和监听事件的组合来执行正常关闭。

    这是一个例子:

    @SpringBootApplication
    public class So61799727Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So61799727Application.class, args);
        }
    
    
        @KafkaListener(id = "so61799727", topics = "so61799727", concurrency = "3")
        public void listen(List<String> in) {
            System.out.println(in);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so61799727").partitions(3).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template,
                KafkaListenerEndpointRegistry registry) {
    
            return args -> {
                sendTen(template);
                System.out.println("Hit enter to pause container");
                System.in.read();
                registry.getListenerContainer("so61799727").pause();
            };
        }
    
        public static void sendTen(KafkaTemplate<String, String> template) {
            IntStream.range(0, 10).forEach(i -> template.send("so61799727", "foo" + i));
        }
    
    }
    
    @Component
    class Eventer {
    
        privaate final KafkaTemplate<String, String> template;
    
        private final AtomicInteger paused = new AtomicInteger();
    
        Eventer(KafkaTemplate<String, String> template) {
            this.template = template;
        }
    
        @EventListener
        public void paused(ConsumerPausedEvent event) {
            System.out.println(event);
            if (this.paused.incrementAndGet() ==
                    event.getContainer(ConcurrentMessageListenerContainer.class).getConcurrency()) {
                System.out.println("All containers paused");
                So61799727Application.sendTen(this.template);
            }
        }
    
        @EventListener
        public void idle(ListenerContainerIdleEvent event) {
            System.out.println(event);
        }
    
    }
    
    spring.kafka.listener.idle-event-interval=5000
    spring.kafka.listener.type=batch
    spring.kafka.producer.properties.linger.ms=50
    
    

    【讨论】:

    • 有什么方法可以让我知道消费者实例正在运行,即批处理仍在处理消息?或者在批量消息正确确认之前不要停止 ConcurrentMessageListenerContainer ,即所有批量消息都被消耗掉???
    • 当我关闭应用程序时,ListenerContainer 正常停止,即我可以使用 KafkaListenerEndpointRegistry 看到状态已停止。但不处理批量消息。所以我正在寻找一种方法来验证批量消费的所有消息。
    • 停止容器不会影响监听器处理当前批次;容器不会杀死侦听器线程。然而,生命周期处理器是不同的。有关更多详细信息以及可用于正常关闭的技术,请参阅我的答案的编辑。
    • 谢谢您,先生。但是还有其他方法吗,即 EventListener 知道消费者正在停止并且批处理正在处理消息???
    • 这将是你的听众的责任 - 在开始时设置一个标志并在你完成后重置它。容器只有在控制线程正在做什么时才会发出事件。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-08-31
    • 1970-01-01
    • 2014-07-02
    • 2021-09-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多