【问题标题】:Does the Kafka Streams StreamBuilder always detect "duplicate" input topics?Kafka Streams StreamBuilder 是否总是检测“重复”输入主题?
【发布时间】:2022-01-28 01:17:17
【问题描述】:

这段代码创建了两个KStream实例分别,它们都从相同的主题中读取:

    final KStream<String, String> inputStream1 =
      builder.stream(INPUT_TOPIC, consumed);
    final KStream<String, String> inputStream2 =
      builder.stream(INPUT_TOPIC, consumed);

    final KStream<String, String> mappedStream1 = inputStream1
            .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toLowerCase);

    final KStream<String, String> mappedStream2 = inputStream2
            .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toUpperCase);

    mappedStream1.to(OUTPUT_TOPIC_1, produced);
    mappedStream2.to(OUTPUT_TOPIC_2, produced);

拓扑如下所示:只有一个 source 定义,然后使用 两次

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004
    Processor: KSTREAM-PEEK-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-MAPVALUES-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000002
    Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-PEEK-0000000004
    Sink: KSTREAM-SINK-0000000006 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000007 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000005

现在我的问题是:假设StreamBuilder 只创建一个源(= 同一主题只创建一个消费者)总是安全的吗?

换句话说:是否总是保证 - 给定具有多个分区的主题 - inputStream1inputStream2 看到相同的记录?

或者最好将其重写为这样的东西,使其明确:

    final KStream<String, String> inputStream =
      builder.stream(INPUT_TOPIC, consumed);

    final KStream<String, String> mappedStream1 = inputStream
            .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toLowerCase);

    final KStream<String, String> mappedStream2 = inputStream
            .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toUpperCase);

更新

第二个版本的结果是这样的拓扑:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000001, KSTREAM-PEEK-0000000003
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000003 (stores: [])
      --> KSTREAM-MAPVALUES-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000002
    Sink: KSTREAM-SINK-0000000006 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000004

【问题讨论】:

  • 嗯,拓扑是一样的吗?
  • @OneCricketeer 我添加了第二个拓扑。编号不同,但在结构上似乎是相等的。
  • 你在哪里定义builder?两个配置的application.id 是否相同?如果是这样,那么是的,每个输入分区只有一个消费者实例可以为任一流运行,根据底层消费者 api 的限制
  • @OneCricketeer builderapplication.id 相同。您可以发布您的评论作为答案,可能包括您的陈述所依据的参考。谢谢!

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

builder 将是相同的,相同的 application.id

不能代表拓扑,但考虑到消费者 API 级别的流程,group.id 是基于 application.id 构建的,因此您的消费者组对于两个流来说是相同的。

对于一个输入主题,只有一个消费者实例(介于两者之间)能够从该输入主题消费。

这可以解释为什么只有一个来源;因此,您不需要使用相同参数进行额外的builder.stream() 调用。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-03-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-07-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多