【发布时间】:2019-03-03 15:35:45
【问题描述】:
我有一个乱序的DataStream<Event>,我想对其进行排序,以便事件按其事件时间时间戳排序。我已将我的用例简化为我的 Event 类只有一个字段——timestamp 字段:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
我收到此错误:
线程“main”中的异常 org.apache.flink.table.api.SqlParserException:SQL 解析失败。 在第 1 行第 8 列遇到“timestamp FROM”。
似乎我没有以正确的方式指定事件时间属性,但不清楚是什么问题。
【问题讨论】:
标签: apache-flink flink-streaming flink-sql