【发布时间】:2017-09-09 01:15:07
【问题描述】:
我有一个相当简单的 Kafka 设置 - 1 个生产者、1 个主题、10 个分区、10 个 KafkaConsumer,它们都具有相同的组 ID,都在一台机器上运行。当我处理一个文件时,生产者迅速创建了 3269 条消息,消费者很高兴地开始消费这些消息。一段时间内一切都运行良好,但在某个时刻,消费者开始消费重复项——大量重复项。事实上,看起来他们只是重新开始消费消息队列。如果我让它运行很长时间,数据库将开始接收相同的数据条目 6 次或更多次。在对日志进行了一些测试之后,看起来消费者正在重复使用具有相同唯一消息名称的相同消息。
据我所知,没有发生重新平衡。消费者并没有死亡或被添加。它是相同的 10 个消费者,一遍又一遍地使用相同的 3269 条消息,直到我终止该进程。如果我放手,消费者将写入成千上万条记录,从而大量增加真正应该进入数据库的数据量。
我对 Kafka 还很陌生,但我有点不知道为什么会发生这种情况。我知道 Kafka 不能保证只处理一次,我可以在这里和那里复制几个副本。我有代码可以防止再次保留相同的记录。但是,我不确定为什么消费者会一遍又一遍地重新消费队列。我知道 Kafka 消息在被消费后不会被删除,但如果所有消费者都在同一个组中,那么偏移量应该可以防止这种情况发生,对吧?我对偏移量的工作原理有所了解,但据我所知,如果没有重新平衡,它们不应该被重置,对吧?据我所知,这些消息并没有超时。有没有办法让我的消费者一次性消费队列中的所有内容,然后等待更多消息而不会永远重复消费相同的东西?
这是我传递给生产者和消费者的属性:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("group.id", "MyGroup");
props.put("num.partitions", 10);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
MyIngester ingester = new MyIngester(args[0], props);
【问题讨论】:
标签: java multithreading apache-kafka kafka-consumer-api