【问题标题】:Spring cloud stream: Attaching function to binder by typeSpring cloud stream:按类型将函数附加到活页夹
【发布时间】:2021-12-02 23:45:06
【问题描述】:

我有一个以功能方法实现的 Spring 云流应用程序。该应用程序使用来自多个 Kafka 主题的事件,将输入规范化为输出模式(总是相同的模式)并发布到 Kafka。我没有使用 Kafka-streams,因为不需要加入/丰富/状态。

我想通过控制在运行时使用的输入主题来实现灵活部署:您可以从所有主题或单个主题中使用。我的做法是为每种类型声明专用函数,并为每个函数声明一个专用绑定。 问题是活页夹(只有一个)将所有传入消息路由到所有绑定,当调用错误的函数来处理某些事件类型时,我得到 ClassCastException。

我想到了以下解决方案,但我想知道是否有更好的方法:

  1. 每个绑定只有一个活页夹。我宁愿不要,尤其是 因为我使用的是配置良好的活页夹,我不想简单地 复制它。
  2. 具有单个活页夹和单个函数类型 Message>,内部检查对象类型,将其强制转换 并按类型处理。

我的 application.yaml 看起来像这样:

spring:
cloud:
    function:
        definition: data;more
    stream:
        default-binder: kafka-string-avro
        bindings:
            data-in-0:
                binder: kafka-string-avro
                destination: data.emails.events
                group: communication_system_events_data_gp
            data-out-0:
                binder: kafka-string-avro
                destination: communication.system.emails.events
                producer:
                    useNativeEncoding: true
            more-in-0:
                binder: kafka-string-avro
                destination: communication.emails.send.status
                group: communication_system_events_more_gp
            more-out-0:
                binder: kafka-string-avro
                destination: communication.system.emails.events
                producer:
                    useNativeEncoding: true

我的功能:

@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
    return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
    return new MoreFunction();
}

【问题讨论】:

    标签: apache-kafka spring-kafka spring-cloud-stream spring-cloud-function


    【解决方案1】:

    不确定问题出在哪里,但我发现您提供的内容存在一些配置问题。复制到问题时可能是拼写错误,但以下配置应将两个不同的主题隔离到其相应的功能。

    spring:
    cloud:
        function:
            definition: dataFunction;moreFunction
        stream:
            default-binder: kafka-string-avro
            bindings:
                dataFunction-in-0:
                    binder: kafka-string-avro
                    destination: data.emails.events
                    group: communication_system_events_data_gp
                dataFunction-out-0:
                    binder: kafka-string-avro
                    destination: communication.system.emails.events
                    producer:
                        useNativeEncoding: true
                moreFunction-in-0:
                    binder: kafka-string-avro
                    destination: communication.emails.send.status
                    group: communication_system_events_more_gp
                moreFunction-out-0:
                    binder: kafka-string-avro
                    destination: communication.system.emails.events
                    producer:
                        useNativeEncoding: true
    
    @Bean("data")
    public Function<Message<Data>, Message<Output>> dataFunction() {
        return new DataFunction();
    }
    @Bean("more")
    public Function<Message<More>, Message<Output>> moreFunction() {
        return new MoreFunction();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-10-05
      • 2019-04-27
      • 1970-01-01
      • 1970-01-01
      • 2022-07-12
      • 1970-01-01
      • 2020-02-10
      • 2019-10-24
      相关资源
      最近更新 更多