【发布时间】:2020-09-30 20:18:26
【问题描述】:
Properties streamsConfiguration = this.buildKafkaProperties();
LOGGER.info("kafka properties for streaming is ::{}", streamsConfiguration);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, LocationChangeEvent> kStream = builder.stream(this.kafkaConfigProperties.getTopicName(), Consumed.with(Serdes.String(), locationChangeEventSerde));
KGroupedStream<String, LocationChangeEvent> grouped = kStream.groupBy((key, value) -> key);
grouped.windowedBy(TimeWindows.of(Long.parseLong(String.valueOf(Duration.ofMinutes(2)))));
说明:我想从 kafka 流中删除重复的密钥。
我有KafkaStreams<String,LocationChangeEvent> kstreams...
示例 - 假设我在 kafkaStreams 中获得了这些事件
{id="1",event1},
{id="2",event2},
{id="3",event3},
{id="1",event3},
{id="2",event3}
现在,我想对它们进行分组,以便在给定的时间范围内不存在重复的键(id)。
输出kafkaStream:
{id="1",event1},
{id="2",event2},
{id="3",event3}
从kafkaStream 中删除重复键。
尝试使用Kstreams.groupByKey(),但它不适用于我的情况。
我不想计算唯一键。我希望我的 Kstream 只包含唯一键和相应的事件。
【问题讨论】:
-
您能否详细说明一下使用 lil 位代码以便我可以继续?我不知道如何继续。
标签: apache-kafka group-by apache-kafka-streams spring-kafka