自定义结果表
CustomSink接口
public abstract class CustomSinkBase implements Serializable{protected Map<String,String> userParamsMap; // 您在sql with语句中定义的键值对,但所有的键均为小写protected Set<String> primaryKeys; // 您定义的主键字段名protected List<String> headerFields;// 标记为header的字段列表protected RowTypeInfo rowTypeInfo;// 字段类型和名称/*** 初始化方法* @param taskNumber 当前节点是第几个并发* @param numTasks sink节点的并发数* @throws IOException*/public abstract void open(int taskNumber, int numTasks) throws IOException;/*** close方法,释放资源* @throws IOException*/public abstract void close() throws IOException;/*** 处理插入单行数据, Row中按到ddl定义的顺序** @param row* @throws IOException*/public abstract void writeAddRecord(Row row) throws IOException;/*** 处理删除单行数据,delete产生的原因可以参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming.html* @param row* @throws IOException*/public abstract void writeDeleteRecord(Row row) throws IOException;/*** 如果有攒批进行批量插入,需要在该方法中实现清空该节点操作* @throws IOException*/public abstract void sync() throws IOException;/*** 返回用来标识sink的名字* @throws IOException*/public abstract String getName();}
- 实现的自定义结果表需要继承CustomSinkBase。
- 自定义结果表需要有一个无参构造函数,结果表的初始化工作,可以在open里通过读取userParamsMap中的配置参数进行初始化。
-
自定义结果表的项目工程需要添加如下jar包依赖。
① blink-connector-custom-blink-2.0-SNAPSHOT.jar
- 文件:
mvn install:install-file -DgroupId=com.alibaba.blink -DartifactId=blink-connector-custom -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=blink-connector-custom-blink-2.0-SNAPSHOT.jar - maven依赖:
<dependency><groupId>com.alibaba.blink</groupId><artifactId>blink-connector-custom</artifactId><version>blink-2.0-SNAPSHOT</version><scope>provided</scope></dependency>
② flink-core-blink-2.0-SNAPSHOT.jar
- 文件:
mvn install:install-file -DgroupId=org.apache.flink -DartifactId=flink-core -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=flink-core-blink-2.0-SNAPSHOT.jar - maven依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>blink-2.0-SNAPSHOT</version><scope>provided</scope></dependency>
- 文件:
用法
代码如下。
public class UDPrintSink extends CustomSinkBase {private static Logger LOG = LoggerFactory.getLogger(UDPrintSink.class);public void open(int taskNumber, int numTasks) throws IOException {LOG.info(String.format("Open Method Called: taskNumber %d numTasks %d", taskNumber, numTasks));String[] filedNames = rowTypeInfo.getFieldNames();TypeInformation[] typeInformations = rowTypeInfo.getFieldTypes();LOG.info(String.format("Open Method Called: filedNames %d typeInformations %d", filedNames.length, typeInformations.length));}public void close() throws IOException {LOG.info(String.format("Close Method Called"));}public void writeAddRecord(Row row) throws IOException {LOG.info("Write: " + row.toString());}public void writeDeleteRecord(Row row) throws IOException {LOG.info("Delete: " + row.toString());}public void sync() throws IOException {//没有做攒批写入,空置该方法}public String getName() {return "UDPrintSink";}}
将代码打成一个jar,上传到系统中并引用。
使用自定义结果表的DDL。
create table customPrint (a int,b BIGINT,c VARCHAR) with (type = 'custom',class = 'com.alibaba.blink.connector.custom.demo.UDPrintSink'[,...]);
说明:with参数
参数名 意义 type 填写 custom声明这是一个自定义结果表。class 填写在jar中实现结果表的类名 自定义参数 自行设定, open函数中可以通过userParamsMap获取。
示例
上下游存储DDL
--源 DDLcreate table suorce_name (a int,b BIGINT) with (type = 'XXXX'......);--自定义sink ddlcreate table customPrint (a int,b BIGINT,PRIMARY KEY(a)) with (type = 'custom',class = 'UDPrintSink' --类名);
写入结果表的数据SQL语句
INSERT INTO customPrintSELECT*FROM suorce_name
测试数据
| a | b |
|---|---|
| 1 | 1234 |
| 2 | 56789 |
结果数据
| a | b |
|---|---|
| 1 | 1234 |
| 2 | 56789 |
本文转自实时计算——自定义结果表