【发布时间】:2018-07-26 19:52:58
【问题描述】:
在 kafka 中,我需要使用 Java 使用来自两个消费者(分区 1 到消费者 1,分区 2 到消费者 2)的两个分区来消费一个主题。
这是我的生产者代码
public class KafkaClientOperationProducer {
KafkaClientOperationConsumer kac = new KafkaClientOperationConsumer();
public void initiateProducer(ClientOperation clientOperation,
ClientOperationManager activityManager,Logger logger) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, ClientOperation> producer = new KafkaProducer<>(props);
try{
ProducerRecord<String, ClientOperation> record = new ProducerRecord<String, ClientOperation>(
topicName, key, clientOperation);
producer.send(record);
}
finally{
producer.flush();
producer.close();
kac.initiateConsumer(activityManager);//Calling Consumer
}
}
}
这是我的消费者代码
public class KafkaClientOperationConsumer{
String topicName = "CA_Topic";
String groupName = "CA_TopicGroup";
public void initiateConsumer(ClientOperationManager activityManager) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
props.put("group.id", groupName);
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String, ClientOperation> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
ConsumerRecords<String, ClientOperation> records = consumer.poll(100);
try{
for (ConsumerRecord<String, ClientOperation> record : records) {
activityManager.save(record.value());//saves data in database
}}
finally{
consumer.close();}
}
}
上面的代码适用于单个消费者而不是多个消费者
clientOperation 是一个保存客户端操作数据的对象。
分区号是三个(您可以从代码中看到),
当我尝试使用线程(即(ExecutorService 执行程序)调用initialConsumer 时,我在数据库中得到重复值
请更改我的代码,以便我可以使用两个消费者使用 CA_Topic,由于内存问题,我无法使用两个 JVM。提前致谢
【问题讨论】:
-
添加更多关于你尝试过的细节。你的问题的目的等等
-
您只需要运行两个具有相同消费者组 ID 的消费者实例。参考任何java消费者代码
-
@Sourav Gulati,这是我的问题,我无法像线程一样并行运行两个 Consumer.java 实例,谢谢您的回复。
-
不是多线程的,只需在两个 JVM 中运行您的消费者应用程序的两个独立实例,它们会自动在它们之间均匀地平衡分区
-
无法解释为什么?因为错误或者你不知道怎么做?请编辑您的问题以包含您迄今为止编写的代码
标签: apache-kafka kafka-consumer-api