【发布时间】:2017-05-13 17:07:07
【问题描述】:
谁能帮我解决以下问题。我正在使用 kafka-clients-0.10.1.1(单节点单代理)
auto.create.topics.enable 的默认值为 true。
1.我正在使用
向主题发送消息 kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
消费:
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
问题是当我第一次运行消费者时,它没有得到值。而且我必须运行生产者并再次运行消费者以获取值。有时我必须运行生产者 3 次。 为什么会这样?
2.) enable.auto.commit=false
如果 enable.auto.commit 属性为 false,同一消费者能否多次阅读消息?
3.) 考虑我在第一点的消费者代码。我如何打破循环我的意思是消费者如何知道它已读取所有消息然后调用 consumer.close()
【问题讨论】:
-
kafka bin里有一个console-consumer,你自己的consumer不能消费数据的时候可以试试。如果可能,请尝试添加 producer.flush() 。对于您的问题 3,流式处理程序无法知道批处理的结束,但您可以设置超时线程来监控超时而不消耗数据。
-
是的,我用 bin 消费者对其进行了测试,它在获取相关 id 为 1 的元数据时出现错误:{my-topic-106=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.网络客户端)
-
您最近是否在消费数据之前生产过数据?默认情况下,Kafka 只会将您的数据存储 3 天。
标签: apache-kafka kafka-consumer-api kafka-producer-api