【问题标题】:Spring Cloud @StreamListener condition deprecated what is the alternativeSpring Cloud @StreamListener 条件已弃用什么是替代方案
【发布时间】:2021-11-11 21:46:16
【问题描述】:

我们有多个应用程序消费者监听同一个 kafka 主题,并且生产者在向主题发送消息时设置消息头,以便特定实例可以评估头并处理消息。例如

@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
    log.info("sydney order request received message is {}",message.getName());
}

在 Spring Cloud Stream 3.0.0 中,@StreamListener 已弃用,我在 Function 中找不到 condition 属性的等效项。

有什么建议吗?

【问题讨论】:

    标签: java spring-boot apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka


    【解决方案1】:

    虽然我也找不到功能方法的等效方法,但我确实有一个建议。

    @StreamListener 注释条件并不能阻止应用程序在将消息传递给侦听器 (fullfillOrder()) 之前必须使用消息、读取其标头并过滤掉特定记录这一事实。因此可以安全地假设您正在消费每条命中主题的消息(通过 Spring Cloud 在后台为我们实现的事件接收器),但只有在 header == sydney 时才会执行侦听器。

    如果有一种方法可以配置 Spring Cloud 使用的事件接收器(在点击侦听器之前丢弃消息),我建议您研究一下。如果没有,将在进行任何处理之前过滤掉任何消息(非悉尼)。如果您熟悉 Spring Cloud 的函数式方法,可能看起来像这样:

    @Bean
    public Consumer<Message<TestObj>> fulfillOrder() {
        return msg -> {
            // to get header - msg.getHeaders().get(key, valueType);
            // filter out bad messages
        }
    }
    

    @Bean
    public Consumer<ConsumerRecord<?, TestObj>> fulfillOrder() {
        return msg -> {
            // msg.headers().lastHeader("franchiseName").value() -> filter em out
        }
    }
    

    其他: ^ 我的代码假设您通过 spring-cloud-stream-binder-kafka 将 kafka-client API 与 Spring 云流集成。根据列出的标签,我会注意到 Spring Cloud Stream 有两个版本的 Kafka 绑定器 - 一个用于 kafka 客户端库,一个用于 kafka 流库。

    在不考虑 Spring Cloud / Frameworks 的情况下,kafka 流中的高等级 DSL 无法让您访问标头,但低级处理器 API 可以。从示例中,您似乎正在利用客户端活页夹而不是 spring-cloud-stream-binder-kafka-streams / kafka 流活页夹。我还没有看到使用低级处理器 API 实现 spring cloud stream + kafka 流 binder 的实现,所以我不知道这是否是目标。

    【讨论】:

    • 另一种选择:在消费者和代理之间注入一个拦截器。这很有用,因为如果您的逻辑在任何不符合您的标头要求的记录上返回 null,.intercept() 方法不会将记录传递给消费者。 docs.spring.io/spring-kafka/docs/current/api/org/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-11-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-11
    • 2021-08-16
    • 2016-08-16
    相关资源
    最近更新 更多