[Posted]: 2020-10-29 14:40:54
[Question]:
I am trying to create multiple output streams from a single input stream, one for each time window.
interface AnalyticsBinding {
    String PAGE_VIEWS_IN = "pvin";
    String PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes = "pvcout_last_5_minutes";
    String PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes = "pvcout_last_30_minutes";

    @Input(PAGE_VIEWS_IN)
    KStream<String, PageViewEvent> pageViewsIn();

    @Output(PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes)
    KStream<String, Long> pageViewsCountOutLast5Minutes();

    @Output(PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes)
    KStream<String, Long> pageViewsCountOutLast30Minutes();
}
@StreamListener
@SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_5_Minutes })
public KStream<String, Long> processPageViewEventForLast5Mintues(
        @Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> stream) {
    // aggregate by Duration.ofMinutes(5)
}

@StreamListener
@SendTo({ AnalyticsBinding.PAGE_VIEWS_COUNTS_OUT_Last_30_Minutes })
public KStream<String, Long> processPageViewEventForLast30Mintues(
        @Input(AnalyticsBinding.PAGE_VIEWS_IN) KStream<String, PageViewEvent> stream) {
    // aggregate by Duration.ofMinutes(30)
}
When I start the application, only one of the two stream tasks runs. Is there a way to make processPageViewEventForLast5Mintues and processPageViewEventForLast30Mintues work at the same time?
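To make the intended result concrete, here is a minimal plain-Java sketch of what the two aggregations are meant to produce: the same sequence of events counted into 5-minute and 30-minute tumbling windows. It is only a model of the windowing arithmetic, not Kafka Streams or Spring Cloud Stream code. The names WindowFanOut, PageView, sampleEvents and countByWindow are hypothetical, and PageView is an assumed stand-in for PageViewEvent, whose fields are not shown in the question.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowFanOut {

    // Hypothetical stand-in for PageViewEvent: a page name plus an event timestamp.
    public static final class PageView {
        final String page;
        final Instant at;

        public PageView(String page, Instant at) {
            this.page = page;
            this.at = at;
        }
    }

    // Counts events per (page, window start) using tumbling windows of the given size.
    // Windows are aligned to the epoch, so a 5-minute window starts at :00, :05, :10, ...
    public static Map<String, Long> countByWindow(List<PageView> events, Duration size) {
        Map<String, Long> counts = new TreeMap<>();
        long sizeMs = size.toMillis();
        for (PageView e : events) {
            long ts = e.at.toEpochMilli();
            long windowStart = ts - (ts % sizeMs);
            String key = e.page + "@" + Instant.ofEpochMilli(windowStart);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Made-up sample data, for illustration only.
    public static List<PageView> sampleEvents() {
        Instant t0 = Instant.parse("2020-10-29T14:00:00Z");
        return Arrays.asList(
                new PageView("home", t0.plusSeconds(60)),     // 14:01
                new PageView("home", t0.plusSeconds(240)),    // 14:04
                new PageView("home", t0.plusSeconds(600)),    // 14:10
                new PageView("about", t0.plusSeconds(1500))); // 14:25
    }

    public static void main(String[] args) {
        List<PageView> events = sampleEvents();
        // The same input is read twice, once per window size.
        System.out.println(countByWindow(events, Duration.ofMinutes(5)));
        System.out.println(countByWindow(events, Duration.ofMinutes(30)));
    }
}
```

With the sample data, the 5-minute pass yields three windows (two "home" views in the 14:00 window, one in the 14:10 window, one "about" view in the 14:25 window), while the 30-minute pass folds everything into the 14:00 window. The goal in the question is for both results to be emitted from the single pvin input, each to its own output binding.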
[Comments]:
Tags: spring-boot apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream