【问题标题】:Multiple Kafka Listeners With Same GroupId All Receiving Message具有相同 GroupId 的多个 Kafka 侦听器都接收消息
【发布时间】:2019-06-11 20:38:37
【问题描述】:

我在我们的 Spring Boot 应用程序中配置了一个 kafka 监听器,如下所示:

@KafkaListener(topicPartitions = @TopicPartition(topic = 'data.all', partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
    // Code to convert to json string and write to ElasticSearch
}

此应用程序被部署到 3 台服务器上并在其上运行,尽管所有服务器的组 ID 都是 kms,但它们都获得了消息的副本,这意味着我在 Elastic 中获得了 3 条相同的记录。当我在本地运行实例时,会写入 4 个副本。

我通过在写入发生前后检查主题上所有消息的计数来确认生产者仅向主题写入 1 条消息;它只增加 1。我怎样才能防止这种情况发生?

【问题讨论】:

    标签: java spring spring-boot apache-kafka spring-kafka


    【解决方案1】:

    当您像这样手动分配分区时,您负责在实例之间分配分区。

    该组被忽略。

    您必须使用组管理并让 Kafka 为您进行分区分配,或者为每个实例手动分配分区。

    topics = "data.all"代替topicPartitions

    【讨论】:

    • 谢谢,明天我会试一试,但我想当我没有分配时,即使在同一实例中,我也会遇到其他消息重复问题。会告诉你的!
    • 所以稍微好一点...当我运行 4 个实例时,我仍然获得了 2 到 3 个副本(而不是 4 个)。仍然不是应该发生的事情。
    • 没关系,我认为有人正在运行未更新的实例,因为当我停止所有 3 台服务器时,我仍然收到了传递的消息副本。我想这解决了它,谢谢!
    【解决方案2】:

    不手动分配分区会发生什么

    制作方

    • 当生产者发送消息时没有任何策略或指定消息应该发送到哪个分区,然后 kafka 尝试使用循环技术并在所有可用分区中拆分所有消息。
      • 2 个分区中的消息是唯一的,因为建议最多只有 1 个使用者来收听主题的特定分区。

    消费者方面

    • 例如一个主题有 2 个分区
    • 然后一个消费者(比如A)加入消费者组(比如consumer
    • 每当有新的消费者加入并且 2 个分区被分配给 A 时,就会发生分区重新分配,因为我们只有一个消费者组 consumer
    • 现在,消费者 B 尝试加入同一个消费者组 consumer,然后再次进行分区重新分配,A 和 B 都将获得分区来监听消息
    • 由于我们只有 2 个分区,即使我们在同一个消费者组中添加更多消费者,也只有 2 个消费者会监听发送到主题的消息,因为一次只有 2 个消费者可以得到 1-1分割。保持消费者使用的消息的排他性。

    在您的情况下发生的情况是,超过 1 个使用者正在侦听相同的分区,因此所有正在侦听同一使用者组中相同分区的使用者也将接收来自该分区的消息。因此,由于超过 1 个消费者正在监听相同的分区,因此消费者组中消费者之间的互斥性丢失。

    【讨论】:

      猜你喜欢
      • 2020-02-10
      • 2018-04-11
      • 1970-01-01
      • 2011-10-09
      • 2022-12-13
      • 2017-12-17
      • 1970-01-01
      • 1970-01-01
      • 2019-06-17
      相关资源
      最近更新 更多