【问题标题】:Kafka Connect: How to extract a fieldKafka Connect:如何提取字段
【发布时间】:2020-11-01 21:11:15
【问题描述】:

我正在使用 Debezium SQL Server 连接器将表流式传输到主题中。感谢 Debezium 的 ExtractNewRecordState SMT,我在我的主题中收到以下消息。

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int64",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":false,
            "field":"customer_code"
         },
         {
            "type":"string",
            "optional":false,
            "field":"topic_name"
         },
         {
            "type":"string",
            "optional":true,
            "field":"payload_key"
         },
         {
            "type":"boolean",
            "optional":false,
            "field":"is_ordered"
         },
         {
            "type":"string",
            "optional":true,
            "field":"headers"
         },
         {
            "type":"string",
            "optional":false,
            "field":"payload"
         },
         {
            "type":"int64",
            "optional":false,
            "name":"io.debezium.time.Timestamp",
            "version":1,
            "field":"created_on"
         }
      ],
      "optional":false,
      "name":"test_server.dbo.kafka_event.Value"
   },
   "payload":{
      "id":129,
      "customer_code":"DVTPRDFT411",
      "topic_name":"DVTPRDFT411",
      "payload_key":null,
      "is_ordered":false,
      "headers":"{\"kafka_timestamp\":1594566354199}",
      "payload":"MSG 18",
      "created_on":1594595154267
   }
}

添加value.converter.schemas.enable=false后,我可以去掉schema部分,只剩下payload部分,如下所示。

{
   "id":130,
   "customer_code":"DVTPRDFT411",
   "topic_name":"DVTPRDFT411",
   "payload_key":null,
   "is_ordered":false,
   "headers":"{\"kafka_timestamp\":1594566354199}",
   "payload":"MSG 19",
   "created_on":1594595154280
}

我想更进一步,只提取customer_code 字段。我尝试了ExtractField$Value SMT,但我不断收到异常IllegalArgumentException: Unknown field: customer_code

我的配置如下

transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=drop
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extract.field=customer_code

我尝试了许多其他 SMT,包括 ExtractField$KeyValueToKey,但我无法使其工作。如果你能告诉我我做错了什么,我将不胜感激。根据来自 Confluent 的 tutorial,它应该可以工作,但没有。

** 更新 **

我正在使用 connect-standalone worker.properties sqlserver.properties 运行 Kafka Connect。

worker.properties

offset.storage.file.filename=C:/development/kafka_2.12-2.5.0/data/kafka/connect/connect.offsets
plugin.path=C:/development/kafka_2.12-2.5.0/plugins
bootstrap.servers=127.0.0.1:9092
offset.flush.interval.ms=10000
rest.port=10082
rest.host.name=127.0.0.1
rest.advertised.port=10082
rest.advertised.host.name=127.0.0.1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

sqlserver.properties

name=sql-server-connector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=127.0.0.1
database.port=1433
database.user=sa
database.password=dummypassword
database.dbname=STGCTR
database.history.kafka.bootstrap.servers=127.0.0.1:9092

database.server.name=wfo
table.whitelist=dbo.kafka_event
database.history.kafka.topic=db_schema_history
transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=drop
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field=customer_code

【问题讨论】:

  • 如果您已经尝试过ExtractField$Value,请更新您的问题,因为ExtractField$Key 是错误的解决方案并且未知字段异常是可以的

标签: sql apache-kafka apache-kafka-connect debezium


【解决方案1】:

schemapayload 字段听起来就像您使用的是通过启用了模式的 JsonConverter 序列化的数据。

您只需设置value.converter.schemas.enable=false 即可实现您的目标。

【讨论】:

  • 感谢您的建议。这给了我想要的确切结果:)。但是,我仍然不知道我的 SMT 出了什么问题。你有什么建议吗?
  • 我尝试了ExtractField$ValueExtractField$Key 但我得到了同样的错误Unknown field
  • 如何应用 SMT?到源或接收器连接器?
  • 我直接应用在源连接器上。我用.properties 文件更新了我的帖子,以备不时之需。谢谢你的协助! :)
  • @Mr 为了提取字段,你需要一个模式
猜你喜欢
  • 2019-05-29
  • 2019-04-10
  • 2017-02-06
  • 2020-11-27
  • 2018-01-26
  • 2019-05-01
  • 1970-01-01
  • 2019-03-15
  • 2020-11-20
相关资源
最近更新 更多