【发布时间】:2019-06-11 20:38:37
【问题描述】:
我在我们的 Spring Boot 应用程序中配置了一个 kafka 监听器,如下所示:
@KafkaListener(topicPartitions = @TopicPartition(topic = 'data.all', partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
// Code to convert to json string and write to ElasticSearch
}
此应用程序被部署到 3 台服务器上并在其上运行,尽管所有服务器的组 ID 都是 kms,但它们都获得了消息的副本,这意味着我在 Elastic 中获得了 3 条相同的记录。当我在本地运行实例时,会写入 4 个副本。
我通过在写入发生前后检查主题上所有消息的计数来确认生产者仅向主题写入 1 条消息;它只增加 1。我怎样才能防止这种情况发生?
【问题讨论】:
标签: java spring spring-boot apache-kafka spring-kafka