【问题标题】:node kafka consumer group not receiving message when broker down代理关闭时节点kafka消费者组未收到消息
【发布时间】:2019-05-23 12:14:45
【问题描述】:

我在 kafka 中有一个主题为 internal。我使用以下命令创建了主题

/opt/kafka/bin/kafka-topics.sh 
     --create --zookeeper zookeeper:2181 
     --replication-factor 3 -partitions 6 
     --topic internal

我需要消耗三个不同节点服务器中的所有消息。所以我使用kafka-node 模块作为具有不同消费者名称的消费者组。我创建了一个名为 group1group2group3 的消费者组名称。

一切正常,我可以消费所有消费者的所有消息。

但是当任何代理关闭时,消费者不会收到任何消息。当我列出所有消费者组时,它没有显示特定的组 ID。

(例如)如果nodeserver 1 关闭,则代理中没有可用的组group1

即使我重新启动节点服务器,它也不会在代理中创建任何组,也不会使用相应节点服务器中的任何消息。但是当broker起来了,重启了node server,它在broker里面创建了一个group,node server就可以接收到消息了。

consumer.js

const options = {
  kafkaHost: process.env.KAFKA_HOST, 
  groupId: group_id, //group1 (or) group2 (or) group3
  autoCommit: true,
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
  outOfRangeOffset: 'earliest',
  migrateHLC: false,
  migrateRolling: true,
  fetchMaxBytes: 1024 * 1024 * 10,
  commitOffsetsOnFirstJoin: true,
  onRebalance: (isAlreadyMember, callback) => {
      log.info({"ALREADY_MEMBER_isAlreadyMember": isAlreadyMember});
      callback();
  }
};

const consumerGroup = new ConsumerGroup(options, process.env.KAFKA_TOPIC);

// On receiving message
consumerGroup.on("message", handMessage); //handMessage is where the message has been handled

// On error receiving message
consumerGroup.on('error', function(err) {
    log.debug({"type": "KAFKA_CONSUMER_ERROR", "msg": err});
});
// On error receiving message
consumerGroup.on('offsetOutOfRange', function(err) {
    log.debug({"type": "KAFKA_CONSUMER_RANGE_ERROR", "msg": err});
});

更新 - 1

即使我将offsets.topic.replication.factor 更新为23,我也会遇到同样的问题。当我的任何代理关闭时,相应的节点服务器不会使用该消息。而且当我在代理中显示组列表时,它只显示group2group3。但是当broker1 关闭时,group1 不存在。即使我重新启动节点使用者,group1 也不会被创建。

server.properties

broker.id=1
listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-d3f14c9ddf0a
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=16000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
advertised.port=9094
port=9092
auto.create.topics.enable=false

更新 - 2

当代理关闭时,组协调器将被删除,并且不会自动重新选举。

你们能告诉我我做错了什么吗?或者还有什么我需要更新的吗?

【问题讨论】:

  • 请出示您的 Kafka 服务器属性文件...消费者偏移主题的复制因子是多少?
  • 更新了 server.properties 文件。请检查。
  • 同时显示__consumer_offsets主题的描述
  • __consumer_offsets 主题的复制因子仅为 1
  • 完全正确 - 正如我在回答中提到的,您需要手动增加它以使您的集群具有高可用性

标签: node.js apache-kafka kafka-consumer-api


【解决方案1】:

假设这至少是 Kafka 1.x,则需要对 internal Kafka 主题的 HA 进行一些更改。考虑来自server.properties 的以下 sn-p。复制的默认值设置为 1。在您的情况下,对于 3 个代理,将这些设置为 2 可能是一个不错的起点。

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1

加法

据我了解,每个消费者组都有其组协调员。因此,如果有多个组从一个主题消费,那么该主题可以有多个协调者(不同的代理)。代理可以充当多个消费者组的group coordinator。但是对于一个消费者组来说,只有一个代理充当协调者。对于特定的消费者组,我们可以使用以下命令检查哪个代理是协调者:

./kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group <consumer-group> --state 

如果协调器失败,则选择其他代理作为协调器。故障转移策略在第 10 节here 中有详细说明。

【讨论】:

  • 即使我将 offsets.topic.replication.factor 更新为 2 或 3,我也遇到了同样的问题。当任何代理关闭时,相应的节点服务器不会使用消息
  • 能否请您创建一个具有相同参数的新主题,然后在该主题上进行尝试?
  • 是的,但没有运气
  • 代理是否每个主题只有一个消费者组?或者每个主题可以有多个消费者组?
  • 我明白你的意思。但是,当删除一个代理时,相应的消费者组不会在可用代理中列出。 The group coordinator is not getting elected as other two brokers are still available.
【解决方案2】:

即使我将offsets.topic.replication.factor 更新为 2 或 3,我也会遇到同样的问题。当我的任何代理关闭时,相应的节点服务器不会使用消息

创建偏移主题后,更改此属性不会做任何事情。

如果 设置为 1,那么您现在需要 to manually increase it

【讨论】:

  • 其实我已经把话题删了重新创建了,还是不行。
  • 在测试场景之外,这不是一个好主意,但我假设你做的--replication-factor 超过 1 个?
  • 每当我尝试测试场景时,我都会删除主题并使用此命令重新创建主题/opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 -partitions 6 --topic internal
  • internal??主题必须是__consumer_offsets
猜你喜欢
  • 2014-12-31
  • 2021-06-06
  • 2016-05-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-16
  • 2018-09-12
  • 2017-08-14
相关资源
最近更新 更多