自定义结果表

CustomSink接口


  1. public abstract class CustomSinkBase implements Serializable{
  2. protected Map<String,String> userParamsMap; // 您在sql with语句中定义的键值对,但所有的键均为小写
  3. protected Set<String> primaryKeys; // 您定义的主键字段名
  4. protected List<String> headerFields;// 标记为header的字段列表
  5. protected RowTypeInfo rowTypeInfo;// 字段类型和名称
  6. /**
  7. * 初始化方法
  8. * @param taskNumber 当前节点是第几个并发
  9. * @param numTasks sink节点的并发数
  10. * @throws IOException
  11. */
  12. public abstract void open(int taskNumber, int numTasks) throws IOException;
  13. /**
  14. * close方法,释放资源
  15. * @throws IOException
  16. */
  17. public abstract void close() throws IOException;
  18. /**
  19. * 处理插入单行数据, Row中按到ddl定义的顺序
  20. *
  21. * @param row
  22. * @throws IOException
  23. */
  24. public abstract void writeAddRecord(Row row) throws IOException;
  25. /**
  26. * 处理删除单行数据,delete产生的原因可以参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming.html
  27. * @param row
  28. * @throws IOException
  29. */
  30. public abstract void writeDeleteRecord(Row row) throws IOException;
  31. /**
  32. * 如果有攒批进行批量插入,需要在该方法中实现清空该节点操作
  33. * @throws IOException
  34. */
  35. public abstract void sync() throws IOException;
  36. /**
  37. * 返回用来标识sink的名字
  38. * @throws IOException
  39. */
  40. public abstract String getName();
  41. }
  1. 实现的自定义结果表需要继承CustomSinkBase。
  2. 自定义结果表需要有一个无参构造函数,结果表的初始化工作,可以在open里通过读取userParamsMap中的配置参数进行初始化。
  3. 自定义结果表的项目工程需要添加如下jar包依赖。

    ① blink-connector-custom-blink-2.0-SNAPSHOT.jar

    • 文件:mvn install:install-file -DgroupId=com.alibaba.blink -DartifactId=blink-connector-custom -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=blink-connector-custom-blink-2.0-SNAPSHOT.jar
    • maven依赖:
      
      
      1. <dependency>
      2. <groupId>com.alibaba.blink</groupId>
      3. <artifactId>blink-connector-custom</artifactId>
      4. <version>blink-2.0-SNAPSHOT</version>
      5. <scope>provided</scope>
      6. </dependency>

    ② flink-core-blink-2.0-SNAPSHOT.jar

    • 文件:mvn install:install-file -DgroupId=org.apache.flink -DartifactId=flink-core -Dversion=blink-2.0-SNAPSHOT -Dpackaging=jar -Dfile=flink-core-blink-2.0-SNAPSHOT.jar
    • maven依赖:
      
      
      1. <dependency>
      2. <groupId>org.apache.flink</groupId>
      3. <artifactId>flink-core</artifactId>
      4. <version>blink-2.0-SNAPSHOT</version>
      5. <scope>provided</scope>
      6. </dependency>

用法

代码如下。

  1. public class UDPrintSink extends CustomSinkBase {
  2. private static Logger LOG = LoggerFactory.getLogger(UDPrintSink.class);
  3. public void open(int taskNumber, int numTasks) throws IOException {
  4. LOG.info(String.format("Open Method Called: taskNumber %d numTasks %d", taskNumber, numTasks));
  5. String[] filedNames = rowTypeInfo.getFieldNames();
  6. TypeInformation[] typeInformations = rowTypeInfo.getFieldTypes();
  7. LOG.info(String.format("Open Method Called: filedNames %d typeInformations %d", filedNames.length, typeInformations.length));
  8. }
  9. public void close() throws IOException {
  10. LOG.info(String.format("Close Method Called"));
  11. }
  12. public void writeAddRecord(Row row) throws IOException {
  13. LOG.info("Write: " + row.toString());
  14. }
  15. public void writeDeleteRecord(Row row) throws IOException {
  16. LOG.info("Delete: " + row.toString());
  17. }
  18. public void sync() throws IOException {
  19. //没有做攒批写入,空置该方法
  20. }
  21. public String getName() {
  22. return "UDPrintSink";
  23. }
  24. }
将代码打成一个jar,上传到系统中并引用。

自定义结果表

使用自定义结果表的DDL。

  1. create table customPrint (
  2. a int,
  3. b BIGINT,
  4. c VARCHAR
  5. ) with (
  6. type = 'custom',
  7. class = 'com.alibaba.blink.connector.custom.demo.UDPrintSink'
  8. [,...]
  9. );

说明:with参数

参数名 意义
type 填写custom声明这是一个自定义结果表。
class 填写在jar中实现结果表的类名
自定义参数 自行设定,open 函数中可以通过 userParamsMap 获取。

示例

上下游存储DDL

  1. --源 DDL
  2. create table suorce_name (
  3. a int,
  4. b BIGINT
  5. ) with (
  6. type = 'XXXX'
  7. ......
  8. );
  9. --自定义sink ddl
  10. create table customPrint (
  11. a int,
  12. b BIGINT,
  13. PRIMARY KEY(a)
  14. ) with (
  15. type = 'custom',
  16. class = 'UDPrintSink' --类名
  17. );
写入结果表的数据SQL语句

  1. INSERT INTO customPrint
  2. SELECT
  3. *
  4. FROM suorce_name
测试数据
a b
1 1234
2 56789
结果数据
a b
1 1234
2 56789
本文转自实时计算——自定义结果表

相关文章: