【发布时间】:2021-09-08 23:51:30
【问题描述】:
有没有办法在以下 kafka 消费者代码中使用 @Header ?我正在使用 Spring Cloud Stream (Kafka Stream binder implementation),并且在我的实现之后使用 functional model 例如。
@Bean
public Consumer<KStream<String, Pojo>> process() {
return messages -> messages.foreach((k, v) -> process(v));
}
如果使用 Spring for apache kafka 那么这可以很简单
@KafkaListener(topics = "${mytopicname}", clientIdPrefix = "${myprefix}", errorHandler = "customEventErrorHandler")
public void processEvent(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
@Valid Pojo pojo) {
...
// use headers here
...
}
【问题讨论】:
标签: spring-kafka spring-cloud-stream