【发布时间】:2019-09-14 05:55:36
【问题描述】:
如何在java中编写kafka消费者而不使用无限循环进行轮询?
我通过使用这个link 作为参考创建了 kafka 消费者。这里在处理传入记录函数时编写了 while(true) 循环,它在其中轮询新事件。如果我在我的项目中使用它,除此之外我无法做任何其他事情。有没有办法避免使用这个无限循环来获取新事件?
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting AtMostOnceConsumer ...");
execute();
}
private static void execute() throws InterruptedException {
KafkaConsumer<String, Event> consumer = createConsumer();
// Subscribe to all partition in that topic. 'assign' could be used here
// instead of 'subscribe' to subscribe to specific partition.
consumer.subscribe(Arrays.asList("topic"));
processRecords(consumer);
}
private static KafkaConsumer<String, Event> createConsumer() {
Properties props = new Properties();
String consumeGroup = "group_id";
props.put("group.id", consumeGroup);
props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
props.put("client.id", "clientId");
props.put("security.protocol", "SASL_SSL");
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + "username" + " password="" + "password";");
props.put("enable.auto.commit", "true");
// Auto commit interval, kafka would commit offset at this interval.
props.put("auto.commit.interval.ms", "101");
// This is how to control number of records being read in each poll
props.put("max.partition.fetch.bytes", "135");
// Set this if you want to always read from beginning.
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("schema.registry.url", "https://avroregistry.octanner.io");
props.put("key.deserializer",
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("value.deserializer",
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
return new KafkaConsumer<String, Event>(props);
}
private static void processRecords(KafkaConsumer<String, Event> consumer) throws InterruptedException {
while (true) {
ConsumerRecords<String, Event> records = consumer.poll(TimeUnit.MINUTES.toMillis(1));
long lastOffset = 0;
for (ConsumerRecord<String, Event> record : records) {
System.out.printf("\n\n\n\n\n\n\roffset = %d, key = %s\n\n\n\n\n\n", record.offset(), record.value());
lastOffset = record.offset();
}
System.out.println("lastOffset read: " + lastOffset);
process();
}
}
private static void process() throws InterruptedException {
// create some delay to simulate processing of the message.
Thread.sleep(TimeUnit.MINUTES.toMillis(1));
}
有人可以帮我修改一下,这样我就可以避免 while(true) 循环并且可以只听我传入的事件吗?
【问题讨论】:
-
你为什么不简单地让一个线程做这件事,另一个线程做你想做的其他事情呢?像这样的处理通常是通过无限循环实现的(可选地带有一些用于关闭系统的退出条件)。
-
轮询意味着无限循环。如果您使用的是 Spring,则可以使用 @KafkaListener,但它也会在内部进行轮询。您应该在不同的线程上进行轮询。您可能需要某种机制来退出循环。
标签: java apache-kafka kafka-consumer-api