【发布时间】:2018-07-16 23:22:29
【问题描述】:
我有一个关于管理多个 CG 的问题,创建了三个消费者组,每个 CG 都有自己的 kafka 服务、组 ID 和主题。
现在我正在按预期接收消息,但是,我想知道是否可以创建下一个场景:
创建三个消费者组,但只接收来自一个的消息,暂时让其他人暂停/保持,如果他的kafka服务将下降,消费来自下一个消费者组的消息,与第三个相同。
这是我的代码示例:
function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){ //3
const options = {
groupId: config.kafka_service[i]['groupId'],
host: config.kafka_service[i]['zookeeperHost'],
kafkaHost: config.kafka_service[i]['kafkaHost'],
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
}
//assign all services CG names and create [i] consumer groups!
let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];
customConsumerGroupName = new ConsumerGroup(options, topics);
customConsumerGroupName.on('connect', (resp) => {
console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
});
if(i > 0){
//pause consumers exept FIRST
customConsumerGroupName.pause();
}
customConsumerGroupName.on('message', (message) => {
console.log(message);
});
customConsumerGroupName.on('error', (error) => {
console.log('consumer group error: ', error);
//HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
//MAYBE consumerGroup.resume(); ???
});
}
}
希望它的声音可以理解,谢谢 :)
【问题讨论】:
-
看起来您正在尝试使用单独的消费者组作为故障保险,但我相信单个消费者组应该做你想做的事。每个组应该有多个消费者,如果一个消费者失败,组中的另一个消费者将从中断的地方继续。除非我误解了你的问题。
-
你做到了,你和@Moonwalkr 都在谈论单个 CG 来处理这种情况,但我没有看到任何示例或教程在单个 CG 中设置不同的数据中心。请问有什么线索吗?
-
要将不同的消费者设置为相同的 CG(跨数据中心),您可以使用 groupId 配置选项。您已将其设置为 'config.kafka_service[i]['groupId']'。无论您在哪里创建消费者,您只需确保它具有相同的 groupId。我不熟悉 Node 的 kafka 包,但我想说你应该坚持创建常规消费者,并为每个消费者分配相同的 groupid。这样,无论托管在哪个服务器上,您都可以让不同的消费者成为同一组的一部分。
-
我刚刚阅读了kafka node api,而ConsumerGroup的名字有点不幸。它是一个 kafka 消费者,而 groupId 选项是控制“实际”kafka 消费者组的东西。因此,只要每个“新 ConsumerGroup”的 groupId 选项相同,您的代码就应该可以正常工作。这样,所有的 ConsumerGroup 都将成为实际 Kafka Consumer 组的一部分,从同一个主题中读取,并确保消息不会发送到崩溃的消费者(Node 用语中的 ConsumerGroup 实例)。
标签: node.js apache-kafka