【问题标题】:Kafka Streams | Aggregation of Joined stream卡夫卡流 |加入流的聚合
【发布时间】:2020-04-24 09:57:46
【问题描述】:

我想加入两个主题流(左连接)并在加入的流上进行基于窗口的聚合。然而,聚合计算一些消息两次,因为在加入期间,一些消息根据正确主题的延迟发出两次。 以下是 POC 的代码。

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, BidMessage> bidStream = builder.stream("bid", Consumed.with(new LongSerde(), new BidMessageSerde()).withTimestampExtractor(new BidMessageTimestampExtractor()));
        KStream<Long, ClickMessage> clickStream = builder.stream("click", Consumed.with(new LongSerde(), new ClickMessageSerde()).withTimestampExtractor(new ClickMessageTimestampExtractor()));

        KStream<String, BidMessage> newBidStream = bidStream.selectKey((key, value) -> value.getRequestId());
        KStream<String, CLickMessage> newClickStream = impStream.selectKey((key, value) -> value.getRequestId());

        KStream<String, BidMergedMessage> result = newBidStream.leftJoin(newImpStream,
                getValueJoiner(),
                JoinWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(0)),
                Joined.with(Serdes.String(), new BidMessageSerde(), new ClickMessageSerde()));

        result.groupBy((key, value) -> "" + value.getClientId(), Grouped.with(Serdes.String(), newBidMergedSerde()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(40)))
                .aggregate(() -> new AggResult(0, 0), (key, value, aggregate) -> {
                    if (value.getClickId() != null) {
                        aggregate.clicks_++;
                    }
                    aggregate.bids_++;
                    return aggregate;
                }, Materialized.with(Serdes.String(),new AggResultJsonSerde()))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((key, value) -> {
                    logger.info("{}-{}, clientId : {}, Value: {}", new Date(key.window().start()), new Date(key.window().end()),key.key(), value);
                });

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

是否可以修复以避免由于加入而导致的重复?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    我不确定我是否完全遵循了您的要求,但问题似乎是您希望在没有合并的情况下 RHS 或加入是否成功。但由于 RHS 主题的速度有些慢,您偶尔会得到两个结果,先是未合并,然后在 RHS 记录到达时再合并。

    您可以在 result KStream 上添加 TransformValues 运算符并使用 statestore 来跟踪传入的记录。当您有一个成功加入的副本时,您可以查看 statestore并删除RHS为空的记录(如果存在),然后转发正确的连接结果。

    要转发从未导致成功加入的记录,您可以考虑使用punctuate() 定期检查存储并发出不匹配且在状态存储中超过您感觉的时间的记录应该已经加入。

    来自 Kafka 教程的 tutorial 也可以作为指南。

    【讨论】:

      【解决方案2】:

      我能想到两个选择。

      1. 首先要尝试将filter 添加到KStream&lt;String, BidMergedMessage&gt; result 流。我假设您可以从ValueJoiner 返回的对象中判断出 RHS 值为空。

      2. 使用内连接newBidStream.join(newImpStream...

      -比尔

      【讨论】:

      • 感谢您的回复。我需要两种出价消息,与点击合并的消息和从未合并的消息。
      猜你喜欢
      • 2022-07-12
      • 2018-03-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-13
      • 2019-09-19
      • 2017-02-08
      • 2016-08-03
      相关资源
      最近更新 更多