【问题标题】:How to only window the input in Kafka Streams using Java lamda?如何仅使用 Java lambda 窗口化 Kafka Streams 中的输入?
【发布时间】:2019-03-24 15:16:27
【问题描述】:

我有使用 Kafka Stream 获取的输入数据。我需要实现的只是一个 5 秒的翻转窗口,并将数据输出到 Kafka 主题。但是,我无法使用 lambda 完成此操作。有人可以帮忙吗?

以下是我写的,但我收到错误:

    KTable<TimeWindowedKStream<String, String> , String> result = source.
            groupByKey().windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(5000)));

    result.to(Serdes.String(), Serdes.Long(), "outputtopic");

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

但是,对于结果变量,eclipse 给了我一个错误:“类型不匹配:无法从 TimeWindowedKStream 转换为 KTable,String>”。

在将结果的值写入另一个主题时,eclipse 给了我错误:“KTable,String> 类型中的方法 to(Serde>, Serde, String) 不适用于参数 (Serde, Serde, String) )”。

据我所知,如果没有某种聚合,就无法实现窗口化。但是,我只想将每 5 秒窗口的数据输出到另一个输出主题。

【问题讨论】:

  • "但是,我只想将每 5 秒窗口的数据输出到另一个输出主题。" ——我不清楚你想达到什么目标?如果您对输入流进行窗口化,并将所有数据再次写入输出主题,那么您为什么要首先对其进行窗口化呢?或者您想为每个窗口写一条输出消息?对于这种情况,您需要将.aggregate() 一个窗口的所有输入记录合并到一个输出消息中。在这种情况下,您可能还想使用suppress() 运算符,否则,结果可能不是您所期望的。

标签: java lambda apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

“类型不匹配:无法从 TimeWindowedKStream 转换为 KTable,String>”。

您必须在TimeWindowedKStream 上调用一些聚合函数来获取表格,例如。 count(), aggregate(...)

“KTable,String>类型的to(Serde>, Serde, String)方法不适用于参数(Serde, Serde, String)”

您无法使用第一次调用 KTable::toStream()KTable 来写主题。 KStream 即返回有to(...) 函数。

【讨论】:

  • 如何将 TimeWindowedKStream 转换为 Kstream?我按以下键和窗口分组,我需要写一个主题:TimeWindowedKStream&lt;String, String&gt; result=words .groupBy((key, word) -&gt; word) .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5000)));
  • 你可以使用类似的东西:source.groupByKey().windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(5000))).count().toStream().map((key, value) -&gt; new KeyValue&lt;&gt;(key.key(), value)).to("outputtopic", Produced.with(Serdes.String(), Serdes.Long()));
猜你喜欢
  • 2018-11-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多