评论太长了,无法添加。
您在评论中提到“我们需要相同数量的消费者应用程序分区”是正确的。但是,它仅适用于所有消费者(在您的情况下为 5 个)属于同一消费者组的情况。
例如,一个主题 T 有 5 个分区,现在假设我们创建一个消费者 C1 与消费者组 G1强>。消费者c1会从Topic T的所有5个分区中获取消息。然后,我们将消费者c2添加到同一个消费者组G1下。 c1 将消耗 3 个分区,c2 将从剩余 2 个分区消耗(反之亦然)。现在你提到的——“每个消费者应用程序一个分区”是一种理想的场景,在这种情况下,同一消费者组 (G1) 下的 5 个消费者可以从所有 5 个分区并行消费。这个概念称为可扩展性。
现在,在您的情况下,您需要读取 5 次相同的数据,因为您有 5 个消费者。在这种情况下,您可以编写一个简单的生产者应用程序,将数据发布到具有 1 个分区的主题上,而不是将相同的消息发布到 5 个分区,然后消费来自所有 5 个消费者的相同消息。然后,您的 5 个消费者应用程序可以独立使用相同的数据,即我告诉您为所有消费者应用程序分配随机的消费者组名称,以便它独立地使用消息(以及提交偏移量)。
在代码sn-p下面。两个消费者并行消费来自同一个 Topic(1 个分区)的消息:
消费者1:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); // randomise consumer group for consumer 1.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer consumerLiveVideo = new KafkaConsumer(props);
consumerLiveVideo.subscribe(Collections.singletonList(topicName[0])); // topic with 1 partition
消费者2:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); // randomise consumer group for consumer 2 .
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer consumerLiveVideo = new KafkaConsumer(props);
consumerLiveVideo.subscribe(Collections.singletonList(topicName[0])); // topic with 1 partition
您还询问了正确的方法,据我说,您只需要一个消费者应用程序。另外,不要在 Kafka 中混用复制和可扩展性的概念,因为这两者都非常重要。
另外,关于关键数据,你已经说过了,你可以阅读 Producer 配置参数 acks(根据你的场景使用参数 acks =1 或 acks=all)。
有关可扩展性、复制、消费者组、消费者/生产者/经纪人/主题的更多详细信息,请阅读 Kafka The Definitive Guide 的第 1-5 章。