【发布时间】:2021-12-02 23:45:06
【问题描述】:
我有一个以功能方法实现的 Spring 云流应用程序。该应用程序使用来自多个 Kafka 主题的事件,将输入规范化为输出模式(总是相同的模式)并发布到 Kafka。我没有使用 Kafka-streams,因为不需要加入/丰富/状态。
我想通过控制在运行时使用的输入主题来实现灵活部署:您可以从所有主题或单个主题中使用。我的做法是为每种类型声明专用函数,并为每个函数声明一个专用绑定。 问题是活页夹(只有一个)将所有传入消息路由到所有绑定,当调用错误的函数来处理某些事件类型时,我得到 ClassCastException。
我想到了以下解决方案,但我想知道是否有更好的方法:
- 每个绑定只有一个活页夹。我宁愿不要,尤其是 因为我使用的是配置良好的活页夹,我不想简单地 复制它。
- 具有单个活页夹和单个函数类型 Message>,内部检查对象类型,将其强制转换 并按类型处理。
我的 application.yaml 看起来像这样:
spring:
cloud:
function:
definition: data;more
stream:
default-binder: kafka-string-avro
bindings:
data-in-0:
binder: kafka-string-avro
destination: data.emails.events
group: communication_system_events_data_gp
data-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
more-in-0:
binder: kafka-string-avro
destination: communication.emails.send.status
group: communication_system_events_more_gp
more-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
我的功能:
@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
return new MoreFunction();
}
【问题讨论】:
标签: apache-kafka spring-kafka spring-cloud-stream spring-cloud-function