【发布时间】:2018-07-04 04:40:51
【问题描述】:
假设我有两种类型的日志,它们有一个共同的字段'uid',如果这两个包含uid的日志的日志都到了,我想输出日志,就像一个连接一样,卡夫卡是否可以?
【问题讨论】:
-
我更新了我的答案,加入了关于确定性的说明。
标签: apache-kafka apache-kafka-streams
假设我有两种类型的日志,它们有一个共同的字段'uid',如果这两个包含uid的日志的日志都到了,我想输出日志,就像一个连接一样,卡夫卡是否可以?
【问题讨论】:
标签: apache-kafka apache-kafka-streams
是的,当然。查看 Kafka Streams,特别是 DSL API。它是这样的:
StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], Foo> fooStream = builder.stream("foo");
KStream<byte[], Bar> barStream = builder.stream("bar");
fooStream.join(barStream,
(foo, bar) -> {
foo.baz = bar.baz;
return foo;
},
JoinWindows.of(1000))
.to("buzz");
这个简单的应用程序使用两个输入主题(“foo”和“bar”),将它们连接起来并将它们写入主题“buzz”。由于流是无限的,当加入两个流时,您需要指定一个加入窗口(1000 毫秒以上),这是各个流上的两条消息之间的相对时间差,以使它们有资格加入。
这里是文档:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html。您会发现可以执行多种不同类型的连接:
需要注意的是,尽管上面的示例将确定性地同步流——如果你重置并重新处理拓扑,每次都会得到相同的结果——但并非 Kafka Streams 中的所有连接操作都是确定性的。在 1.0.0 及之前的版本中,大约有一半是不确定的,可能取决于从底层主题分区消耗的数据的顺序。具体来说,内部KStream-KStream 和所有KTable-KTable 连接都是确定性的。其他连接,如所有KStream-KTable 连接和左/外部KStream-KStream 连接是不确定的,取决于消费者使用的数据顺序。如果您将拓扑设计为可重新处理,请记住这一点。如果您使用这些非确定性操作,当您的拓扑实时运行时,事件到达时的顺序将产生一个结果,但如果您正在重新处理您的拓扑,您可能会得到另一个结果。还要注意像KStream#merge() 这样的操作也不会产生确定性的结果。有关此问题的更多信息,请参阅Why does my Kafka Streams topology does not replay/reprocess correctly? 和此mailing list post
【讨论】: