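[Posted]: 2020-04-24 09:57:46
[Question]:
I want to left-join two topic streams and then run a windowed aggregation on the joined stream. However, the aggregation counts some messages twice, because the join emits some messages twice, depending on how late the matching record arrives on the right-hand topic. Here is the code for the POC.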
StreamsBuilder builder = new StreamsBuilder();

// Source streams, each with its own event-time extractor.
KStream<Long, BidMessage> bidStream = builder.stream("bid",
        Consumed.with(new LongSerde(), new BidMessageSerde())
                .withTimestampExtractor(new BidMessageTimestampExtractor()));
KStream<Long, ClickMessage> clickStream = builder.stream("click",
        Consumed.with(new LongSerde(), new ClickMessageSerde())
                .withTimestampExtractor(new ClickMessageTimestampExtractor()));

// Re-key both streams by request id so they can be joined.
KStream<String, BidMessage> newBidStream = bidStream.selectKey((key, value) -> value.getRequestId());
KStream<String, ClickMessage> newClickStream = clickStream.selectKey((key, value) -> value.getRequestId());

// Left join: bids on the left, clicks on the right.
KStream<String, BidMergedMessage> result = newBidStream.leftJoin(newClickStream,
        getValueJoiner(),
        JoinWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(0)),
        Joined.with(Serdes.String(), new BidMessageSerde(), new ClickMessageSerde()));

// Count bids and clicks per client in 30-second windows.
result.groupBy((key, value) -> "" + value.getClientId(), Grouped.with(Serdes.String(), newBidMergedSerde()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(40)))
        .aggregate(() -> new AggResult(0, 0), (key, value, aggregate) -> {
            if (value.getClickId() != null) {
                aggregate.clicks_++;
            }
            // Every join output counts as a bid, so a bid emitted twice is counted twice.
            aggregate.bids_++;
            return aggregate;
        }, Materialized.with(Serdes.String(), new AggResultJsonSerde()))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((key, value) -> {
            logger.info("{}-{}, clientId : {}, Value: {}",
                    new Date(key.window().start()), new Date(key.window().end()), key.key(), value);
        });

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Is there a way to fix this so that the join does not cause duplicates in the aggregation?
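To make the double counting concrete, here is a minimal plain-Java sketch that simulates the join output described above without using Kafka at all. It assumes that a bid whose click arrives late shows up twice in the joined stream, first with a null click and then with the click filled in. The class `LeftJoinDuplicateSketch`, the `Joined` record type, and both counting methods are hypothetical names introduced only for illustration; they are not part of the Kafka Streams API or of the original POC. The second method shows one possible idea, keeping a single entry per request id and preferring the one that carries a click, and it further assumes at most one click per bid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LeftJoinDuplicateSketch {

    /** Hypothetical stand-in for one output record of the left join. */
    static final class Joined {
        final String requestId;
        final String clickId; // null when no click had arrived at join time

        Joined(String requestId, String clickId) {
            this.requestId = requestId;
            this.clickId = clickId;
        }
    }

    /** Mirrors the aggregator in the question: every join output counts as one bid. */
    static int[] countNaive(List<Joined> records) {
        int bids = 0;
        int clicks = 0;
        for (Joined r : records) {
            if (r.clickId != null) {
                clicks++;
            }
            bids++;
        }
        return new int[] {bids, clicks};
    }

    /** Keeps one record per requestId, preferring the one that carries a click. */
    static int[] countDeduplicated(List<Joined> records) {
        Map<String, Joined> byRequest = new HashMap<>();
        for (Joined r : records) {
            Joined seen = byRequest.get(r.requestId);
            // Replace only if nothing is stored yet or the stored record has no click.
            if (seen == null || seen.clickId == null) {
                byRequest.put(r.requestId, r);
            }
        }
        int clicks = 0;
        for (Joined r : byRequest.values()) {
            if (r.clickId != null) {
                clicks++;
            }
        }
        return new int[] {byRequest.size(), clicks};
    }

    /** Simulated join output: r1 is emitted twice (late click), r2 never gets a click. */
    static List<Joined> sampleJoinOutput() {
        List<Joined> out = new ArrayList<>();
        out.add(new Joined("r1", null)); // emitted before the click arrived
        out.add(new Joined("r1", "c1")); // emitted again once the click arrived
        out.add(new Joined("r2", null)); // bid without any click
        return out;
    }

    public static void main(String[] args) {
        int[] naive = countNaive(sampleJoinOutput());
        int[] dedup = countDeduplicated(sampleJoinOutput());
        System.out.println("naive: bids=" + naive[0] + " clicks=" + naive[1]); // naive: bids=3 clicks=1
        System.out.println("dedup: bids=" + dedup[0] + " clicks=" + dedup[1]); // dedup: bids=2 clicks=1
    }
}
```

In this simulation the naive count reports three bids for two actual requests, which matches the symptom in the question. Carrying the same idea into a real topology would require state keyed by request id inside the streams application; this sketch does not show how to do that.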
[Discussion]:
Tags: apache-kafka apache-kafka-streams