【问题标题】:Is it possible to consume kafka messages using key and partition?是否可以使用键和分区来消费 kafka 消息?
【发布时间】:2020-03-02 14:41:37
【问题描述】:

我正在使用 kafka_2.12 版本 2.3.0 我使用分区和密钥将数据发布到 kafka 主题。我需要找到一种方法,使用该方法可以使用键和分区组合来使用主题中的特定消息。这样我就不必消耗所有消息并迭代正确的消息。

目前我只能这样做

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
    it -> it.key().equals(key)
}

【问题讨论】:

    标签: java apache-kafka kafka-partition


    【解决方案1】:

    有两种消费topic/partitions的方式是:

    1. KafkaConsumer.assign():Document link
    2. KafkaConsumer.subscribe() : Document link

    所以,你不能通过按键获取消息。

    如果您没有扩展分区的计划,请考虑使用 assign() 方法。因为带有特定键的所有消息都会进入同一个分区。

    使用方法:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    TopicPartition partition = new TopicPartition("some-topic", 0);
    consumer.assign(Arrays.asList(partition));
    
    while(true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        String data = records.findAll {
            it -> it.key().equals(key)
        }
    }
    

    【讨论】:

    • 是否可以在主题中查找特定分区?
    • @AbhishekGharai 是的,Assign() 方法可以手动将分区列表分配给此使用者。
    【解决方案2】:

    您不能“通过密钥从 Kafka 获取消息”。

    如果可行,一个解决方案是拥有与键一样多的分区,并且始终将键的消息路由到同一分区。

    消息键作为分区

    kafkaConsumer.assign(topicPartitions);
        kafkaConsumer.seekToBeginning(topicPartitions);
    
        // Pull records from kafka, keep polling until we get nothing back
        final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
        ConsumerRecords<byte[], byte[]> records;
        do {
            // Grab records from kafka
            records = kafkaConsumer.poll(2000L);
            logger.info("Found {} records in kafka", records.count());
    
            // Add to our array list
            records.forEach(allRecords::add);
    
        }
        while (!records.isEmpty());
    

    仅使用主题名称访问主题的消息

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList(<Topic Name>,<Topic Name>));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-01-27
    • 1970-01-01
    • 1970-01-01
    • 2016-01-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-13
    相关资源
    最近更新 更多