【问题标题】:Read file using kafka file connector in java在java中使用kafka文件连接器读取文件
【发布时间】:2020-10-03 05:13:55
【问题描述】:

我用 Java 编写了一个非常简单的代码来读取文件并将这些记录发送到 Kafka 主题。一切都按预期工作。但是,我不想写文件,而是想使用 Kafka 文件连接器。我过去使用 REST proxy(curl) 命令做过,但从未在 java 中尝试过。我需要一些帮助。

我可以看到 Maven 存储库中有 Kafka-connect api,我可以将它添加到我的 pom.xml 文件中。我下一步应该如何将它集成到我的 java 代码中。

我在没有 Kafka 连接的情况下读取文件的代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;
import java.util.Scanner;

public class SimpleProducer_ReadFile {

    public static void main(String[] args) throws FileNotFoundException {
       // System.out.println("Hello Kafka ");

        // setting properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> produce = new KafkaProducer<String, String>(props);


        //reading file
        File read = new File("C:\\Users\\\Desktop\\TestFile.txt");
        Scanner scan = new Scanner(read);
        while(scan.hasNextLine()){

            String data = scan.nextLine();
            System.out.println(data);

            //create the producer record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic",data);

            //send data
            produce.send(record);
        }

        //flush and close
        produce.flush();
        produce.close();
    }
}

【问题讨论】:

    标签: java apache-kafka apache-kafka-connect


    【解决方案1】:

    您只需要Kafka ConnectFileStreamSource 连接器从文件中读取数据并将其发送到Kafka。


    在你的情况下,配置应该是

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/path/to/file.txt
    topic=test-topic
    

    现在等效的curl 命令将是:

    curl -X POST \
      -H "Content-Type: application/json" \
      --data '{"name": "local-file-source", "config": {"connector.class":"FileStreamSource", "tasks.max":"1", "file":"path/to/file.txt", "topics":"test-topic" }}' http://localhost:8083/connectors
    

    如果您想以编程方式执行此操作,只需发送POST 请求即可。

    【讨论】:

      猜你喜欢
      • 2021-04-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-03
      • 2019-08-27
      • 2011-04-20
      • 1970-01-01
      • 2011-01-13
      相关资源
      最近更新 更多