【Posted】:2022-01-28 01:17:17
【Question】:
This code creates two separate KStream instances, both reading from the same topic:
final KStream<String, String> inputStream1 =
    builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> inputStream2 =
    builder.stream(INPUT_TOPIC, consumed);

final KStream<String, String> mappedStream1 = inputStream1
    .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream2
    .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toUpperCase);

mappedStream1.to(OUTPUT_TOPIC_1, produced);
mappedStream2.to(OUTPUT_TOPIC_2, produced);
The topology looks like this. There is only one source definition, and it is used twice:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004
    Processor: KSTREAM-PEEK-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-MAPVALUES-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000002
    Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-PEEK-0000000004
    Sink: KSTREAM-SINK-0000000006 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000007 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000005
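Now my question: is it always safe to assume that StreamsBuilder creates only one source (that is, only one consumer for the same topic)?
In other words: given a topic with multiple partitions, is it always guaranteed that inputStream1 and inputStream2 see the same records?
Or would it be better to rewrite the code like this, to make the single source explicit: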
final KStream<String, String> inputStream =
    builder.stream(INPUT_TOPIC, consumed);

final KStream<String, String> mappedStream1 = inputStream
    .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream
    .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toUpperCase);
Update
The second version results in this topology:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000001, KSTREAM-PEEK-0000000003
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000003 (stores: [])
      --> KSTREAM-MAPVALUES-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000002
    Sink: KSTREAM-SINK-0000000006 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000004
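To picture what a single source with two children means for the records each branch sees, here is a toy model in plain Java. It does not use Kafka Streams at all: SourceNode, addChild, process and run are hypothetical names made up for this illustration, and the model only assumes that a source node hands every record it receives to each of its child processors, which is how the topology description above reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.BiConsumer;

public class FanOutSketch {

    /** Hypothetical stand-in for a source node (not a Kafka Streams class). */
    static final class SourceNode {
        private final List<BiConsumer<String, String>> children = new ArrayList<>();

        /** Registers a downstream branch, like the two PEEK processors above. */
        void addChild(BiConsumer<String, String> child) {
            children.add(child);
        }

        /** Forwards one record to every registered child, in registration order. */
        void process(String key, String value) {
            for (BiConsumer<String, String> child : children) {
                child.accept(key, value);
            }
        }
    }

    /** Feeds the records through one source with two branches; returns both outputs. */
    static List<List<String>> run(List<String[]> records) {
        List<String> output1 = new ArrayList<>(); // stands in for output-1
        List<String> output2 = new ArrayList<>(); // stands in for output-2

        SourceNode source = new SourceNode();
        source.addChild((k, v) -> output1.add(k + "=" + v.toLowerCase(Locale.ROOT)));
        source.addChild((k, v) -> output2.add(k + "=" + v.toUpperCase(Locale.ROOT)));

        for (String[] record : records) {
            source.process(record[0], record[1]);
        }
        return List.of(output1, output2);
    }

    public static void main(String[] args) {
        List<List<String>> out = run(List.of(
                new String[] {"a", "Foo"},
                new String[] {"b", "Bar"}));
        System.out.println("output-1: " + out.get(0));
        System.out.println("output-2: " + out.get(1));
    }
}
```

In this model both branches receive every record exactly once and in the same order, because there is only one place where records enter. Whether the real builder always produces a single source node for repeated builder.stream() calls on the same topic is exactly the open question here, so the sketch illustrates the shape of the topology rather than answering it.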
【Comments】:
-
Hm, is the topology the same?
-
@OneCricketeer I added the second topology. The numbering differs, but structurally the two appear to be identical.
-
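Where do you define builder? Is the application.id the same for both configurations? If so, then yes: only one consumer instance per input partition can run for either stream, because of the limitations of the underlying consumer API.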
@OneCricketeer builder and application.id are the same. You could post your comment as an answer, possibly including a reference that your statement is based on. Thanks!
Tags: java apache-kafka apache-kafka-streams