【问题标题】:Consumer assignment with multiple topics with Kafka Streams使用 Kafka Streams 分配多个主题的消费者
【发布时间】:2020-09-19 21:56:32
【问题描述】:

抱歉,如果这里之前已经介绍过,我找不到任何密切相关的内容。我有这个 Kafka Streams 应用程序,它从多个主题中读取数据,将记录保存在数据库中,然后将事件发布到输出主题。非常简单,就 kafka 本地商店而言,它是无国籍的。 (如下拓扑)

Topic1(T1) 有 5 个分区,Topic2(T2) 有一个分区。这里的问题是,在消费两个主题时,如果我想用 T1(5 个消费者)“全速”运行,它不能保证我将为 T1 上的每个分区都有专门的消费者。它将分布在两个主题分区中,我最终可能会遇到不平衡的消费者(和空闲消费者),如下所示:

  • [c1: t1p1, t1p3], [c2: t1p2, t1p5], [c3: t1p4, t2p1], [c4: (idle consumer)], [c5: (idle consumer)]
  • [c1: t1p1, t1p2], [c2: t1p5], [c3: t1p4, t2p1], [c4: (空闲消费者)], [c5: t1p3]

话虽如此:

  1. 从同一个 KafkaStreams 实例中的多个主题读取的拓扑是否是一种好习惯?

  2. 如果我想在 T1 上“全速”运行,有什么方法可以实现如下分区分配? [c1: t1p1, t2p1], [c2: t1p2], [c3: t1p3], [c4: t1p4], [c5: t1p5]

  3. 以下哪种拓扑最适合我想要实现的目标?还是完全不相关?

选项 A(当前拓扑)

Topologies:
   Sub-topology: 0
    Source: topic1-source (topics: [TOPIC1])
      --> topic1-processor
    Processor: topic1-processor (stores: [])
      --> topic1-sink
      <-- topic1-source
    Sink: topic1-sink (topic: OUTPUT-TOPIC)
      <-- topic1-processor

  Sub-topology: 1
    Source: topic2-source (topics: [TOPIC2])
      --> topic2-processor
    Processor: topic2-processor (stores: [])
      --> topic2-sink
      <-- topic2-source
    Sink: topic2-sink (topic: OUTPUT-TOPIC)
      <-- topic2-processor

选项 B:

Topologies:
   Sub-topology: 0
    Source: topic1-source (topics: [TOPIC1])
      --> topic1-processor
    Source: topic2-source (topics: [TOPIC2])
      --> topic2-processor
    Processor: topic1-processor (stores: [])
      --> response-sink
      <-- topic1-source
    Processor: topic2-processor (stores: [])
      --> response-sink
      <-- topic2-source
    Sink: response-sink (topic: OUTPUT-TOPIC)
      <-- topic2-processor, topic1-processor
  1. 如果我为每个主题使用两个流,而不是一个具有多个主题的流,这对我想要实现的目标有用吗?
config1.put("application.id", "app1");
KakfaStreams stream1 = new KafkaStreams(config1, topologyTopic1);
stream1.start();

config2.put("application.id", "app2");
KakfaStreams stream2 = new KafkaStreams(config2, topologyTopic2);
stream2.start();

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    您描述的初始分配永远不会发生在 Kafka Streams 中(也不会发生在任何默认的消费者配置中)。如果有 5 个分区并且您有 5 个消费者,则每个消费者将获得 1 个分区(对于具有自定义 PartitionAssignor 的普通消费者,您可以以不同的方式进行分配,但所有默认实现都将确保适当的负载平衡)。

    从同一个 KafkaStreams 实例中的多个主题读取拓扑是否是一种好习惯?

    这没有问题。

    如果我想为 T1 “全速”运行,有什么方法可以实现如下分区分配? [c1: t1p1, t2p1], [c2: t1p2], [c3: t1p3], [c4: t1p4], [c5: t1p5]

    根据您编写拓扑的方式,这将是 Kafka Streams 开箱即用的分配。对于您的两个选项,选项 B 将导致此分配。

    以下哪种拓扑最适合我想要实现的目标?还是完全不相关?

    如上所述,选项 B 将导致上述分配。对于选项 A,您实际上甚至可以使用第 6 个实例,并且每个实例将只处理一个分区(因为有两个子拓扑,您将获得 6 个任务,5 个用于 sub-topology-0,1 个用于 sub-topology-1;子拓扑相互独立地向外扩展);对于选项 A,您只能获得 5 个任务,因为只有一个子拓扑,因此两个输入主题的最大分区数(即 5)决定了任务数。

    如果我为每个主题使用两个流而不是具有多个主题的单个流,这是否适用于我想要实现的目标?

    是的,它与选项 A 基本相同——但是,您有两个消费者组,因此“两个应用程序”而不是一个。

    【讨论】:

    • 非常感谢,马蒂亚斯!一些注意事项: 1. 我做了一些测试,将 T1 分区计数扩展到 15 个,T2 扩展到 5 个。此外,我确保 T1 在其 15 个分区上有事件,而对于 T2,只有一个分区。我最终得到了 5 个活跃的消费者和 10 个空闲的消费者(容器)。我在这里错过了什么?
    • 2.正如你所提到的,两个流工作很长时间,我最终得到了我想要的任务。将尝试将拓扑更改为选项 B。再次感谢您的时间! :)
    • 不确定。对于选项 A,应该有 20 个任务(您应该能够从 log4j 输出中确认),并且这些任务应该分布在所有消费者身上。你所说的“空闲”到底是什么意思?没有分配任务/分区?
    • 我的立场是正确的:最终有 4 个活跃消费者。以下是我的结果:c1=[0_3, 1_0] / c2=[0_10, 0_13, 0_14, 0_4, 0_6] / c3=[0_1, 0_2, 0_5, 0_8] / c4=[0_0, 0_11, 0_12, 0_7, 0_9]。我看到的方式(并且基于你上面描述的 - 让我诚实),这是因为我使用选项 A 作为拓扑(如上所述)。如果我把它切换到选项 B,我最终会得到一个 1:1 的分配 + 用于 T1。
    • 所以任务似乎是正确的。子拓扑 1 的单个分区主题有 1_0,子拓扑 0 有 15 个任务(命名为 &lt;subTopology&gt;_&lt;partition&gt;)。 -- 如果你选择选项 B,将有 15 个任务,任务0_0 处理两个输入主题的分区 0。
    猜你喜欢
    • 2017-01-26
    • 1970-01-01
    • 2022-06-14
    • 2018-06-08
    • 1970-01-01
    • 2018-12-31
    • 2017-10-17
    • 1970-01-01
    • 2018-08-31
    相关资源
    最近更新 更多