【问题标题】:JDBC Sink Connector: How to map fields from the Kafka's message to the database table's columnJDBC Sink 连接器:如何将 Kafka 消息中的字段映射到数据库表的列
【发布时间】:2020-02-08 14:45:17
【问题描述】:

我正在使用Confluent JDBC Sink Connector 来捕获从 Kafka 主题到数据库的所有更改。我的消息是没有任何附加架构的 JSON 格式。例如:

{ "key1": "value1", "key2": 100}

这是我的配置:

name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true

我遇到的问题是:由于遗留系统,我无法更改消息格式。所以我的消息是没有模式信息的 JSON 对象。库是否支持映射字段?比如数据库下A字段到B字段的映射。

谢谢

【问题讨论】:

    标签: mysql jdbc apache-kafka apache-kafka-connect


    【解决方案1】:

    必须为您的数据声明一个架构以使用 JDBC Sink。这意味着在实践中您需要:

    如果您在将数据生成到 Kafka 时没有该选项,则可以构建应用架构的流处理阶段。您可以使用 Kafka Streams 或 KSQL 之类的东西来做到这一点。其输出是一个 Kafka 主题,然后您将其用作 Kafka Connect 的源。在 KSQL 中执行此操作的一个示例是:

    -- Declare the schema of the source JSON topic
    CREATE STREAM send_1_src (KEY1 VARCHAR, 
                              KEY2 INT) 
      WITH (KAFKA_TOPIC='send_1', 
            VALUE_FORMAT='JSON');
    
    -- Run a continuous query populating the target topic `SEND_1_AVRO` 
    -- with the data from `send_1` reserialised into Avro
    CREATE STREAM SEND_1_AVRO 
      WITH (VALUE_FORMAT='AVRO') AS 
      SELECT * 
        FROM send_1_src;
    

    • 了解更多关于 KSQL 的信息see here
    • 您可以在Kafka Tutorials here 中找到一些很好的流处理模式示例,其中包含原始 Kafka 消费者、Kafka Streams 和 KSQL。

    【讨论】:

      【解决方案2】:

      还有另一种选择,即编写消费者拦截器并将架构附加到值,然后再由 JDBC 接收器连接器使用。

      我试过了,效果很好!

      【讨论】:

      • 请粘贴一些工作配置以便更好地理解。
      猜你喜欢
      • 2021-12-19
      • 2020-07-31
      • 2022-07-29
      • 2018-10-21
      • 1970-01-01
      • 2019-06-04
      • 2021-04-15
      • 2018-07-29
      • 2019-04-18
      相关资源
      最近更新 更多