如果您想创建实时数据管道,您需要使用能够从 MySQL 流式传输更改的更改数据捕获 (CDC) 工具。我建议Debezium 这是一个开源分布式平台,用于捕获变更数据。
捕获插入
当向表中添加新记录时,将生成类似于以下的 JSON:
{
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"giorgos@abc.com"
},
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500369632,
"gtid":null,
"file":"mysql-bin.000003",
"pos":364,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"c",
"ts_ms":1500369632095
}
}
before 对象为空,after 对象包含新插入的值。注意op 属性是c,表明这是一个 CREATE 事件。
捕获更新
假设email属性已经更新,会产生一个类似下面的JSON:
{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"giorgos@abc.com"
},
"after":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500369929,
"gtid":null,
"file":"mysql-bin.000003",
"pos":673,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"u",
"ts_ms":1500369929464
}
}
注意 op 现在是 u,表明这是一个 UPDATE 事件。 before 对象显示更新前的行状态,after 对象捕获更新行的当前状态。
捕获删除
现在假设该行已被删除;
{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"after":null,
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500370394,
"gtid":null,
"file":"mysql-bin.000003",
"pos":1025,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"d",
"ts_ms":1500370394589
}
}
opnew 等于d,表示 DELETE 事件。 after 属性将为空,before 对象包含被删除之前的行。
您还可以查看他们网站上提供的extensive tutorial。
编辑: Example configuration 用于 MySQL 数据库
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "3306", (4)
"database.user": "debezium", (5)
"database.password": "dbz", (6)
"database.server.id": "184054", (7)
"database.server.name": "fullfillment", (8)
"database.whitelist": "inventory", (9)
"database.history.kafka.bootstrap.servers": "kafka:9092", (10)
"database.history.kafka.topic": "dbhistory.fullfillment" (11)
"include.schema.changes": "true" (12)
}
}
1 当我们向 Kafka Connect 注册连接器时的名称
服务。
2 此 MySQL 连接器类的名称。
3 地址
MySQL 服务器。
4 MySQL 服务器的端口号。
5 名称
具有所需权限的 MySQL 用户。
6 密码
具有所需权限的 MySQL 用户。
7 连接器的
在 MySQL 集群中必须是唯一的标识符,并且类似于
MySQL 的 server-id 配置属性。
8 逻辑名称
MySQL server/cluster,形成一个命名空间,用于所有的
连接器写入的 Kafka 主题的名称,Kafka
连接模式名称和相应 Avro 的命名空间
使用 Avro 连接器时的模式。
9 所有数据库的列表
由此连接器将监视的此服务器托管。这是
可选的,还有其他属性用于列出数据库和
要包括或排除在监视之外的表。
10 卡夫卡列表
此连接器将用于写入和恢复 DDL 的代理
数据库历史主题的语句。
11 数据库名称
连接器将写入和恢复 DDL 的历史主题
陈述。本主题仅供内部使用,请勿使用
由消费者。
12 指定连接器应
在名为 fullfillment events 的架构更改主题上生成
可供消费者使用的 DDL 更改。