【Posted】: 2020-07-31 21:50:59
【Question】:
I have a Kafka topic that is populated from a MySQL database by the Debezium MySQL source connector. One of its messages has the following format:
{
  "Message": {
    "schema": {
      "type": "struct",
      "fields": [
        ...
      ],
      "optional": true,
      "name": "mysql-server-1.inventory.somename"
    },
    "payload": {
      "op": "u",
      "ts_ms": 1465491411815,
      "before": {
        "id": 1004,
        "first_name": "Anne",
        "last_name": "Doof",
        "email": "annek@noanswer.org"
      },
      "after": {
        "id": 1004,
        "first_name": "Anne",
        "last_name": "Marry",
        "email": "annek@noanswer.org"
      },
      "source": {
        "db": "inventory",
        "table": "customers",
        ...
        "query": "Update customers set last_name = 'Marry' where id = 1004"
      }
    }
  }
}
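I want to use the JDBC sink connector to push ts_ms, before, after, and the id (taken from the row object) to another database, where the table schema is (id, before (text), after (text), timestamp). As a Kafka newcomer, I can't figure out:
How do I extract only these fields from the message and ignore the others?
How do I convert the before and after fields to a string/serialized format?
How do I extract the id from the row object? (For an insert, before is null; for a delete, after is null.)
For the message above, the sink target table should end up with the following data: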
id: 1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after: '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815
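To make the intended mapping concrete, here is a minimal sketch in plain Python. It is not a Kafka Connect configuration or transform; it only illustrates the row I am after. The function name `to_sink_row` is hypothetical, and the sketch assumes the envelope is wrapped in a top-level "Message" key exactly as in the sample above.

```python
import json


def to_sink_row(envelope):
    """Map one change event to the (id, before, after, timestamp) row.

    Hypothetical helper: assumes the "Message" -> "payload" layout
    shown in the sample message.
    """
    payload = envelope["Message"]["payload"]
    before = payload.get("before")
    after = payload.get("after")

    # Inserts carry no "before" and deletes carry no "after",
    # so read the id from whichever row image is present.
    row_image = after if after is not None else before
    if row_image is None:
        raise ValueError("event has neither a before nor an after image")

    def dump(image):
        # Compact JSON text, or None so the column stays NULL.
        if image is None:
            return None
        return json.dumps(image, separators=(",", ":"))

    return {
        "id": row_image["id"],
        "before": dump(before),
        "after": dump(after),
        "timestamp": payload["ts_ms"],
    }
```

Inside Kafka Connect, the same logic would presumably have to run before the JDBC sink sees the record, for example in a transform or a small stream-processing step; that part is exactly what I don't know how to set up.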
【Discussion】:
Tags: mysql apache-kafka transformation apache-kafka-connect debezium