【问题标题】:Using manual commit with multiple message consumer对多个消息使用者使用手动提交
【发布时间】:2020-11-10 14:28:03
【问题描述】:

我对 Kafka 很陌生。 使用 spring-boot kafka,我使用一个 Message 对象和手动 ack 开发了一个发布者和一个消费者。我的代码使用弹簧注释。这是完美的作品。 现在,当我连接到生产代理时,这个代理发送的不是一条消息,而是一个消息列表。 我的侦听器方法具有以下签名:

@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload Message message, Acknowledgment ack)

所以我可以确认每条消息。好的。 但现在看来我必须将其替换为

@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload List<Message> messages, Acknowledgment ack)

即使遵循文档,我似乎也应该使用

@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload List<Message> messages, Acknowledgment ack, Consumer<?,?> consumer)
  1. 我应该将批处理模式设置为 true 吗?
  2. 现在的问题是:当这条消息被完全处理后,我如何确认每条消息?

非常感谢您的帮助

【问题讨论】:

    标签: spring-boot apache-kafka consumer


    【解决方案1】:

    如果您确实想手动提交偏移量,类似这样的东西可以帮助您。

    如果您不想要它,请将setAckMode 切换为其他值。

    这是按弹簧方式完成的。

    CoreAutoConfiguration类:

    @Configuration
    @Import({KafkaAutoConfiguration.class})
    public class CoreAutoConfiguration {
    
    @Bean("batchKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> batchKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchListener(true);
        return factory;
        }
    }
    

    然后是你的Config 课程:

    @Configuration
    @Import({
            CoreAutoConfiguration.class,
            KafkaAutoConfiguration.class,
    })
    @EnableKafka
    @EnableRetry
    public class Config {
    }
    

    最后是消费者:

    @KafkaListener(
            topics = "MessagesTopic",
            containerFactory = "batchKafkaListenerContainerFactory"
    )
    public void dataReceived(@Payload List<String> payload) throws RuntimeException {
        yourService.processIncomingData(payload);
    }
    

    最后是属性:

    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=helloworld
    spring.kafka.listener.type=batch
    spring.kafka.consumer.enable-auto-commit=false
    # this is size of incoming list if broker has this many entries, can be lower eventually
    spring.kafka.consumer.max-poll-records=100
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    【讨论】:

    • 非常感谢您的回复。但是,我看不到消息在哪里提交......
    • 在这种情况下没有抛出异常 = ack。如果您需要batch.ack() 之类的东西,请修改工厂。但我怀疑这个修改是否值得 tbh - 如果没有发生异常,为什么不让 spring 为你提交批处理
    猜你喜欢
    • 2017-10-07
    • 2016-09-04
    • 2021-03-21
    • 1970-01-01
    • 1970-01-01
    • 2016-10-13
    • 2018-11-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多