【问题标题】:Transform Kafka Connect JDBC Source Connector output to custom format将 Kafka Connect JDBC Source Connector 输出转换为自定义格式
【发布时间】:2021-05-21 09:49:00
【问题描述】:

我已经实现了一个 Kafka Connect JDBC 源连接器,该连接器连接到 Oracle 数据库并将数据写入 Kafka 主题。目前,我已经设置了选项value.converter=org.apache.kafka.connect.json.JsonConverter 并设置了value.converter.schemas.enable=false。此选项可以将 JSON 数据写入 Kafka 主题(顺便说一句,它工作正常),但不包括在将数据发送到 Kafka 代理之前修改数据的选项。

我现在的问题是:有没有办法修改发送到 Kafka 主题的数据?就我而言,源连接器运行自定义查询并将其直接写入 Kafka 主题。无论如何,我想用一些自定义列和嵌套来扩展这个 JSON。有办法吗?

【问题讨论】:

  • 顺便说一句,我想有一种方法可以做到这一点 - 通过实现自定义序列化程序。但是,这是唯一的方法吗?

标签: java apache-kafka apache-kafka-connect


【解决方案1】:
  1. 请不要使用JsonConverterschemas.enable=false :-) 您在Oracle 中的数据具有如此美妙的架构,丢掉它太可惜了!说真的,使用 Avro、Protobuf 或 JSON Schema 之类的东西可以让您在 Kafka 主题中保持较小的消息大小,同时保留模式。

    有关此重要概念的更多详细信息,请参阅文章 like this one

  2. 单消息转换 (SMT) 可能是您在将数据传输到 Kafka 的过程中寻找的方法。例如,您可以insertfieldsflatten payloadslots more。如果没有现有的 SMT 来做您想做的事,您可以使用 Java API 编写自己的。

    如果你想做更复杂的工作,比如加入、聚合等,你也可以使用 Kafka Streams 或 ksqlDB 对数据进行流处理。

【讨论】:

  • 感谢您的回答。但是,如果 Oracle 响应是一个包含 4 列的表,并且我希望发送到 Kafka 的数据是一个带有其他内容的 JSON 对象,该怎么办?
  • 除了使用禁用选项之外,您还有其他提示吗?
【解决方案2】:

执行某些 SMT 的示例配置,例如重命名字段、删除字段或添加字段。

流程:

DB 表 -connector 推断字段模式 -> 输入连接字段(内部连接数据结构/connectRecord(s)) -> SMT1 -> SMT2 -> ... -> 最后一个 SMT -> JsonConverter -> 输出 json消息。

数据库表:

current_name1 | current_name2 | FieldToDrop
bla1            bla2            bla3

推断的输入连接字段:

  "current_name1" = "bla1" // this is a connect record
  "current_name2" = "bla2" // this is a connect record
  "FieldToDrop" =  "bla3" // this is a connect record

输出 json 值:

{
  "new_name1": "bla1",
  "new_name2": "bla2",
  "type": "MyCustomType"
}

连接器配置:

name=example-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter 
... 

transforms=RenameFields,InsertFieldType,DropFields
    
    
transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.RenameFields.renames=current_name1:new_name1,current_name2:new_name2
    
    transforms.InsertFieldType.type=org.apache.kafka.connect.transforms.InsertField$Value
    transforms.InsertFieldType.static.field=type
    transforms.InsertFieldType.static.value=MyCustomType
    
    transforms.DropFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value 
    transforms.DropFields.blacklist=FieldToDrop
    

【讨论】:

    猜你喜欢
    • 2021-06-27
    • 2021-03-25
    • 2021-07-15
    • 2022-12-04
    • 2020-02-07
    • 2018-03-24
    • 2020-12-08
    • 2019-11-08
    • 2020-12-29
    相关资源
    最近更新 更多