【问题标题】:How to create multi output stream from single input stream with Spring Cloud Kafka stream binder?如何使用 Spring Cloud Kafka 流绑定器从单个输入流创建多输出流?
【发布时间】:2020-10-29 14:40:54
【问题描述】:

我正在尝试从单个输入流创建多个输出流(取决于不同的时间窗口)。

interface AnalyticsBinding {
        String PAGE_VIEWS_IN = "pvin";
        String PAGE_VIEWS _COUNTS_OUT_Last_5_Minutes = "pvcout_last_5_minutes";
        String PAGE_VIEWS _COUNTS_OUT_Last_30_Minutes = "pvcout_last_30_minutes";
        @Input(PAGE_VIEWS_IN)
        KStream<String, PageViewEvent> pageViewsIn();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes)
        KStream<String,Long> pageViewsCountOutLast5Minutes();
        @Output(PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes)
        KStream<String,Long> pageViewsCountOutLast30Minutes();
    }

  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes })
    public KStream<String, Long> processPageViewEventForLast5Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(5)
    }


  @StreamListener
  @SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes })
    public KStream<String, Long> processPageViewEventForLast30Mintues(
            @Input(AnalyticsBinding.PAGE_VIEWS_IN)KStream<String, PageViewEvent> stream) {
                  // aggregate by Duration.ofMinutes(30)
}

当我启动应用程序时,只有一个流任务可以工作,有没有办法让 processPageViewEventForLast5MintuesprocessPageViewEventForLast30Mintues 同时工作

【问题讨论】:

    标签: spring-boot apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream


    【解决方案1】:

    您在两个处理器中使用相同的输入绑定,这就是为什么您只看到一个在工作。在绑定接口中添加另一个输入绑定,并将其目标设置为同一主题。此外,更改StreamListener 方法之一以使用此新绑定名称。

    话虽如此,如果您使用的是最新版本的 Spring Cloud Stream,您应该考虑迁移到功能模型。例如以下应该可以工作。

    @Bean
    public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast5Mintues() {
    ...
    }
    

    @Bean
    public Function<KStream<String, PageViewEvent>, KStream<String, Long>> processPageViewEventForLast30Mintues() {
    ...
    }
    

    在这种情况下,活页夹会自动创建两个不同的输入绑定。 您可以在这些绑定上设置目的地。

    spring.cloud.stream.bindings.processPageViewEventForLast5Mintues-in-0.destination=<your Kafka topic>
    spring.cloud.stream.bindings.processPageViewEventForLast30Mintues-in-0.destination=<your Kafka topic>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-28
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多