【问题标题】:How to make multiple logs sync in kafka?如何在kafka中同步多个日志?
【发布时间】:2018-07-04 04:40:51
【问题描述】:

假设我有两种类型的日志,它们有一个共同的字段'uid',如果这两个包含uid的日志的日志都到了,我想输出日志,就像一个连接一样,卡夫卡是否可以?

【问题讨论】:

  • 我更新了我的答案,加入了关于确定性的说明。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

是的,当然。查看 Kafka Streams,特别是 DSL API。它是这样的:

 StreamsBuilder builder = new StreamsBuilder();

 KStream<byte[], Foo> fooStream = builder.stream("foo");

 KStream<byte[], Bar> barStream = builder.stream("bar");

 fooStream.join(barStream,
                (foo, bar) -> {
                    foo.baz = bar.baz;
                    return foo;
                },
                JoinWindows.of(1000))
          .to("buzz");

这个简单的应用程序使用两个输入主题(“foo”和“bar”),将它们连接起来并将它们写入主题“buzz”。由于流是无限的,当加入两个流时,您需要指定一个加入窗口(1000 毫秒以上),这是各个流上的两条消息之间的相对时间差,以使它们有资格加入。

这里有一个更完整的例子:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

这里是文档:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html。您会发现可以执行多种不同类型的连接:

需要注意的是,尽管上面的示例将确定性地同步流——如果你重置并重新处理拓扑,每次都会得到相同的结果——但并非 Kafka Streams 中的所有连接操作都是确定性的。在 1.0.0 及之前的版本中,大约有一半是不确定的,可能取决于从底层主题分区消耗的数据的顺序。具体来说,内部KStream-KStream 和所有KTable-KTable 连接都是确定性的。其他连接,如所有KStream-KTable 连接和左/外部KStream-KStream 连接是不确定的,取决于消费者使用的数据顺序。如果您将拓扑设计为可重新处理,请记住这一点。如果您使用这些非确定性操作,当您的拓扑实时运行时,事件到达时的顺序将产生一个结果,但如果您正在重新处理您的拓扑,您可能会得到另一个结果。还要注意像KStream#merge() 这样的操作也不会产生确定性的结果。有关此问题的更多信息,请参阅Why does my Kafka Streams topology does not replay/reprocess correctly? 和此mailing list post

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-03-14
    • 2018-07-29
    • 2022-08-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多