【问题标题】:Flink SQL : Use changelog stream to update rows in Dynamic TableFlink SQL:使用 changelog 流更新动态表中的行
【发布时间】:2020-04-09 03:07:27
【问题描述】:

我有一个包含如下 JSON 消息的流:

{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}

此流在注册为TableSourceDataStream<Row> 中处理。

我想将此流用作 changelog 流 来更新 Flink Table 的内容,但我找不到这样做的方法。

我已将StreamTableSource 定义为:

public class MyTableSource implements StreamTableSource<Row>, ... {

    @Override
    public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
        DataStream<Row> stream = getDataStream(env) // Retrieve changelog stream 
                .keyBy([SOME KEY])                  // Aggregate by key 
                .map(new MyMapFunction());          // Map the update message with the correct encoding ?

        return stream;
    }

    ... 
}

而这个TableSource用在

public void process(final StreamExecutionEnvironment env) {
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    tableEnv.registerTableSource("MyTableSource", new MyTableSource());

    Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource"); // This table content should be updated according to operation described in the changelog stream.

    result.insertInto([SOME SINK]);
}

这样做的好方法是什么? (更具体地说,如何使用流从表中删除行?)

【问题讨论】:

    标签: java apache-flink flink-sql


    【解决方案1】:

    目前,内部变更日志处理能力未通过 API 公开。因此,没有可用的资源可让您将传入的变更日志解释为表格。这是为Flink 1.11 计划的。

    在此之前,您可以考虑使用用户定义的聚合函数来应用此处建议的更新:

    Apache Flink: How to enable "upsert mode" for dynamic tables?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-06
      • 1970-01-01
      • 2014-10-20
      • 2021-05-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多