【发布时间】:2020-10-03 05:13:55
【问题描述】:
我用 Java 编写了一个非常简单的代码来读取文件并将这些记录发送到 Kafka 主题。一切都按预期工作。但是,我不想写文件,而是想使用 Kafka 文件连接器。我过去使用 REST proxy(curl) 命令做过,但从未在 java 中尝试过。我需要一些帮助。
我可以看到 Maven 存储库中有 Kafka-connect api,我可以将它添加到我的 pom.xml 文件中。我下一步应该如何将它集成到我的 java 代码中。
我在没有 Kafka 连接的情况下读取文件的代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
import java.util.Scanner;
public class SimpleProducer_ReadFile {
public static void main(String[] args) throws FileNotFoundException {
// System.out.println("Hello Kafka ");
// setting properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> produce = new KafkaProducer<String, String>(props);
//reading file
File read = new File("C:\\Users\\\Desktop\\TestFile.txt");
Scanner scan = new Scanner(read);
while(scan.hasNextLine()){
String data = scan.nextLine();
System.out.println(data);
//create the producer record
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic",data);
//send data
produce.send(record);
}
//flush and close
produce.flush();
produce.close();
}
}
【问题讨论】:
标签: java apache-kafka apache-kafka-connect