【问题标题】:Auto commit in kafka with spring cloud stream使用spring cloud stream在kafka中自动提交
【发布时间】:2020-08-31 12:43:48
【问题描述】:

我有一个应用程序,我想手动在 Kafka 消息中执行 (n)ack。根据spring cloud的文档,应该用autoCommitOffsetspring cloud documentation完成

但是,在我的应用程序中,即使定义了此类属性,标头 KafkaHeaders.ACKNOWLEDGMENT 仍然为 null。

这是我的配置的样子

spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false

还有我的消费者:

@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
    MyTopic payload = message.getPayload();
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}

我使用 java 13 和 spring boot 2.2.5.RELEASE 和 spring cloud Hoxton.SR1

感谢任何帮助。

【问题讨论】:

    标签: java spring-boot apache-kafka spring-cloud spring-cloud-stream


    【解决方案1】:

    我刚刚复制了你的属性,它对我来说很好......

    GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@55d4844d, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic= MyInputTopic, kafka_receivedTimestamp=1589488691039, kafka_acknowledgment=ConsumerRecord 的确认(topic = MyInputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1589488691039, 序列化密钥大小 = -1, 序列化值大小 = 3、headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@572887c3), contentType=application/json, kafka_groupId=myConsumerGroup}]

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @StreamListener(Sink.INPUT)
        public void listen(Message<String> in) {
            System.out.println(in);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> {
                template.send("MyInputTopic", "foo".getBytes());
            };
        }
    
    }
    
    spring.cloud.stream.default.contentType=application/json
    spring.cloud.stream.bindings.input.destination=MyInputTopic
    spring.cloud.stream.bindings.input.group=myConsumerGroup
    spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
    

    【讨论】:

    • 用Java8、Java14测试过;启动 2.2.7/Hoxton.SR4;启动 2.2.5/HoxtonSR1,一切正常。
    【解决方案2】:

    我发现为什么我的消费者没有按预期工作:

    在我的配置中,我有类似spring.cloud.stream.bindings. mytopic.destination=MyInputTopic 的东西,但是流绑定是这样完成的:

    @StreamListener("Mytopic")

    显然,以spring.cloud.stream.bindings 为前缀的配置不区分大小写(因为所有配置都按预期工作),但以spring.cloud.stream.kafka.bindings 为前缀的配置区分大小写导致我的问题。

    【讨论】:

      猜你喜欢
      • 2018-04-28
      • 1970-01-01
      • 1970-01-01
      • 2021-07-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-22
      相关资源
      最近更新 更多