【问题标题】:Kafka consumer unable to read data on WindowsKafka 消费者无法在 Windows 上读取数据
【发布时间】:2020-05-13 22:16:47
【问题描述】:

我正在尝试编写一个简单的 Kafka 消费者来读取 Windows 机器上的数据。 但我的消费者无法读取任何数据。生产者生成了超过 20 条消息,但没有一条被消费。

以下是我的消费者代码:

 Properties properties= new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my_application");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

    //create the consumer

    KafkaConsumer<String, String> consumer= new KafkaConsumer<String, String>(properties);

    consumer.subscribe(Collections.singleton("first_topic"));

        ConsumerRecords<String, String> record=consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord data: record){
            System.out.println("Key: "+data.key()+" and value: "+data.value());
            System.out.println("Topic: "+data.topic());
            System.out.println("Partition: "+data.partition());
        }

当我使用控制台使用命令来使用消息时发生了同样的问题:

kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first_topic --group my-application

我必须明确使用--partition 0 选项来获取消息。 有什么办法可以解决这个问题吗?

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    在代码中使用以下任何属性-

     `properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    

    它告诉消费者只读取最新的消息,即消费者启动后发布的消息。

    `properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    

    当消费者开始消费分区中的消息时,它将从头开始读取。

    使用以下命令

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    ----group my-application 应该是--group my-application

    https://kafka.apache.org/quickstart

    请按照以下步骤操作。

    一个。获取主题的分区

    ./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    Topic: test PartitionCount: 1   ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    

    b.获取主题的偏移量。

    /kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
    test:0:11
    

    c.消费一个分区的消息

    /kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test   --partition 0 
    

    您还可以指定一个--offset 参数来指示从哪个偏移量开始。如果不存在,则从分区末尾开始消费。

    您还可以使用基于 GUI 的工具来查看名为 kafka 工具的主题的每个分区中的数据。 http://www.kafkatool.com这是一个管理我们kafka集群的工具。

    Java 代码

    TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));
    

    您可以通过以下 URL 获取使用 java 的详细实现。

    https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

    https://www.logicbig.com/tutorials/misc/kafka/manually-assign-partition-to-a-consumer.html

    【讨论】:

    • 感谢您的回答。但行为仍然相同。记录数仍然为 0。我已经尝试了两个偏移配置,但都没有成功。
    猜你喜欢
    • 2022-01-20
    • 1970-01-01
    • 2021-04-26
    • 2019-05-14
    • 2019-11-21
    • 1970-01-01
    • 1970-01-01
    • 2017-11-09
    • 2019-03-14
    相关资源
    最近更新 更多