【问题标题】:How to process only unique keys from kafkaStreams?如何仅处理来自 kafkaStreams 的唯一键?
【发布时间】:2020-09-30 20:18:26
【问题描述】:
Properties streamsConfiguration = this.buildKafkaProperties();
        LOGGER.info("kafka properties for streaming is ::{}", streamsConfiguration);
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, LocationChangeEvent> kStream = builder.stream(this.kafkaConfigProperties.getTopicName(), Consumed.with(Serdes.String(), locationChangeEventSerde));
KGroupedStream<String, LocationChangeEvent> grouped = kStream.groupBy((key, value) -> key);
      grouped.windowedBy(TimeWindows.of(Long.parseLong(String.valueOf(Duration.ofMinutes(2)))));

说明:我想从 kafka 流中删除重复的密钥。 我有KafkaStreams&lt;String,LocationChangeEvent&gt; kstreams...

示例 - 假设我在 kafkaStreams 中获得了这些事件

{id="1",event1},
{id="2",event2},
{id="3",event3},
{id="1",event3},
{id="2",event3}

现在,我想对它们进行分组,以便在给定的时间范围内不存在重复的键(id)。 输出kafkaStream:

{id="1",event1},
{id="2",event2},
{id="3",event3}

kafkaStream 中删除重复键。 尝试使用Kstreams.groupByKey(),但它不适用于我的情况。 我不想计算唯一键。我希望我的 Kstream 只包含唯一键和相应的事件。

【问题讨论】:

  • 您能否详细说明一下使用 lil 位代码以便我可以继续?我不知道如何继续。

标签: apache-kafka group-by apache-kafka-streams spring-kafka


【解决方案1】:

您可以为此使用aggragetesuppress。示例代码如下:

KGroupedStream<String, LocationChangeEvent> grouped = kStream.groupBy((key, value) -> key);
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
        .aggregate(null, (key, value, agg) -> Optional.ofNullable(agg).orElse(value))
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream()
        .map((windowedKey, value) -> new KeyValue<>(windowedKey.key(), value));

关于抑制的更多细节你可以找到here

【讨论】:

  • 如果我需要与特定时间范围内的键对应的最新事件,即上面的“id1”示例,我需要“event3”。我该如何实施?请帮忙
  • 我想在 10 分钟窗口内获取与 key 对应的最新时间戳的记录。假设事件是 {id="1",event1}, {id="2",event2}, {id="3",event8}, {id="1",event3}, {id="2", event4} ,那么它应该给我 => {id="1",event3}, {id="2",event4}, {id="3",event8},
  • 如果你可以依赖排序,你可以做reduce((agg,value) -&gt; value)——如果你想考虑记录时间戳,你可以在groupByKey之前通过transformValues()访问时间戳(你应该使用groupByKey 而不是无操作groupBy 以避免昂贵且不必要的重新分区)并执行reduce((agg,value) -&gt; value.timestamp &gt;= agg.timestamp ? value : agg)。聚合后,您可以使用mapValues 步骤再次删除时间戳。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-03-05
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多