CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to a message broker for other services to subscribe to and consume.
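Conceptually, CDC turns row mutations into an ordered, append-only event log that consumers replay. The following toy sketch (plain Java, no real database or broker involved) is only meant to illustrate that idea; `capture` stands in for reading the binlog:

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration, not a real CDC implementation: change events are
// appended to an ordered log that downstream consumers replay.
public class CdcToy {
    static final List<String> changeLog = new ArrayList<>();

    // capture() stands in for the binlog: every mutation is recorded in commit order
    static void capture(String op, String table, String key) {
        changeLog.add(op + " " + table + "#" + key);
    }

    public static void main(String[] args) {
        capture("INSERT", "user_info", "1");
        capture("UPDATE", "user_info", "1");
        capture("DELETE", "user_info", "1");
        // Replaying the log in order reconstructs the final state of the row
        for (String event : changeLog) {
            System.out.println(event);
        }
    }
}
```

Because the log preserves commit order, any consumer that replays it arrives at the same final state as the source database.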
CDC falls into two main categories, query-based and binlog-based. How they differ:

|                            | Query-based CDC          | Binlog-based CDC         |
| -------------------------- | ------------------------ | ------------------------ |
| Open-source products       | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode             | Batch                    | Streaming                |
| Captures every data change | No                       | Yes                      |
| Latency                    | High                     | Low                      |
| Extra load on the database | Yes                      | No                       |
The Flink community has developed the flink-cdc-connectors component, a source connector that can read both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL. It is open source:
https://github.com/ververica/flink-cdc-connectors
https://ververica.github.io/flink-cdc-connectors/master/
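The connector can also be used declaratively through Flink SQL. A minimal sketch, assuming a `gmall.user_info` table with hypothetical columns (hostname and credentials are placeholders matching the notes below):

```sql
-- Register a CDC source table; option keys follow flink-connector-mysql-cdc
CREATE TABLE user_info_cdc (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop101',
  'port' = '3306',
  'username' = 'root',          -- placeholder credentials
  'password' = '123456',
  'database-name' = 'gmall',
  'table-name' = 'user_info'
);

-- Every insert/update/delete in MySQL surfaces here as a changelog row
SELECT * FROM user_info_cdc;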
Note that the connector requires MySQL 5.7 or later; on an older server the job fails with:
Caused by: org.apache.flink.table.api.ValidationException: Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is 5.6.
MySQL configuration

Edit my.cnf:

[kris@hadoop101 ~]$ sudo vim /etc/my.cnf

[mysqld]
max_allowed_packet=1024M
server_id=1
log-bin=master
binlog_format=row
binlog-do-db=gmall
binlog-do-db=test    # add the test database
Restart MySQL:

sudo service mysql start
Starting MySQL                                             [  OK  ]

or (depending on the MySQL version):

sudo service mysqld start
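After restarting, you can confirm from a MySQL session that the binlog is actually enabled; the exact file name and position shown will vary:

```sql
SHOW VARIABLES LIKE 'log_bin';   -- Value should be ON
SHOW MASTER STATUS;              -- shows the current binlog file and position
```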
Check MySQL's binlog files:

[kris@hadoop101 ~]$ cd /var/lib/mysql/
[kris@hadoop101 mysql]$ ll
total 178740
-rw-rw---- 1 mysql mysql 1076 Apr 18  2021 mysql-bin.000095
-rw-rw---- 1 mysql mysql  143 May  9  2021 mysql-bin.000096
-rw-rw---- 1 mysql mysql  846 Jul 26  2021 mysql-bin.000097
-rw-rw---- 1 mysql mysql  143 Nov  3 08:26 mysql-bin.000098
-rw-rw---- 1 mysql mysql  143 Feb  4 22:05 mysql-bin.000099
-rw-rw---- 1 mysql mysql  120 Feb  6 11:21 mysql-bin.000100
-rw-rw---- 1 mysql mysql 1900 Feb  6 11:21 mysql-bin.index
srwxrwxrwx 1 mysql mysql    0 Feb  6 11:21 mysql.sock
drwx------ 2 mysql mysql 4096 May 17  2019 online_edu
drwx------ 2 mysql mysql 4096 Mar 15  2019 performance_schema
-rw-r--r-- 1 root  root   125 Mar 15  2019 RPM_UPGRADE_HISTORY
-rw-r--r-- 1 mysql mysql  125 Mar 15  2019 RPM_UPGRADE_MARKER-LAST
drwx------ 2 mysql mysql 4096 Apr 19  2019 sparkmall
drwxr-xr-x 2 mysql mysql 4096 Apr 26  2020 test
Example

Maven dependencies:
<properties>
    <flink-version>1.13.0</flink-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
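With these dependencies in place, a DataStream job using the connector might look like the following sketch. The hostname, credentials, and table name are placeholders taken from the notes above; running it requires a reachable MySQL 5.7+ with the binlog enabled as configured earlier:

```java
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCdcDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The source reads a full snapshot first, then switches to streaming binlog changes
        DebeziumSourceFunction<String> source = MySqlSource.<String>builder()
                .hostname("hadoop101")              // host from the notes above; adjust to your environment
                .port(3306)
                .username("root")                   // placeholder credentials
                .password("123456")
                .databaseList("gmall")              // must match binlog-do-db in my.cnf
                .tableList("gmall.user_info")       // hypothetical table; omit to capture all tables
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        env.addSource(source)
                .print();                           // each change event is printed as a string

        env.execute("mysql-cdc-demo");
    }
}
```

`StartupOptions.initial()` takes the snapshot-then-binlog path; `StartupOptions.latest()` skips the snapshot and reads only new changes.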