【问题标题】:KStream join Retry / Delayed KStreamKStream 加入 Retry / 延迟 KStream
【发布时间】:2019-09-26 06:55:18
【问题描述】:

我们正在尝试实现下面描述的用例,我们遇到了我们希望克服的实现问题,

用例,

我们正在尝试通过匹配两个流的消息(JSON)中存在的 KEY 来在 2 个 Kafka 主题之间进行 KStream 连接。 我们还应该维护消息序列,因为它是从源到达 KStream 的。

场景是, 如果 Matching Key 尚未到达任何一个流中,我们应该停止或重试加入,直到预期的 key 到达其他主题。 我们曾考虑将不匹配的记录放回 KStream,但在这种情况下无法保证顺序。

问题 1: 如何停止或保持连接,直到预期的键到达其他主题。 例如,KTable 有 Key 100,但是 KStream 还没有收到 Key 100 那么我们应该重试 Join 或保持 KStream 直到 Key 100 到达。

问题 2: 有没有办法在KStream(Delayed KStream)中放置延迟或间隔来接收延迟时间或间隔的消息。

此外,我们必须从非键控主题构建键控 KStream(将通过从消息 - JSON 中提取来设置键)

Java 更可取,因为我们在 KTable 和 KStream 之间进行了 POC 连接

KTable<String, String> leftStream = builder.table("stream1");
KStream<String, String> rightStream = builder.stream("stream2");
KStream<String, String> outstream = rightStream.leftJoin(leftStream, (orig_msg, description) -> {
         String new_msg = "";
            if (description != null) {
                  new_msg = orig_msg+"-->Matched--"+description;
            }else {
                new_msg = orig_msg+"-->UnMatched<--"+description;
            }
                return new_msg;
     });

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    在您的示例中,您正在执行 KStream-to-KTable LEFT join。 Kafka Streams join semantics 指定 (a) 只有到达 KStream 的数据才会触发连接输出,并且 (b) 如果当新的 KStream 事件到达时,KTable 中没有匹配的数据(连接的右侧),则仍将是立即生成的连接输出,但表端数据使用null(即,不会等待数据到达 KTable 端)。

    问题1:如何停止或保持连接,直到预期的键到达其他主题。例如,KTable 有 Key 100,但是 KStream 还没有收到 Key 100 那么我们应该重试 Join 或保持 KStream 直到 Key 100 到达。

    首先,您不能使用内置的 Kafka Streams 功能停止或保持连接。

    其次,您提供的具体示例在实践中不会发生,因为(见上文)到达 KTable 的事件不会产生连接输出。只有当事件到达 KStream 时,才会 (a) 查找 KTable 并 (b) 生成连接输出,而不管 (a) 的结果如何。

    但是在 KStream-KTable LEFT join 中可能会发生相反的情况:KStream 有 key 100,但 KTable 还没有收到 key 100。如何处理这个问题?见下文。

    问题2:有什么办法可以在KStream(Delayed KStream)中加入Delay或者Interval来接收延迟时间或者间隔的消息。

    是的,有办法做到这一点。但不适用于 Kafka Streams DSL 中现有的连接操作。

    相反,您可以使用 Kafka Streams 的处理器 API 通过几行代码来实现您需要的连接语义,然后将此功能插入 DSL 以便于重用。

    有一个示例应用程序可以证明这一点,巧合的是,与您上面的用例类似:请参阅CustomStreamTableJoinhttps://github.com/confluentinc/kafka-streams-examplesdirect link to CustomStreamTableJoin example for Confluent v5.2.1 / Apache Kafka 2.2)。

    【讨论】:

      【解决方案2】:

      谢谢 Micheal,当我尝试上面的 Stream Example 时,我遇到了以下问题并且 KStream 立即关闭......(我用单个分区创建的所有主题).. 另外,我必须深入研究 KStream 内部结构 :-)..

      stream-client [custom-join-integration-test-4af19e3b-8773-4e75-814e-56ea37839a59] 从 REBALANCING 到 PENDING_SHUTDOWN 的状态转换 (org.apache.kafka.streams.KafkaStreams)

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2018-09-18
        • 2020-04-21
        • 2020-05-02
        • 2022-10-24
        • 2018-02-23
        • 2019-09-28
        • 2018-08-30
        相关资源
        最近更新 更多