【问题标题】:(Kafka) Streams - foreach inside foreach(Kafka) Streams - foreach 里面的 foreach
【发布时间】:2018-03-22 09:01:39
【问题描述】:

我有一段代码遍历 KStream 并检查是否满足条件。在这种情况下,它会调用另一个方法来做一些其他的处理。代码如下所示:

stream1.foreach((k, v) -> {
        if (someCondition) {
            System.out.println("Triggered Join");
            joinStreams();
        }
    }
});

现在,joinStreams() 方法的主体如下所示(仅用于测试目的)。

private static void joinStreams() {
    System.out.println("Started Join");
    stream2.foreach((k, v) -> System.out.println("OK"));
}

joinStreams() 被调用时,它只打印“Started Join”并永远挂起。当我从main() 直接调用它时,它会打印“Started Join”,然后打印与流中的消息一样多的“OK”(这是它的正常行为)。 我的问题是:什么可能导致这种奇怪的结果? P.S:据我所知,问题出在 foreach (来自joinStreams())(来自stream1)中。

【问题讨论】:

    标签: stream apache-kafka-streams


    【解决方案1】:

    你不能那样做。

    通过调用 stream1.foreach 之类的东西,您正在构建流拓扑。构建整个拓扑后,您可以使用 new KafkaStreams(topology, streamingConfig).start() 之类的内容开始它。

    您必须了解foreach 的主体只会在您的流处理拓扑正在执行(正在运行)时被调用。因此,在您的代码中,对 stream2.foreach 的调用(用于构建拓扑)发生在构建并启动拓扑之后,这没有任何意义。

    【讨论】:

    • 是的,解决了它。我错过了有关拓扑的这些详细信息。谢谢!
    猜你喜欢
    • 1970-01-01
    • 2021-07-30
    • 1970-01-01
    • 2021-05-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多