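In general, when you need to process an out-of-order stream in order, the simplest (and also efficient) approach is to use Flink SQL and let it do the sorting. Note that it relies on the WatermarkStrategy to decide when an event can safely be considered ready to emit, and it drops any late events. If you need to know about the late events, I'd recommend using CEP rather than SQL with MATCH_RECOGNIZE (shown below).
For more on how watermarks are used for sorting, see the tutorial about Watermarks in the Flink docs.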
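To make watermark-based sorting concrete, here is a minimal plain-Java sketch (no Flink dependencies) of roughly what happens. Events are buffered in timestamp order. The watermark trails the largest timestamp seen by a fixed bound; this mirrors `forBoundedOutOfOrderness`, which uses the maximum timestamp minus the bound minus 1. Buffered events at or before the watermark are emitted, and an event that arrives at or behind the watermark is late. `WatermarkSortSketch` and its methods are hypothetical. Unlike Flink, the sketch advances the watermark on every event rather than periodically. It also collects late events in a list instead of silently dropping them, which is exactly the information Flink SQL does not hand back to you.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkSortSketch {
    // Hypothetical event type standing in for the (sensorId, ts, kwh) tuples.
    public record Event(String sensorId, long ts, long kwh) {}

    private final long maxOutOfOrderness;
    // Events that arrived but may still be preceded by others, ordered by timestamp.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong(Event::ts));
    private long maxSeenTs = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    public final List<Event> emitted = new ArrayList<>(); // sorted output
    public final List<Event> late = new ArrayList<>();    // what Flink SQL would drop

    public WatermarkSortSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    public void onEvent(Event e) {
        // At or behind the watermark: the sorter already promised nothing earlier would come.
        if (e.ts() <= watermark) {
            late.add(e);
            return;
        }
        buffer.add(e);
        maxSeenTs = Math.max(maxSeenTs, e.ts());
        // Bounded out-of-orderness: largest timestamp seen, minus the bound, minus 1.
        advanceWatermark(maxSeenTs - maxOutOfOrderness - 1);
    }

    // End of a bounded input: flush everything, like a final Long.MAX_VALUE watermark.
    public void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }

    private void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks never move backwards
        }
        watermark = newWatermark;
        // Everything at or before the watermark is complete and can be emitted in order.
        while (!buffer.isEmpty() && buffer.peek().ts() <= watermark) {
            emitted.add(buffer.poll());
        }
    }

    public static void main(String[] args) {
        WatermarkSortSketch sorter = new WatermarkSortSketch(10);
        sorter.onEvent(new Event("sensor1", 30, 10));
        sorter.onEvent(new Event("sensor1", 40, 21));
        sorter.onEvent(new Event("sensor2", 35, 28)); // out of order, but within the bound
        sorter.onEvent(new Event("sensor2", 20, 20)); // behind the watermark (29): late
        sorter.onEvent(new Event("sensor1", 55, 25));
        sorter.endOfInput();
        sorter.emitted.forEach(e -> System.out.println("emitted " + e));
        sorter.late.forEach(e -> System.out.println("late    " + e));
    }
}
```

With a bound of 10, the sorter emits timestamps 30, 35, 40 and 55 in that order. The reading at 20 ends up in `late` because the watermark had already reached 29 when it arrived.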
Here is an example of how to implement your use case with Flink SQL:
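```java
import static org.apache.flink.table.api.Expressions.$;

import java.sql.Timestamp;
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SortAndDiff {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Required on Flink 1.11; deprecated from 1.12 on, where event time is the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // (sensorId, timestamp, kwh) readings; sensor2's first two arrive out of order.
        DataStream<Tuple3<String, Long, Long>> input = env.fromElements(
                new Tuple3<>("sensor1", "2019-10-10 10:20:30", 10L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:40", 21L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:10", 28L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:05", 20L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:55", 25L),
                new Tuple3<>("sensor1", "2019-10-10 10:21:05", 37L),
                new Tuple3<>("sensor2", "2019-10-10 10:23:00", 30L))
            // Parse the timestamp string into epoch milliseconds.
            .map(new MapFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>>() {
                @Override
                public Tuple3<String, Long, Long> map(Tuple3<String, String, Long> t) throws Exception {
                    return new Tuple3<>(t.f0, Timestamp.valueOf(t.f1).toInstant().toEpochMilli(), t.f2);
                }
            })
            // Tolerate up to one minute of out-of-orderness; later events are dropped.
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                    .withTimestampAssigner((event, timestamp) -> event.f1));

        // Expose the stream as a table; ts takes the place of f1 and becomes the
        // event-time (rowtime) attribute, using the timestamps assigned above.
        Table events = tableEnv.fromDataStream(input,
                $("sensorId"),
                $("ts").rowtime(),
                $("kwh"));

        // Per sensor, order by ts and match every pair of consecutive rows,
        // emitting the earlier row's ts and the kwh difference.
        // Concatenating `events` into the SQL string registers it under a generated name.
        Table results = tableEnv.sqlQuery(
                "SELECT E.* " +
                "FROM " + events + " " +
                "MATCH_RECOGNIZE ( " +
                    "PARTITION BY sensorId " +
                    "ORDER BY ts " +
                    "MEASURES " +
                        "this_step.ts AS ts, " +
                        "next_step.kwh - this_step.kwh AS diff " +
                    "AFTER MATCH SKIP TO NEXT ROW " +
                    "PATTERN (this_step next_step) " +
                    "DEFINE " +
                        "this_step AS TRUE, " +
                        "next_step AS TRUE " +
                ") AS E"
        );

        // On Flink 1.13+, tableEnv.toDataStream(results) is the non-deprecated alternative.
        tableEnv
            .toAppendStream(results, Row.class)
            .print();

        env.execute();
    }
}
```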
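To see what the MATCH_RECOGNIZE query computes, here is a minimal plain-Java sketch that applies the same logic to the same seven readings in memory. It partitions by sensor, sorts by timestamp, and pairs each reading with the next one, the way `PATTERN (this_step next_step)` with `AFTER MATCH SKIP TO NEXT ROW` does. `PairwiseDiffSketch` and its records are hypothetical helpers, not Flink API. The sketch assumes all data is already present, so watermarks and late events play no part.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PairwiseDiffSketch {
    public record Reading(String sensorId, LocalDateTime ts, long kwh) {}
    // One output row: the partition key plus the query's MEASURES (ts, diff).
    public record Diff(String sensorId, LocalDateTime ts, long diff) {}

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static Reading reading(String sensorId, String ts, long kwh) {
        return new Reading(sensorId, LocalDateTime.parse(ts, FMT), kwh);
    }

    public static List<Diff> pairwiseDiffs(List<Reading> input) {
        // PARTITION BY sensorId (TreeMap only to make the output order deterministic).
        Map<String, List<Reading>> bySensor = new TreeMap<>();
        for (Reading r : input) {
            bySensor.computeIfAbsent(r.sensorId(), k -> new ArrayList<>()).add(r);
        }
        List<Diff> out = new ArrayList<>();
        for (List<Reading> readings : bySensor.values()) {
            // ORDER BY ts
            readings.sort(Comparator.comparing(Reading::ts));
            // Each row is this_step once, with its immediate successor as next_step.
            for (int i = 0; i + 1 < readings.size(); i++) {
                Reading thisStep = readings.get(i);
                Reading nextStep = readings.get(i + 1);
                out.add(new Diff(thisStep.sensorId(), thisStep.ts(), nextStep.kwh() - thisStep.kwh()));
            }
        }
        return out;
    }

    public static List<Reading> sampleInput() {
        return List.of(
                reading("sensor1", "2019-10-10 10:20:30", 10),
                reading("sensor1", "2019-10-10 10:20:40", 21),
                reading("sensor2", "2019-10-10 10:20:10", 28),
                reading("sensor2", "2019-10-10 10:20:05", 20),
                reading("sensor1", "2019-10-10 10:20:55", 25),
                reading("sensor1", "2019-10-10 10:21:05", 37),
                reading("sensor2", "2019-10-10 10:23:00", 30));
    }

    public static void main(String[] args) {
        pairwiseDiffs(sampleInput()).forEach(System.out::println);
    }
}
```

Running `main` prints three rows for sensor1 (diffs 11, 4 and 12) and two for sensor2 (diffs 8 and 2). The last reading of each sensor has no successor, so it produces no row. None of the sample readings is more than a minute out of order, so the Flink job should yield the same rows, although the two sensors' rows may be interleaved differently in its printed output.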