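CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of data or tables), record those changes completely and in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.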
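
The idea of recording changes in order and letting a subscriber consume them can be sketched in plain Java. This is a conceptual illustration only: `ChangeLogSketch`, `ChangeEvent`, and the in-memory list standing in for the message broker are hypothetical names, not part of any CDC tool.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Conceptual sketch: an in-memory change log and one subscriber that replays it. */
final class ChangeLogSketch {

    enum Op { INSERT, UPDATE, DELETE }

    /** One captured change; the fields are illustrative, not a real CDC record format. */
    static final class ChangeEvent {
        final long sequence;  // position in the log, which defines the replay order
        final Op op;
        final int key;        // primary key of the affected row
        final String value;   // new row value, null for DELETE

        ChangeEvent(long sequence, Op op, int key, String value) {
            this.sequence = sequence;
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    /** A subscriber applies the events in log order to rebuild the current table state. */
    static Map<Integer, String> replay(List<ChangeEvent> log) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (ChangeEvent e : log) {
            if (e.op == Op.DELETE) {
                table.remove(e.key);
            } else {
                table.put(e.key, e.value);
            }
        }
        return table;
    }

    /** A small hand-written log: two inserts, one update, one delete. */
    static List<ChangeEvent> sampleLog() {
        List<ChangeEvent> log = new ArrayList<>();
        log.add(new ChangeEvent(1, Op.INSERT, 1, "alice"));
        log.add(new ChangeEvent(2, Op.INSERT, 2, "bob"));
        log.add(new ChangeEvent(3, Op.UPDATE, 1, "alice-v2"));
        log.add(new ChangeEvent(4, Op.DELETE, 2, null));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(replay(sampleLog())); // prints {1=alice-v2}
    }
}
```

Because the events are applied in the order they were recorded, the subscriber ends up with the same state as the source table.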

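CDC implementations fall into two main categories, query-based and binlog-based. They differ as follows:

| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load to the database | Yes | No |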
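
A small simulation can make the difference between the two approaches concrete. The sketch below assumes a query-based job that polls with a condition like `updated_at > lastPoll`, and a log-based reader that consumes every log entry from an offset. The class `QueryVsLogCdc` and all of its methods are hypothetical; no real database or connector is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Conceptual sketch: the same writes seen by a polling query and by a log reader. */
final class QueryVsLogCdc {

    static final class Row {
        final String value;
        final long updatedAt;
        Row(String value, long updatedAt) {
            this.value = value;
            this.updatedAt = updatedAt;
        }
    }

    private final Map<Integer, Row> table = new TreeMap<>();
    private final List<String> binlog = new ArrayList<>(); // stand-in for the binlog
    private long clock = 0;                                 // logical timestamp

    void insert(int key, String value) {
        table.put(key, new Row(value, ++clock));
        binlog.add("INSERT " + key + "=" + value);
    }

    void update(int key, String value) {
        table.put(key, new Row(value, ++clock));
        binlog.add("UPDATE " + key + "=" + value);
    }

    void delete(int key) {
        table.remove(key);
        ++clock;
        binlog.add("DELETE " + key);
    }

    long now() {
        return clock;
    }

    /** Query-based capture: only rows that still exist and changed after `since`. */
    List<String> poll(long since) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Integer, Row> e : table.entrySet()) {
            if (e.getValue().updatedAt > since) {
                rows.add(e.getKey() + "=" + e.getValue().value);
            }
        }
        return rows;
    }

    /** Log-based capture: every entry from the given offset onward. */
    List<String> readLog(int fromOffset) {
        return new ArrayList<>(binlog.subList(fromOffset, binlog.size()));
    }

    public static void main(String[] args) {
        QueryVsLogCdc db = new QueryVsLogCdc();
        db.insert(1, "a");
        long lastPoll = db.now(); // both readers are caught up at this point
        db.update(1, "b");
        db.update(1, "c");
        db.insert(2, "x");
        db.delete(2);
        System.out.println("poll: " + db.poll(lastPoll)); // only the final state of row 1
        System.out.println("log:  " + db.readLog(1));     // all four changes
    }
}
```

Between the two polls, the query sees only the final value of row 1. It misses the intermediate update and never learns that row 2 existed, while the log reader receives all four changes.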

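The Flink community has developed the flink-cdc-connectors component, a source connector that reads both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL. It is open source and available at: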

https://github.com/ververica/flink-cdc-connectors

https://ververica.github.io/flink-cdc-connectors/master/

Flink CDC

The MySQL CDC connector requires MySQL 5.7 or later. Running it against MySQL 5.6 fails with:

Caused by: org.apache.flink.table.api.ValidationException: Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is 5.6.
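
The version requirement in this error can be checked before submitting a job. The helper below is a hypothetical sketch, not the connector's own validation code; it assumes the version string has the shape returned by `SELECT VERSION()`, such as `5.6.24-log` or `8.0.28`.

```java
/** Hypothetical pre-flight check mirroring the "5.7 or later" rule from the error message. */
final class MySqlVersionCheck {

    /** Returns true when the version string denotes MySQL 5.7 or later. */
    static boolean isSupported(String version) {
        // Split on dots and dashes: "5.6.24-log" -> ["5", "6", "24", "log"]
        String[] parts = version.split("[.\\-]");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major > 5 || (major == 5 && minor >= 7);
    }

    public static void main(String[] args) {
        System.out.println(isSupported("5.6.24-log")); // false
        System.out.println(isSupported("5.7.35"));     // true
        System.out.println(isSupported("8.0.28"));     // true
    }
}
```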

 

MySQL configuration

Edit my.cnf:

[kris@hadoop101 ~]$ sudo vim /etc/my.cnf
[mysqld]
max_allowed_packet=1024M
server_id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall 
binlog-do-db=test  # also log the test database
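
The settings that matter for binlog-based CDC in this file are `server_id`, `log-bin`, and `binlog_format=row`. As a rough sketch, they can be checked by parsing the `[mysqld]` section of the file. `BinlogConfigCheck` is a hypothetical helper, and it is deliberately strict: it reports a missing `binlog_format` as a problem even on servers whose default is already row-based.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks the binlog settings in the text of a my.cnf file. */
final class BinlogConfigCheck {

    /** Returns a list of problems; an empty list means the [mysqld] section looks ready. */
    static List<String> problems(String myCnf) {
        Map<String, String> options = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : myCnf.split("\\R")) {
            String line = raw.replaceAll("#.*$", "").trim(); // drop comments
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            String[] kv = line.split("=", 2);
            // MySQL accepts '-' and '_' interchangeably in option names
            String key = kv[0].trim().replace('-', '_').toLowerCase();
            options.put(key, kv.length > 1 ? kv[1].trim() : "");
        }

        List<String> problems = new ArrayList<>();
        if (!options.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        if (!options.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"row".equalsIgnoreCase(options.get("binlog_format"))) {
            problems.add("binlog_format is not row");
        }
        return problems;
    }

    public static void main(String[] args) {
        String sample = "[mysqld]\n"
                + "server_id=1\n"
                + "log-bin=mysql-bin\n"
                + "binlog_format=row\n"
                + "binlog-do-db=test  # sample comment\n";
        System.out.println(problems(sample)); // prints []
    }
}
```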

Restart MySQL so the new settings take effect:

sudo service mysql restart
  Starting MySQL [OK]

  or, depending on the MySQL version:

  sudo service mysqld restart

Check the MySQL binlog files:

[kris@hadoop101 ~]$ cd /var/lib/mysql/
[kris@hadoop101 mysql]$ ll
total 178740
-rw-rw---- 1 mysql mysql     1076 Apr 18  2021 mysql-bin.000095
-rw-rw---- 1 mysql mysql      143 May  9  2021 mysql-bin.000096
-rw-rw---- 1 mysql mysql      846 Jul 26  2021 mysql-bin.000097
-rw-rw---- 1 mysql mysql      143 Nov  3 08:26 mysql-bin.000098
-rw-rw---- 1 mysql mysql      143 Feb  4 22:05 mysql-bin.000099
-rw-rw---- 1 mysql mysql      120 Feb  6 11:21 mysql-bin.000100
-rw-rw---- 1 mysql mysql     1900 Feb  6 11:21 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 Feb  6 11:21 mysql.sock
drwx------ 2 mysql mysql     4096 May 17  2019 online_edu
drwx------ 2 mysql mysql     4096 Mar 15  2019 performance_schema
-rw-r--r-- 1 root  root       125 Mar 15  2019 RPM_UPGRADE_HISTORY
-rw-r--r-- 1 mysql mysql      125 Mar 15  2019 RPM_UPGRADE_MARKER-LAST
drwx------ 2 mysql mysql     4096 Apr 19  2019 sparkmall
drwxr-xr-x 2 mysql mysql     4096 Apr 26  2020 test

 

Example

Maven dependencies:

    <properties>
        <flink-version>1.13.0</flink-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>