【问题标题】:Why my Kafka consumers with same group id are not being balanced?为什么我的具有相同组 id 的 Kafka 消费者不平衡?
【发布时间】:2016-05-10 20:14:10
【问题描述】:

我正在编写一个概念验证应用程序来使用来自 Apache Kafka 0.9.0.0 的消息,看看我是否可以使用它来代替常见的 JMS 消息代理,因为 Kafka 提供了一些好处。这是我的基本代码,使用新的消费者 API:

public class Main implements Runnable {

    public static final long DEFAULT_POLL_TIME = 300;
    public static final String DEFAULT_GROUP_ID = "ltmjTest";

    volatile boolean keepRunning = true;
    private KafkaConsumer<String, Object> consumer;
    private String servers;
    private String groupId = DEFAULT_GROUP_ID;
    private long pollTime = DEFAULT_POLL_TIME;
    private String[] topics;

    public Main() {
    }

    //getters and setters...

    public void createConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configs.put("enable.auto.commit", "true");
        configs.put("auto.commit.interval.ms", "1000");
        configs.put("session.timeout.ms", "30000");

        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(asList(topics));
    }

    public static void main(String[] args) {
        Main main = new Main();
        if (args != null && args.length > 0) {
            for (String arg : args) {
                String[] realArg = arg.trim().split("=", 2);
                String argKey = realArg[0].toLowerCase();
                String argValue = realArg[1];
                switch (argKey) {
                case "polltime":
                    main.setPollTime(Long.parseLong(argValue));
                    break;
                case "groupid":
                    main.setGroupId(argValue);
                    break;
                case "servers":
                    main.setServers(argValue);
                    break;
                case "topics":
                    main.setTopics(argValue.split(","));
                    break;
            }
        }
        main.createConsumer();
        new Thread(main).start();
        try (Scanner scanner = new Scanner(System.in)) {
            while(true) {
                String line = scanner.nextLine();
                if (line.equals("stop")) {
                    main.setKeepRunning(false);
                    break;
                }
            }
        }
    }
}

我已经使用默认设置启动了一个 kafka 服务器,并使用 shell 工具 kafka-console-producer.sh 启动了一个 kafka 生产者向我的主题写入消息。然后我使用此代码与两个消费者连接,发送正确的服务器进行连接和订阅主题,其他所有内容都使用默认值,这意味着两个消费者具有相同的组 ID。我注意到只有我的一个消费者使用了所有数据。我读过默认行为应该是消费者必须由服务器平衡,来自official tutorial

如果所有消费者实例都具有相同的消费者组,那么这就像传统的队列平衡消费者负载一样。

如何修复消费者的行为,使其表现得像默认值?还是我遗漏了什么?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    有一个特性 kafka.consumer.PartitionAssignor 说明应该如何为每个消费者分配分区。它有两个实现:RoundRobinAssignor 和 RangeAssignor。默认值为 RangeAssignor。

    可以通过设置参数“partition.assignment.strategy”来改变。

    Round Robin 文档:

    循环分配器布置所有可用分区和所有可用消费者。然后它继续执行从分区到消费者的循环分配。如果所有消费者实例的订阅相同,则分区将均匀分布。 (即,所有消费者的分区所有权计数将在恰好为 1 的增量内。)例如,假设有两个消费者 C0 和 C1,两个主题 t0 ​​和 t1,每个主题有 3 个分区,导致分区 t0p0, t0p1、t0p2、t1p0、t1p1 和 t1p2。分配将是: C0: [t0p0, t0p2, t1p1] C1: [t0p1, t1p0, t1p2]

    范围分配器文档

    范围分配器在每个主题的基础上工作。对于每个主题,我们按数字顺序排列可用分区,并按字典顺序排列消费者。然后,我们将分区数除以消费者总数,以确定分配给每个消费者的分区数。如果不均匀划分,那么前几个消费者将有一个额外的分区。例如,假设有两个消费者 C0 和 C1,两个主题 t0 ​​和 t1,每个主题有 3 个分区,产生分区 t0p0、t0p1、t0p2、t1p0、t1p1 和 t1p2。分配将是: C0: [t0p0, t0p1, t1p0, t1p1] C1: [t0p2, t1p2]

    所以,如果我们所有的主题都只有一个分区,那么只有一个消费者可以工作

    【讨论】:

      猜你喜欢
      • 2017-06-19
      • 2020-06-30
      • 1970-01-01
      • 1970-01-01
      • 2019-03-19
      • 1970-01-01
      • 2021-12-04
      • 2020-08-04
      • 1970-01-01
      相关资源
      最近更新 更多