【发布时间】:2020-08-31 12:43:48
【问题描述】:
我有一个应用程序,我想手动在 Kafka 消息中执行 (n)ack。根据spring cloud的文档,应该用autoCommitOffsetspring cloud documentation完成
但是,在我的应用程序中,即使定义了此类属性,标头 KafkaHeaders.ACKNOWLEDGMENT 仍然为 null。
这是我的配置的样子
spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false
还有我的消费者:
@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
MyTopic payload = message.getPayload();
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}
我使用 java 13 和 spring boot 2.2.5.RELEASE 和 spring cloud Hoxton.SR1
感谢任何帮助。
【问题讨论】:
标签: java spring-boot apache-kafka spring-cloud spring-cloud-stream