【问题标题】:@Header and spring stream functional programming model@Header 和 Spring Stream 函数式编程模型
【发布时间】:2021-09-08 23:51:30
【问题描述】:

有没有办法在以下 kafka 消费者代码中使用 @Header ?我正在使用 Spring Cloud Stream (Kafka Stream binder implementation),并且在我的实现之后使用 functional model 例如。

@Bean
public Consumer<KStream<String, Pojo>> process() {
    return messages -> messages.foreach((k, v) -> process(v));
}

如果使用 Spring for apache kafka 那么这可以很简单

@KafkaListener(topics = "${mytopicname}", clientIdPrefix = "${myprefix}", errorHandler = "customEventErrorHandler")
public void processEvent(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
                         @Valid Pojo pojo) {
    ...
    // use headers here
    ...
}

【问题讨论】:

    标签: spring-kafka spring-cloud-stream


    【解决方案1】:

    没有; Kafka Streams binder 不基于 Spring Messaging。

    您可以在添加到您的信息流中的Transformer(通过ProcessorContext)中访问标题、主题等。

    您可以将 Kafka 消息通道绑定器与

    @Bean
    public Consumer<Message<Pojo>> process() {
        return message -> ...
    }
    

    【讨论】:

    • 谢谢 Gary,ProcessorContext 有我需要的所有信息(标题、topicName 等)
    猜你喜欢
    • 2019-10-24
    • 1970-01-01
    • 2019-04-25
    • 1970-01-01
    • 2010-09-06
    • 2020-12-10
    • 1970-01-01
    • 2020-10-23
    • 1970-01-01
    相关资源
    最近更新 更多