【发布时间】:2019-05-23 12:14:45
【问题描述】:
我在 kafka 中有一个主题为 internal。我使用以下命令创建了主题
/opt/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper:2181
--replication-factor 3 -partitions 6
--topic internal
我需要消耗三个不同节点服务器中的所有消息。所以我使用kafka-node 模块作为具有不同消费者名称的消费者组。我创建了一个名为 group1、group2、group3 的消费者组名称。
一切正常,我可以消费所有消费者的所有消息。
但是当任何代理关闭时,消费者不会收到任何消息。当我列出所有消费者组时,它没有显示特定的组 ID。
(例如)如果nodeserver 1 关闭,则代理中没有可用的组group1
即使我重新启动节点服务器,它也不会在代理中创建任何组,也不会使用相应节点服务器中的任何消息。但是当broker起来了,重启了node server,它在broker里面创建了一个group,node server就可以接收到消息了。
consumer.js
const options = {
kafkaHost: process.env.KAFKA_HOST,
groupId: group_id, //group1 (or) group2 (or) group3
autoCommit: true,
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest',
outOfRangeOffset: 'earliest',
migrateHLC: false,
migrateRolling: true,
fetchMaxBytes: 1024 * 1024 * 10,
commitOffsetsOnFirstJoin: true,
onRebalance: (isAlreadyMember, callback) => {
log.info({"ALREADY_MEMBER_isAlreadyMember": isAlreadyMember});
callback();
}
};
const consumerGroup = new ConsumerGroup(options, process.env.KAFKA_TOPIC);
// On receiving message
consumerGroup.on("message", handMessage); //handMessage is where the message has been handled
// On error receiving message
consumerGroup.on('error', function(err) {
log.debug({"type": "KAFKA_CONSUMER_ERROR", "msg": err});
});
// On error receiving message
consumerGroup.on('offsetOutOfRange', function(err) {
log.debug({"type": "KAFKA_CONSUMER_RANGE_ERROR", "msg": err});
});
更新 - 1
即使我将offsets.topic.replication.factor 更新为2 或3,我也会遇到同样的问题。当我的任何代理关闭时,相应的节点服务器不会使用该消息。而且当我在代理中显示组列表时,它只显示group2 和group3。但是当broker1 关闭时,group1 不存在。即使我重新启动节点使用者,group1 也不会被创建。
server.properties
broker.id=1
listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-d3f14c9ddf0a
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=16000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
advertised.port=9094
port=9092
auto.create.topics.enable=false
更新 - 2
当代理关闭时,组协调器将被删除,并且不会自动重新选举。
你们能告诉我我做错了什么吗?或者还有什么我需要更新的吗?
【问题讨论】:
-
请出示您的 Kafka 服务器属性文件...消费者偏移主题的复制因子是多少?
-
更新了 server.properties 文件。请检查。
-
同时显示
__consumer_offsets主题的描述 -
__consumer_offsets主题的复制因子仅为 1 -
完全正确 - 正如我在回答中提到的,您需要手动增加它以使您的集群具有高可用性
标签: node.js apache-kafka kafka-consumer-api