【问题标题】:KafkaStreams: Getting Window Final ResultsKafkaStreams:获取窗口最终结果
【发布时间】:2019-06-04 05:34:18
【问题描述】:

是否可以通过抑制中间结果在Kafka Streams中获得window final result

我无法实现这个目标。我的代码有什么问题?

    val builder = StreamsBuilder()
    builder.stream<String,Double>(inputTopic)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
            .toStream()
            .print(Printed.toSysOut())

导致这个错误:

Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: 
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

代码/错误详情:https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380

【问题讨论】:

    标签: kotlin apache-kafka apache-kafka-streams


    【解决方案1】:

    添加Consumed,创建流时:builder.stream&lt;String,Double&gt;(inputTopic, Consumed.with(Serdes.String(), Serdes.Double())

    【讨论】:

    • 这无济于事,因为问题出在.suppress 方法上。为此,需要在 State Store 上定义 Key Serde
    • @NishuTayal 就我而言,这就足够了。你也可以在这里阅读:stackoverflow.com/questions/54036328/…
    【解决方案2】:

    问题在于 KeySerde。由于WindowedBy 操作导致Windowed&lt;String&gt; 键入键,但.suppress() 使用默认键类型。

    因此,您需要在调用 count 方法时在 State 存储上定义 KeySerde,如下所示:

          builder.stream<String,Double>inputTopic)
          .groupByKey()
          .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
          .count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("count").withCachingDisabled().withKeySerde(Serdes.String()))
          .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
          .toStream()
          . print(Printed.toSysOut());
    

    【讨论】:

      【解决方案3】:

      问题在于 Streams 在窗口期间自动包装显式 serde,但不会自动包装默认 serde 的方式存在令人困惑的不对称性。恕我直言,这是一个应该纠正的疏忽,所以我提交了:https://issues.apache.org/jira/browse/KAFKA-7806

      正如其他人所指出的,解决方案是在上游显式设置密钥 serde,而不依赖于默认密钥 serde。您可以:

      使用Materialized在窗口聚合上设置serdes

      val builder = StreamsBuilder()
      builder.stream<String,Double>(inputTopic)
              .groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
              .count(Materialized.with(Serdes.String(), Serdes.Long()))
              .suppress(Suppressed.untilWindowCloses(unbounded())))
              .toStream()
              .print(Printed.toSysOut())
      

      (如尼舒推荐的)

      (请注意,不需要命名count操作,这样做的副作用是使其可查询)

      或者在上游设置 serdes,例如在输入端:

      val builder = StreamsBuilder()
      builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
              .groupByKey()
              .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
              .count()
              .suppress(Suppressed.untilWindowCloses(unbounded())))
              .toStream()
              .print(Printed.toSysOut())
      

      (正如 wardziniak 推荐的)

      选择权在你;我认为在这种情况下,这两种情况都没有太大的不同。如果您进行与count 不同的聚合,那么无论如何您可能会通过Materialized 设置值serde,所以也许前者会是一种更统一的样式。

      我还注意到您的窗口定义没有设置宽限期。窗口关闭时间定义为window end + grace period,默认为 24​​ 小时,因此在 24 小时的数据通过应用程序运行之前,您不会看到抑制发出的任何内容。

      对于您的测试工作,我建议您尝试:

      .windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
      

      在生产中,您需要选择一个宽限期,以平衡您期望在流中的事件延迟量与您希望从抑制中看到的发射及时量。

      最后一点,我在您的要点中注意到您没有更改默认缓存或提交间隔。因此,您会注意到 count 运算符本身将在默认的 30 秒内缓冲更新,然后再将它们传递给抑制。这是一个很好的生产配置,因此您不会对本地磁盘或 Kafka 代理造成瓶颈。但它可能会在您测试时让您感到惊讶。

      通常对于测试(或以交互方式尝试的东西),我将禁用缓存并将提交间隔设置为短以最大限度地提高开发人员的理智:

      properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
      properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
      

      对于 serde 的疏忽,我们深表歉意。我希望我们能尽快解决 KAFKA-7806。

      我希望这会有所帮助!

      【讨论】:

      • 惊人的解释!感谢您提供有关提交间隔的信息,这让我很开心!
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-07-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-20
      相关资源
      最近更新 更多