【问题标题】:KAFKA-NODE: manage multiple consumer groupsKAFKA-NODE:管理多个消费者组
【发布时间】:2018-07-16 23:22:29
【问题描述】:

我有一个关于管理多个 CG 的问题,创建了三个消费者组,每个 CG 都有自己的 kafka 服务、组 ID 和主题。

现在我正在按预期接收消息,但是,我想知道是否可以创建下一个场景:

创建三个消费者组,但只接收来自一个的消息,暂时让其他人暂停/保持,如果他的kafka服务将下降,消费来自下一个消费者组的消息,与第三个相同。

这是我的代码示例:

function createConsumerGroup(topics){

    const ConsumerGroup = kafka.ConsumerGroup;

    //CREATE CONSUMER GROUPS FOR EVERY SERVICE
    for(let i = 0; i < config.kafka_service.length ;i++){  //3

        const options = {
            groupId: config.kafka_service[i]['groupId'],
            host: config.kafka_service[i]['zookeeperHost'],
            kafkaHost: config.kafka_service[i]['kafkaHost'],
            sessionTimeout: 15000,
            protocol: ['roundrobin'],
            fromOffset: 'latest'
        }

        //assign all services CG names and create [i] consumer groups!
        let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];

        customConsumerGroupName = new ConsumerGroup(options, topics);

        customConsumerGroupName.on('connect', (resp) => {
            console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
        });

        if(i > 0){
            //pause consumers exept FIRST
            customConsumerGroupName.pause();
        }


        customConsumerGroupName.on('message', (message) => {
           console.log(message);
        });

        customConsumerGroupName.on('error', (error) => {
            console.log('consumer group error: ', error);

           //HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
           //MAYBE consumerGroup.resume(); ???
        });

    }
}

希望它的声音可以理解,谢谢 :)

【问题讨论】:

  • 看起来您正在尝试使用单独的消费者组作为故障保险,但我相信单个消费者组应该做你想做的事。每个组应该有多个消费者,如果一个消费者失败,组中的另一个消费者将从中断的地方继续。除非我误解了你的问题。
  • 你做到了,你和@Moonwalkr 都在谈论单个 CG 来处理这种情况,但我没有看到任何示例或教程在单个 CG 中设置不同的数据中心。请问有什么线索吗?
  • 要将不同的消费者设置为相同的 CG(跨数据中心),您可以使用 groupId 配置选项。您已将其设置为 'config.kafka_service[i]['groupId']'。无论您在哪里创建消费者,您只需确保它具有相同的 groupId。我不熟悉 Node 的 kafka 包,但我想说你应该坚持创建常规消费者,并为每个消费者分配相同的 groupid。这样,无论托管在哪个服务器上,您都可以让不同的消费者成为同一组的一部分。
  • 我刚刚阅读了kafka node api,而ConsumerGroup的名字有点不幸。它是一个 kafka 消费者,而 groupId 选项是控制“实际”kafka 消费者组的东西。因此,只要每个“新 ConsumerGroup”的 groupId 选项相同,您的代码就应该可以正常工作。这样,所有的 ConsumerGroup 都将成为实际 Kafka Consumer 组的一部分,从同一个主题中读取,并确保消息不会发送到崩溃的消费者(Node 用语中的 ConsumerGroup 实例)。

标签: node.js apache-kafka


【解决方案1】:

所以看起来混淆是因为 Node 包的“ConsumerGroup”的名称。在 Kafka 术语中,消费者组仅由每个消费者使用的 groupId 控制。具有相同 groupId 的消费者不会收到重复消息,每个主题消息仅由单个消费者读取。如果消费者出现故障,kafka 会检测到这一点并将其分区提供给单独的消费者。

节点“ConsumerGroup”实际上只是另一个 Kafka 消费者(新的消费者组由 Kafka 管理,而不是 Kafka >0.9 的 zookeeper)。

因此,通过 Node ConsumerGroup 来利用 kafka 消费者组的方法如下:

function createConsumerGroup(topics){

const ConsumerGroup = kafka.ConsumerGroup;

//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){  //3

    const options = {
        groupId: 'SOME_GROUP_NAME',
        host: config.kafka_service[i]['zookeeperHost'],
        kafkaHost: config.kafka_service[i]['kafkaHost'],
        sessionTimeout: 15000,
        protocol: ['roundrobin'],
        fromOffset: 'latest'
    }

    //assign all services CG names and create [i] consumer groups!
    let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];

    customConsumerGroupName = new ConsumerGroup(options, topics);

    customConsumerGroupName.on('connect', (resp) => {
        console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
    });

    customConsumerGroupName.on('message', (message) => {
       console.log(message);
    });

    customConsumerGroupName.on('error', (error) => {
        console.log('consumer group error: ', error);

       //Error handling logic here, restart the consumer that failed perhaps? 
       //Depends on how you want to managed failed consumers.
    });
  }
}

Nodes ConsumerGroup 的每个实例都将是组“SOME_GROUP_NAME”的成员,并且使用相同 groupId 创建的任何其他消费者也将充当同一个 kafka 消费者组的成员,无论服务器如何等等。

【讨论】:

  • 非常感谢,好点,我做了一些错误处理,主要是关于代理失败和超时,正如你提到的。下一个消费者会失败一个地方。干杯!
  • 一个分区上可以有 3 个消费者组成一个组,以确保如果消费者失败,消息被处理,但没有 2 个消费者收到相同的消息?
【解决方案2】:

消费者群体解决了两个核心场景:

1.缩放 您可以增加组中的消费者数量,以处理该组正在消费(向外扩展)的主题中不断增加的消息生成率

2。故障转移 通过让一组消费者阅读相同的主题,他们将自动处理一个或多个消费者宕机的情况。

因此,您不必拥有“备用”消费者组,您必须自己处理哪些是活跃的,您只需依靠 Kafka 的内置故障转移。消费者可以在多个不同的容器中运行(甚至在不同的数据中心),并且 Kafka 将自动确保将消息传递给各个消费者,无论他们在哪里或在任何给定时间有多少人在运行。

【讨论】:

  • 如果您运行 3 个与上述节点文件相同的实例(具有 consumerGroup),每个实例都会收到相同的数据,实际上会创建副本。假设如果属于同一个 groupId ,只有一个消费者能够处理数据。
  • 你说一个主题可以有多个消费者在一个组中,但这是否适用于单个分区?听起来好像在单个分区上,kafka 不会仅将消息分发给一个组,或者更确切地说,一个组中的任何超过 2 个消费者将在一个分区上处于空闲状态。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-06
  • 2019-04-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多