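【Question title】: How to sort a stream by event time using Flink SQL
【Posted】: 2019-03-03 15:35:45
【Question】:

I have an out-of-order DataStream<Event> that I want to sort so that the events are ordered by their event-time timestamps. I've simplified my use case so that my Event class has just a single field, the timestamp field: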

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY timestamp ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);

    sortedEventStream.print();

    env.execute();
}

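I get this error:

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "timestamp FROM" at line 1, column 8.

It seems I'm not specifying the event-time attribute correctly, but it isn't clear what the problem is.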

【Question comments】:

    Tags: apache-flink flink-streaming flink-sql


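    【Answer 1】:

    The problem turned out to be using timestamp as a field name in my Event class. TIMESTAMP is a reserved keyword in Flink SQL, so the parser fails as soon as it reaches that token in the SELECT clause (column 8 in the error above). Quoting the name with backticks, as in SELECT `timestamp` FROM events, should also get past the parser, but renaming the field to eventTime was enough to get everything working: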

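    // Imports assume the Flink 1.7-era APIs used below (TableEnvironment.getTableEnvironment, registerTable).
    import java.time.Instant;
    import java.util.Random;
    
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class Sort {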
        public static final int OUT_OF_ORDERNESS = 1000;
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                    .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
    
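            // Expose the eventTime field as the table's event-time (rowtime) attribute.
            Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
            tableEnv.registerTable("events", events);
            // On a stream, ORDER BY must sort primarily on an ascending time attribute.
            Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");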
            DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
    
            sortedEventStream.print();
    
            env.execute();
        }
    
        public static class Event {
            public Long eventTime;
    
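            // Flink's POJO rules require a public no-argument constructor.
            public Event() {
                // Random offset of up to OUT_OF_ORDERNESS ms, so timestamps arrive out of order.
                this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
            }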
        }
    
        private static class OutOfOrderEventSource implements SourceFunction<Event> {
            private volatile boolean running = true;
    
            @Override
            public void run(SourceContext<Event> ctx) throws Exception {
                while(running) {
                    ctx.collect(new Event());
                    Thread.sleep(1);
                }
            }
    
            @Override
            public void cancel() {
                running = false;
            }
        }
    
        // Watermark = highest timestamp seen so far minus OUT_OF_ORDERNESS ms.
        private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
            public TimestampsAndWatermarks() {
                super(Time.milliseconds(OUT_OF_ORDERNESS));
            }
    
            @Override
            public long extractTimestamp(Event event) {
                return event.eventTime;
            }
        }
    }
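For intuition about why the query needs an event-time attribute and watermarks at all: on an unbounded stream, ORDER BY eventTime can only emit a row once the watermark guarantees that nothing with an earlier timestamp is still on its way. The plain-Java sketch below illustrates that idea without any Flink dependency. It is a simplified model, not Flink's implementation: the class WatermarkSortSketch and its methods are hypothetical, the watermark follows the same rule as BoundedOutOfOrdernessTimestampExtractor (highest timestamp seen minus the bound) but is recomputed after every event rather than periodically, and events at or behind the current watermark are treated as late and set aside instead of being emitted out of order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Conceptual sketch only (not Flink's code): buffer events and emit them in
// timestamp order once the watermark has passed them.
public class WatermarkSortSketch {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen;
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // min-heap by timestamp
    private final List<Long> dropped = new ArrayList<>();

    public WatermarkSortSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so the initial watermark is Long.MIN_VALUE without overflowing.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Bounded-out-of-orderness watermark: highest timestamp seen minus the allowed delay.
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrderness;
    }

    // Simplification: the watermark advances after every event here,
    // whereas Flink emits watermarks periodically.
    public List<Long> onEvent(long timestamp) {
        if (timestamp <= currentWatermark()) {
            // Late: events up to the watermark may already have been emitted.
            dropped.add(timestamp);
            return new ArrayList<>();
        }
        buffer.add(timestamp);
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return drainUpTo(currentWatermark());
    }

    // When the input ends, everything still buffered can be flushed in order.
    public List<Long> onEndOfInput() {
        return drainUpTo(Long.MAX_VALUE);
    }

    public List<Long> droppedEvents() {
        return dropped;
    }

    // Emit, in ascending order, every buffered timestamp at or below the watermark.
    private List<Long> drainUpTo(long watermark) {
        List<Long> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        WatermarkSortSketch sorter = new WatermarkSortSketch(1000);
        long[] arrivals = {1500, 1200, 2600, 1100, 3000, 2100, 4200, 500};
        List<Long> sorted = new ArrayList<>();
        for (long ts : arrivals) {
            sorted.addAll(sorter.onEvent(ts));
        }
        sorted.addAll(sorter.onEndOfInput());
        System.out.println("sorted:  " + sorted);
        System.out.println("dropped: " + sorter.droppedEvents());
    }
}
```

Running main prints:

sorted:  [1200, 1500, 2100, 2600, 3000, 4200]
dropped: [1100, 500]

1100 and 500 arrive more than 1000 ms behind the highest timestamp already seen, so they fall behind the watermark. 2100 also arrives out of order, but within the bound, so it is still emitted in its correct position.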
    

    【Comments】:
