【问题标题】:Multiple spring cloud stream application running together多个spring cloud stream应用一起运行
【发布时间】:2020-05-18 18:09:55
【问题描述】:

我参考了here 发布的示例。我正在尝试一起运行多个 spring 云流应用程序。这里 first 的输出作为 other 的输入。以下是我正在尝试做的事情。

@Bean
    public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process()
    {
        //do some processing here and return 
    }
// read output from above process and join it with an event stream
@Bean
    public BiConsumer<KStream<UUID, ProcessEvent>, KTable<UUID, Application>> listen()
    {

        return (eventStream,appTable )-> eventStream
                .join(appTable, (event, app) -> app).foreach((k, app) -> app.createQuote());

    }

application.yml 如下所示

spring.cloud:
 function: process;listen
 stream:
  kafka.streams:
    bindings:
      process-in-0.consumer.application-id: form-aggregator
      listen-in-0.consumer.application-id: event-processor
      listen-in-1.consumer.application-id: event-processor
    binder.configuration:
      default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.key.default.type: com.xxx.datamapper.domain.FormUUID
      spring.json.value.default.type: com.xxx.datamapper.domain.FormData
      commit.interval.ms: 1000
  bindings:
    process-in-0.destination: FORM_DATA_TOPIC
    process-out-0.destination: APPLICATION_TOPIC
    listen-in-0.destination: APPLICATION_TOPIC
    listen-in-1.destination: PROCESS_TOPIC

以上配置抛出

java.lang.IllegalStateException: Multiple functions found, but function definition property is not set.

如果我尝试使用以下配置

spring.cloud.stream.function.definition: processAndListen

然后我的应用程序工作但第二个流配置(在监听 Bean 中定义)没有被执行。

【问题讨论】:

    标签: spring-kafka spring-cloud-stream


    【解决方案1】:

    在您的属性中,您需要添加以下内容:

    spring.cloud:
     function.definition: process;listen
    

    这也应该有效 - spring.cloud.stream.function.definition: process;listen

    什么是processAndListen。这个价值从何而来?

    【讨论】:

    • 感谢以上配置成功!!但是,现在我面临另一个 SerializationException。问题是“spring.json.key.default.type”属性已经被占用,我想为剩余类型 UUID、Application 和 ProcessEvent 配置类型。实际上,每个定义的消费者和生产者都需要映射类型。请指出我如何做到这一点的正确方向。也不确定是否应在此处提出后续问题,或者我将创建另一个问题。
    • 嗨,我建议你问另一个问题,因为它无关。如果可能,请提供示例应用程序或为您要完成的工作编写 sn-ps 代码。
    • 我为我的评论创建了一个新问题stackoverflow.com/questions/61897937/…
    • 对我来说第二个属性有效 - “spring.cloud.stream.function.definition: process;listen”
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-10
    • 2019-06-28
    • 1970-01-01
    • 1970-01-01
    • 2021-05-08
    • 2020-02-10
    • 2019-03-16
    相关资源
    最近更新 更多