【问题标题】:Kafka is not assigning a partition after Consumer.Poll(Duration.ZERO);Kafka 在 Consumer.Poll(Duration.ZERO) 之后没有分配分区;
【发布时间】:2021-09-15 20:05:48
【问题描述】:

我开始了一个实施 appache kafka 的项目。
我已经有一个将数据写入队列的工作生产者。到目前为止,一切都很好。现在我想编写一个读取队列中所有数据的消费者。
也就是对应的代码:
try {
    consumer.subscribe(Collections.singletonList("names"));
    if (startingPoint != null){
        consumer.
        consumer.poll(Duration.ofMillis(0));
        consumer.seekToBeginning(consumer.assignment());
    }
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            keyValuePairs.add(new String[]{record.key(),record.value()});
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

该代码现在无法正常工作。仅使用新记录。 我发现 seekToBeginning() 不起作用,因为在那一刻没有为消费者分配分区。 如果我增加投票的持续时间,它会起作用。另一方面,如果我只是暂停线程,则不会。

有人能解释一下为什么会这样吗?我试图自己找出答案,并且已经阅读了一些关于 Kafka 心跳的内容。但我还没有完全理解到底发生了什么。

【问题讨论】:

    标签: spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    任务需要时间;轮询 0 通常意味着轮询将在它发生之前退出。

    您应该在 subscribe() 方法中添加一个ConsumerRebalanceListener 回调并在onPartitionsAssigned() 中执行搜索。

    编辑

    @SpringBootApplication
    public class So69121558Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So69121558Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(ConsumerFactory<String, String> cf, KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so69121558", "test");
                Consumer<String, String> consumer = cf.createConsumer("group", "");
                consumer.subscribe(Collections.singletonList("so69121558"), new ConsumerRebalanceListener() {
    
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    }
    
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        consumer.seekToBeginning(partitions);
                    }
    
                });
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                records.forEach(System.out::println);
                Thread.sleep(5000);
                consumer.close();
            };
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
        }
    
    }
    

    这里有几个使用 Spring 方式的示例 - 只需将其中之一(或两者)添加到上述类中即可。

    @KafkaListener(id = "so69121558", topics = "so69121558")
    void listen(ConsumerRecord<?, ?> rec) {
        System.out.println(rec);
    }
    
    @KafkaListener(id = "so69121558-1", topics = "so69121558")
    void pojoListen(String in) {
        System.out.println(in);
    }
    

    搜索的方式也有所不同;这是完整的示例:

    @SpringBootApplication
    public class So69121558Application extends AbstractConsumerSeekAware {
    
        public static void main(String[] args) {
            SpringApplication.run(So69121558Application.class, args);
        }
    
        @KafkaListener(id = "so69121558", topics = "so69121558")
        void listen(ConsumerRecord<?, ?> rec) {
            System.out.println(rec);
        }
    
        @KafkaListener(id = "so69121558-1", topics = "so69121558")
        void pojoListen(String in) {
            System.out.println(in);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
        }
    
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            callback.seekToBeginning(assignments.keySet());
        }
    
    
    }
    

    【讨论】:

    • 谢谢你的回答,我想试试。你能提供一个代码示例或给我一个文档,我可以告诉自己吗?
    • 我添加了一个例子。你为什么用spring-kafka标记这个问题?你没有使用它;您正在直接使用 kafka API。
    • 哦,我尝试了互联网上的一个示例,但将整个内容实现到了 Spring Boot 中。我不知道还会有一个 spring-kafka 图书馆,我现在找到了并会尝试一下。我应该注意哪些差异?我想说谢谢你了。你能告诉我为什么我需要将属性设置为最早吗?
    • spring-kafka 提供了一个更简单的编程模型,对于已经使用过其他消息传递技术(如 RabbitMQ 或 JMS)的 Spring 用户来说,它会很熟悉。 earliest 是消费者首次消费主题/分区时需要的;默认情况下,它是最新的,但在你的情况下,因为我们正在执行seekToBeginning,所以它是多余的;我只是出于习惯这样做;这里不需要。
    • 我添加了 Spring 示例。
    猜你喜欢
    • 2019-08-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-02
    • 2015-11-30
    • 2018-06-21
    • 2017-06-28
    • 2020-12-26
    相关资源
    最近更新 更多