【问题标题】:How can i determine that a Keystore is updated in Spring Cloud Stream API我如何确定在 Spring Cloud Stream API 中更新了 Keystore
【发布时间】:2019-04-26 02:50:30
【问题描述】:

我使用 spring cloud stream api 的聚合函数从一个主题创建一个物化视图。 如下所示:

public void process(KStream<Object, Object> input){
input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized)

然后我查询我创建的 Statestore:

 ReadOnlyWindowStore<Object, Object> windowStore =
  queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());

现在我的问题是,在 process 方法处理了新事件后,如何确定此 statestore 是否已更新?他们的某种事件是我可以听还是我可以创建一个?

【问题讨论】:

    标签: spring apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream


    【解决方案1】:

    你的程序是:

    input
      .peek((key, value) ->{...}
      .map((key, value) -> {...}
      .groupByKey()
      .windowedBy(TimeWindows.of(5000))
      .aggregate(Initializer, Aggregator, Materialized)
    

    实际上,最后一个aggregate() 返回一个KTable 对象。如果您通过Materialized 禁用缓存,您可以通过以下方式了解KTable 的每一次更新:

    input
      .peek((key, value) ->{...}
      .map((key, value) -> {...}
      .groupByKey()
      .windowedBy(TimeWindows.of(5000))
      .aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
      .toStream()
      .foreach(...) // react to every update to the KTable
    

    【讨论】:

    • 可以同时做这两个吗?或者我应该创建 2 个不同的聚合,一个用于缓存,一个用于监听然后触发某些东西?
    • “缓存”在这里是一个重载的术语...如果您按照建议更新代码,您仍然可以获得您的物化视图并且您仍然可以查询它。 “缓存”意味着不同的东西,并且与下游推送更新到foreach() 相关(参见docs.confluent.io/current/streams/developer-guide/…
    猜你喜欢
    • 2020-09-27
    • 1970-01-01
    • 2018-06-20
    • 2020-06-02
    • 2018-12-17
    • 1970-01-01
    • 1970-01-01
    • 2018-07-18
    • 1970-01-01
    相关资源
    最近更新 更多