【问题标题】:How to create a new consumer group in kafka如何在kafka中创建一个新的消费者组
【发布时间】:2021-01-23 15:01:26
【问题描述】:

我正在按照快速入门指南here 上的说明在本地运行 kafka,

然后我在config/consumer.properties 中定义了我的消费者组配置,以便我的消费者可以从定义的group.id 中选择消息

运行以下命令,

bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

结果,

test-consumer-group  <-- group.id defined in conf/consumer.properties
console-consumer-67807 <-- when connecting to kafka via kafka-console-consumer.sh

我能够通过基于 python 的消费者连接到 kafka,该消费者配置为使用提供 group.idtest-consumer-group

首先,我无法理解 kafka 如何/何时创建消费者群体。似乎它在某个时间点加载了conf/consumer.properties,另外它在通过kafka-console-consumer.sh 连接时隐式创建了消费者组(在我的情况下为console-consumer-67807)。

我如何明确创建自己的消费者组,比如my-created-consumer-group

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    您没有显式创建消费者组,而是构建始终属于一个消费者组的消费者。无论您使用哪种技术(Spark、Spring、Flink 等),每个 Kafka 消费者都将拥有一个消费者组。消费者组可针对每个消费者进行配置。

    它似乎在某个时间点加载了 conf/consumer.properties,另外它在通过 kafka-console-consumer.sh 连接时隐式创建了消费者组(在我的情况下为 console-consumer-67807)

    如果您不告诉您的控制台使用者实际使用该文件,则不会考虑该文件。

    有以下替代方法来提供消费者组的名称:

    使用属性文件 (--consumer.config) 控制台消费者

    这就是文件config/consumer.properties 的样子

    # consumer group id
    group.id=my-created-consumer-group
    

    这就是你如何确保控制台消费者考虑到这个group.id

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config /path/to/config/consumer.properties
    

    使用 --group 控制台消费者

    对于控制台消费者,使用前缀“console-consumer”和类似 PID 的后缀自动创建消费者组,除非您通过添加 --group 来提供自己的消费者组:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --group my-created-consumer-group
    

    标准的基于代码的消费者 API

    当使用标准 JAVA/Scala/...消费者 API 时,您可以通过属性提供消费者组:

    Properties settings = new Properties();
    settings.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer");
    // set more properties
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    consumer.subscribe(Arrays.asList("test-topic")
    

    【讨论】:

    • 如果我还没有创建my-created-consumer-group,上述 cmd 是否可以工作。换句话说,如果消费者与任何随机 group.id 连接,kafka 会接受它创建该消费者组吗?
    • Yes and yes :) 如果具有随机 group.id 的消费者第一次消费消息,Kafka 会检查该消费者组是否已经存在。如果此消费者组之前不存在,消费者将从头开始阅读主题(除非另有说明)。
    • 控制台消费者正是这样做的:创建随机 group.id 并能够读取数据,
    • 您可以仔细查看here 以了解更多关于Kafka 中Consumer Groups 的概念。因为这在使用 Kafka 时非常重要,所以我强烈建议对它有一个很好的理解!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多