【问题标题】:Read CSV file using kafka-connect and sink into ms sql database?使用 kafka-connect 读取 CSV 文件并沉入 ms sql 数据库?
【发布时间】:2020-01-07 00:31:17
【问题描述】:

我正在研究 POC,我必须读取 csv 文件并将其插入 ms sql server。 我已经创建了以下配置但我得到了以下异常:

)\n原因:org.apache.kafka.connect.errors.ConnectException:值模式必须是结构类型

配置如下:

1.示例 csv 文件数据:

id,record1,record2,record3,created

1,1772056014794065487,160842,20668578,9999-12-31

2,1772056014794065487,160842,20668578,9999-12-31

3,1772056014794065487,160842,20668578,9999-12-31

4,1772056014794065487,160842,20668578,9999-12-31

5,1772056014794065487,160842,20668578,9999-12-31

2。文件源连接器

{"name":"file-source",

"config":
         {
          "connector.class":"FileStreamSource",
          "tasks.max":"1",
          "file":"/tmp/my-connect-test.dat",
          "topic":"connect-test",
          "name":"file-source"},
          "tasks":[{"connector":"file-source","task":0}],
          "type":"source"}

3. jdbc-sink-connector:

{"name":"test-sink",
  "config":   {
         "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
         "tasks.max":"1",
         "topics":"connect-test",
         "topic.prefix":"connect-test", 
         "insert.mode":"insert",
         "table.name.format":"dz.temp_data",
         "pk.mode":"record_value",
         "pk.fields":"id",
         "incrementing.column.name":"id",
         "table.whitelist":"dz.temp_data",
         "mode":"incrementing",
         "key.converter.schemas.enable":"false",
         "value.converter.schemas.enable":"false",
         "key.converter":"io.confluent.connect.avro.AvroConverter",
         "value.converter":"io.confluent.connect.avro.AvroConverter",
         "key.converter.schema.registry.url":"http://localhost:8081",
         "value.converter.schema.registry.url":"http://localhost8081",
         "connection.url":"jdbc:sqlserver://**;databaseName=**;username=**;password=***",
         "name":"test-sink"},
         "tasks":[{"connector":"test-sink","task":0}],
         "type":"sink"}

4. ms sql 表:

CREATE TABLE dz.temp_data (
  id INTEGER IDENTITY(1,1) NOT NULL PRIMARY KEY,
  record1 VARCHAR(255) NOT NULL,
   record2 VARCHAR(255) NOT NULL,
record3 VARCHAR(255) NOT NULL,
record4 VARCHAR(255) NOT NULL,
created VARCHAR(255) NOT NULL
);

如果我通过 avro 消费者测试主题,我会得到正确的输出。

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

但我在插入 ms sql 数据库时遇到异常。

请帮助解决这个问题。提前致谢。

【问题讨论】:

    标签: apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    如果您使用的是 JDBC Sink,则必须有一个模式。 CSV 文件没有架构。因此,您需要在此过程中定义一个。

    要了解有关序列化和架构的更多信息,请参阅here

    你有几个选择:

    1. 使用可以在摄取时应用架构的连接器(例如kafka-connect-spooldir)。如果您这样做,请确保在 Kafka Connect 源配置中使用带有 schemas.enable=true 的 AvroConverter 或 JSONConverter 作为 value.converter。您可以在here 看到一个这样的例子。

    2. 使用ksqlDB 应用您的架构,然后以更合适的格式(例如 Avro)将数据重新序列化到不同的主题,并使用该主题来填充数据库。例如:

      -- Declare a schema for the existing topic
      CREATE STREAM SOURCE_DATA (id INT, record1 VARCHAR, record2 VARCHAR, record3 VARCHAR, 
                                 record4 VARCHAR, record5 VARCHAR) 
                    WITH (KAFKA_TOPIC='connect-test', VALUE_FORMAT='DELIMITED');
      
      -- Write a new Kafka topic that serialises all the data 
      -- from the first topic to a new one, in Avro
      CREATE STREAM SOURCE_DATA_RESERIALISED WITH (KAFKA_TOPIC='connect-test_avro',
                                                   VALUE_FORMAT='AVRO') AS 
        SELECT * FROM SOURCE_DATA;
      

    【讨论】:

    • 我尝试了选项 2。文件源连接器不起作用。所以我向 kafka-console-producer 提供了 csv 文件。 kafka-console-producer --broker-list localhost:9092 --topic connect-test < /tmp/my-connect-test.dat
    • 我尝试使用管道分隔文件,但似乎在 confluent 5.3.1 中不可能,然后我将 confluent 更新为 5.4.1,有 vlue_delimiter 选项,所以我使用了Create stream sample {column1 string, column2 string } WITH (KAFKA_TOPIC='SampleData', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER='|');,但它给出了例外同时将 DELIMITED 流转换为 AVRO 格式。 CREATE STREAM sample_avro WITH (KAFKA_TOPIC='SampleDataAvro', VALUE_FORMAT='AVRO', VALUE_DELIMITER='|') AS SELECT * FROM sample_delimited`;对于此行获取异常“分隔符仅支持 DELIMITED 格式”
    • 开始一个新问题,提供完整的细节,我会在那里回答:)
    猜你喜欢
    • 2019-02-21
    • 1970-01-01
    • 1970-01-01
    • 2019-05-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-21
    相关资源
    最近更新 更多