【发布时间】:2020-02-29 02:06:15
【问题描述】:
我正在使用@KafkaListener 和 ConcurrentKafkaListenerContainerFactory 来监听 3 个 kafka 主题,每个主题有 10 个分区。我对它的工作原理几乎没有疑问。
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(30);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
我的 listener.ackmode 是 return 并且 enable.auto.commit 设置为 false 并且 partition.assignment.strategy: org.apache.kafka.clients .consumer.RoundRobinAssignor
1) 我对并发的理解是,因为我将并发(在工厂级别)设置为 30,并且我总共有 30 个分区(三个主题一起)要读取,每个线程将被分配一个分区.我的理解正确吗?如果我在 @KafkaListener 注释中再次覆盖并发,会有什么影响?
2) spring 调用 poll() 方法时,是否会从所有三个主题中进行轮询?
3) 由于我将 listener.ackmode 设置为返回,它是否会等到在单个 poll() 中返回的所有记录完成后再发出下一个 poll()?另外,如果我的记录处理时间超过 max.poll.interval.ms 会怎样?假设在单个 poll() 调用中返回 1-100 个偏移量,而我的代码在 max.poll.interval.ms 被命中之前只能处理 50 个,此时将发出另一个轮询,因为它已经达到 max.poll .interval.ms?如果是这样,下一个 poll() 会从偏移量 51 返回记录吗?
非常感谢您的时间和帮助
【问题讨论】:
标签: apache-kafka kafka-consumer-api spring-kafka