[Posted]: 2020-04-09 03:07:27
[Question]:
I have a stream containing JSON messages like the following:
{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}
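This stream is processed as a DataStream<Row> that is registered as a TableSource.
I want to use this stream as a changelog stream to update the contents of a Flink Table, but I cannot find a way to do so.
I have defined a StreamTableSource as follows: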
public class MyTableSource implements StreamTableSource<Row>, ... {
    @Override
    public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
        // readChangelogStream is a hypothetical helper standing in for however the raw
        // changelog stream is obtained; calling getDataStream(env) here would recurse forever.
        DataStream<Row> stream = readChangelogStream(env) // Retrieve the changelog stream
            .keyBy([SOME KEY])                             // Partition by key
            .map(new MyMapFunction());                     // Map each update message to the correct encoding?
        return stream;
    }
    ...
}
This TableSource is then used in:
public void process(final StreamExecutionEnvironment env) {
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.registerTableSource("MyTableSource", new MyTableSource());
    // The content of this table should be updated according to the operations described in the changelog stream.
    Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource");
    result.insertInto([SOME SINK]);
}
What is a good way to do this? (More specifically, how can I use a stream to delete rows from a table?)
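For reference, the kind of encoding I imagine the map step producing is sketched below in plain Java. It pairs each row with a boolean flag (true to add the row, false to retract it), similar in spirit to the Tuple2<Boolean, Row> messages that Flink's retract streams use. The `RetractEncodingSketch` class, the `Flagged` holder, and the `encode` method are hypothetical names. No Flink API is used, and whether a StreamTableSource can consume such flagged messages is exactly what I do not know.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractEncodingSketch {

    /** Hypothetical output message: add == true inserts the row, add == false retracts it. */
    static final class Flagged {
        final boolean add;
        final String id;
        final String value;

        Flagged(boolean add, String id, String value) {
            this.add = add;
            this.id = id;
            this.value = value;
        }

        @Override
        public String toString() {
            return (add ? "+" : "-") + "(" + id + ", " + value + ")";
        }
    }

    // Last value seen per id; in a keyed Flink operator this would live in keyed state.
    private final Map<String, String> lastValueById = new HashMap<>();

    /** Turns one changelog message into zero, one, or two flagged messages. */
    List<Flagged> encode(String operation, String id, String value) {
        List<Flagged> out = new ArrayList<>();
        String previous = lastValueById.get(id);
        if (previous != null) {
            // Any message for a known id first retracts the row currently in the table.
            out.add(new Flagged(false, id, previous));
        }
        if ("DELETE".equals(operation)) {
            lastValueById.remove(id);
        } else {
            // CREATE and UPDATE both add the new version of the row.
            out.add(new Flagged(true, id, value));
            lastValueById.put(id, value);
        }
        return out;
    }

    public static void main(String[] args) {
        RetractEncodingSketch encoder = new RetractEncodingSketch();
        System.out.println(encoder.encode("CREATE", "id-1", "value-1"));
        System.out.println(encoder.encode("DELETE", "id-1", null));
    }
}
```

With this encoding, a DELETE becomes a single retract message for the last known row, and an UPDATE becomes a retract of the old row followed by an add of the new one.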
[Discussion]:
Tags: java apache-flink flink-sql