【问题标题】:Spring Kafka polling with @KafkaListener and listener ack-mode set as record使用 @KafkaListener 和侦听器 ack-mode 设置为记录的 Spring Kafka 轮询
【发布时间】:2020-02-29 02:06:15
【问题描述】:

我正在使用@KafkaListener 和 ConcurrentKafkaListenerContainerFactory 来监听 3 个 kafka 主题,每个主题有 10 个分区。我对它的工作原理几乎没有疑问。

    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(30);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
    @KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }

我的 listener.ackmode 是 return 并且 enable.auto.commit 设置为 false 并且 partition.assignment.strategy: org.apache.kafka.clients .consumer.RoundRobinAssignor

1) 我对并发的理解是,因为我将并发(在工厂级别)设置为 30,并且我总共有 30 个分区(三个主题一起)要读取,每个线程将被分配一个分区.我的理解正确吗?如果我在 @KafkaListener 注释中再次覆盖并发,会有什么影响?

2) spring 调用 poll() 方法时,是否会从所有三个主题中进行轮询?

3) 由于我将 listener.ackmode 设置为返回,它是否会等到在单个 poll() 中返回的所有记录完成后再发出下一个 poll()?另外,如果我的记录处理时间超过 max.poll.interval.ms 会怎样?假设在单个 poll() 调用中返回 1-100 个偏移量,而我的代码在 max.poll.interval.ms 被命中之前只能处理 50 个,此时将发出另一个轮询,因为它已经达到 max.poll .interval.ms?如果是这样,下一个 poll() 会从偏移量 51 返回记录吗?

非常感谢您的时间和帮助

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    我的 listener.ackmode 是 return

    没有这样的确认模式;由于您没有在工厂设置它,因此您的实际确认模式是 BATCH (默认)。要使用 ack 模式记录(如果这就是您的意思),您必须如此配置工厂容器属性。

    我对并发的理解是......

    你的理解不正确;并发不能大于分区最多的主题中的分区数(如果一个监听器监听多个主题)。由于每个主题只有 10 个分区,因此您的实际并发数为 10。

    覆盖侦听器上的concurrency 只会覆盖出厂设置;您总是需要至少与并发数量一样多的分区。

    spring 调用 poll() 方法时,是否会从所有三个主题中进行轮询?

    没有那个配置;你有 3 个并发容器,每个容器有 30 个消费者在听一个主题。您有 90 个消费者。

    如果您对所有 3 个主题都有一个侦听器,则投票将返回所有 3 个主题的记录;但是您仍然可能有 20 个空闲消费者,具体取决于分区分配者如何分配分区 - 请参阅“分区分配”日志以了解分区的确切分配方式。循环分配器应该可以分配它们。

    此时将再次发布民意调查

    Spring 无法控制 - 如果您花费的时间过长,消费者线程在侦听器中 - 消费者不是线程安全的,因此我们无法发出异步轮询。

    必须max.poll.interval.ms 内处理 max.poll.records 以避免 Kafka 重新平衡分区。

    ack模式没有区别;这一切都是为了及时处理投票结果。

    【讨论】:

    • 抱歉打错了,我的意思是记录不返回。感谢您的信息。一件事我仍然不清楚 spring 多久调用一次 poll() 方法?它是在处理完所有记录后立即调用还是以固定时间间隔调用(设置为 max.poll.interval.ms)
    • 线程调用poll(),用结果调用监听器(一次一个或整个批次,取决于监听器类型);然后立即再次调用poll()(默认情况下 - 2.3 添加了一个新的容器属性idleBetweenPolls,默认情况下为 0)。
    • 感谢您的回复。当您说同一个线程立即调用 poll() 时,它会不会等待 @KafkaListener 完成并确认(将 ack-mode 类型设置为记录),然后再发出下一次轮询?还有 max.poll.interval.ms 是怎么来这里玩的?
    • 对于RECORD以外的确认模式(例如BATCH),在调用下一个poll()之前,我们提交偏移量;因为syncCommits 默认为真,所以该调用将阻塞,直到 Kafka 响应。然后我们进行下一次投票。但是,对于AckMode.RECORD,每次提交都会在处理完每条记录后立即执行。 max.poll.interval.ms 是 Kafka 允许消费者在考虑消费者已死亡并重新分配分区之前处理来自先前轮询的所有记录的时间。必须在该时间内调用下一个poll()
    • 感谢加里的帮助
    猜你喜欢
    • 1970-01-01
    • 2020-10-03
    • 1970-01-01
    • 2018-07-02
    • 2021-08-09
    • 1970-01-01
    • 1970-01-01
    • 2019-11-25
    • 1970-01-01
    相关资源
    最近更新 更多