【发布时间】:2019-04-26 02:50:30
【问题描述】:
我使用 spring cloud stream api 的聚合函数从一个主题创建一个物化视图。 如下所示:
public void process(KStream<Object, Object> input){
input
.peek((key, value) ->{...}
.map((key, value) -> {...}
.groupByKey()
.windowedBy(TimeWindows.of(5000))
.aggregate(Initializer, Aggregator, Materialized)
然后我查询我创建的 Statestore:
ReadOnlyWindowStore<Object, Object> windowStore =
queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());
现在我的问题是,在 process 方法处理了新事件后,如何确定此 statestore 是否已更新?他们的某种事件是我可以听还是我可以创建一个?
【问题讨论】:
标签: spring apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream