【问题标题】:Read 1 message concurrently from multiple Kafka topics从多个 Kafka 主题中同时读取 1 条消息
【发布时间】:2017-10-31 20:20:54
【问题描述】:

我将 Kafka Listener 的并发设置为 1。

    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> 
    factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(conncurrency);
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());

我正在听 3 个不同的主题

    @KafkaListener(topics = "#{'${kafka.consumer.topic.name}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void listen(@Payload Map<String, Object> conciseMap,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) int offset,
            Acknowledgment ack) {           
        processMessage(conciseMap,partition,offset,ack,false);
    }

在这种情况下,侦听器是否会从第一个主题中读取一条消息,一旦处理完毕,是否会从下一个主题中读取一条消息,依此类推?或者它会同时处理来自每个主题的 1 条消息。

如果是前者,有没有办法在不创建多个监听器的情况下同时从所有主题中读取 1 条消息?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    无法保证 Kafka 代理将如何跨容器线程分配分区;如果你只有一个分区;它们可能都被分配给同一个容器线程。这就是我在容器并发=3 的情况下运行测试时发生的情况...

    2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[]

    2017-10-31 16:40:26.066 INFO 35202 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[]

    2017-10-31 16:40:26.079 INFO 35202 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[bar-0,foo-0,baz-0]

    每个主题有 10 个分区,我得到了这个分布...

    2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-1-C-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[foo10-5,foo10-6,foo10-4,baz10- 5、baz10-4、baz10-6、bar10-5、bar10-4、bar10-6]

    2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[bar10-1、bar10-0、bar10-3、bar10- 2、baz10-1、baz10-0、baz10-3、baz10-2、foo10-3、foo10-1、foo10-2、foo10-0]

    2017-10-31 16:46:19.279 INFO 35900 --- [ntainer#0-2-C-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[baz10-9,baz10-8,baz10-7,bar10- 9, bar10-8, foo10-9, bar10-7, foo10-7, foo10-8]

    如您所见,每个主题的一些分区被分配给每个线程。但是其中两个线程总共有 9 个分区,而一个有 12 个。

    如果您想要完全控制,我建议每个主题设置一个监听器。

    【讨论】:

    • 谢谢加里!容器线程的数量是如何确定的?如果我将并发设置为 1 ,还会创建 3 个容器线程吗?
    • 否; concurrency=3 表示 3 个线程。它与您收听的主题数量无关。
    • 如果我有 1 个主题和 1 个分区,一次只有 1 个线程处于活动状态吗?
    • 正确,因为一个分区一次只能分配给一个线程。
    • 当我分配 3 个主题并将 1 个分区分配给单个侦听器并将并发设置为 3 时,每个线程是否会处理来自 1 个主题的数据(最有可能的情况)?
    【解决方案2】:

    您不需要创建多个侦听器 - 您只需要与所有主题中提供的分区一样多的并发量,甚至更多。

    会有这么多的KafkaMessageListenerContainer 旋转,它们每个都将在自己的线程中工作。您仍然可以使用相同的 @KafkaListener 方法。只要你在那里是无状态的,你的并发就没有任何问题。

    【讨论】:

    • 感谢@ArtemBilan。假设我只有 1 个分区,如果我将并发设置为 5,是否会同时处理来自单个主题的 5 条消息?或者我应该为此使用批处理侦听器吗?
    • 不,来自一个分区的所有消息都进入同一个消费者线程。这就是 Apache Kafka 的设计方式。
    • 如果我有 5 个分区并将并发设置为 5 ,我猜 5 条消息将由 1 个消费者线程处理。但是,如果我将并发设置为 1 ,那么消息将被一一处理。我的理解正确吗?
    • 查看我关于分区分配如何与concurrency一起工作的答案。
    猜你喜欢
    • 2017-01-24
    • 1970-01-01
    • 2021-03-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-24
    • 1970-01-01
    • 2015-08-01
    • 1970-01-01
    相关资源
    最近更新 更多