【问题标题】:How to create batch or slide windows using Flink CEP?如何使用 Flink CEP 创建批处理或滑动窗口?
【发布时间】:2018-10-05 10:33:12
【问题描述】:

我刚从Flink CEP 开始,我来自Esper CEP 引擎。您可能(或不)知道,在Esper 中使用它们的语法(EPL),您可以轻松创建batchslide 窗口,将这些窗口中的事件分组并允许您可以将此事件与函数(avg、max、min、...)一起使用。

例如,使用以下模式,您可以创建一个 5 秒的批处理窗口,并计算您在该指定窗口中收到的所有 Stock 事件的属性 price 的平均值。

select avg(price) from Stock#time_batch(5 sec)

问题是我想知道如何在Flink CEP 上实现它。我知道Flink CEP 中的目标或方法可能有所不同,因此实现它的方式可能不像Esper CEP 中那么简单。

我查看了有关时间窗口的docs,但我无法与Flink CEP 一起实现此窗口。所以,给定以下代码:

DataStream<Stock> stream = ...; // Consume events from Kafka

// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
            .where(
                    new SimpleCondition<Stock>() {
                        public boolean filter(Stock event) {
                            return event.getPrice() >= 0;
                        }
                    }
            );

PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);

/**
  CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
  I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
  GREATER THAN A THREESHOLD, AN ALERT IS DETECTED

  return avg(allEventsInWindow.getPrice()) > 1;
*/  

DataStream<Alert> result = patternStream.select(
            new PatternSelectFunction<Stock, Alert>() {
                @Override
                public Alert select(Map<String, List<Stock>> pattern) throws Exception {
                    return new Alert(pattern.toString());
                }
            }
    );

如何创建一个窗口,在该窗口中,我从收到的第一个事件开始计算 5 秒内以下事件的平均值。例如:

t = 0 seconds 
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds     (...end of batch window...)
Avg = 1.5 => Alert detected!

5 秒后的平均值为 1.5,并将触发警报。我该如何编码?

谢谢!

【问题讨论】:

    标签: apache-flink complex-event-processing esper flink-cep


    【解决方案1】:

    使用 Flink 的 CEP 库,这种行为是不可表达的。我宁愿推荐使用 Flink 的DataStream 或 Table API 来计算平均值。基于此,您可以再次使用 CEP 生成其他事件。

    final DataStream<Stock> input = env
        .fromElements(
                new Stock(1L, 1.0),
                new Stock(2L, 2.0),
                new Stock(3L, 1.0),
                new Stock(4L, 2.0))
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
            @Override
            public long extractTimestamp(Stock element) {
                return element.getTimestamp();
            }
        });
    
    final DataStream<Double> windowAggregation = input
        .timeWindowAll(Time.milliseconds(2))
        .aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {
            @Override
            public Tuple2<Integer, Double> createAccumulator() {
                return Tuple2.of(0, 0.0);
            }
    
            @Override
            public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
                return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
            }
    
            @Override
            public Double getResult(Tuple2<Integer, Double> accumulator) {
                return accumulator.f1 / accumulator.f0;
            }
    
            @Override
            public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        });
    
    final DataStream<Double> result = windowAggregation.filter((FilterFunction<Double>) value -> value > THRESHOLD);
    

    【讨论】:

    • 您好,感谢您的回答!很遗憾,为了一个简单的窗口操作,你在 Flink 中写的代码比 Esper 还要多。
    • 目前还不理想。但社区正在努力将 CEP 与 Flink 的流式 SQL 集成。这样,编写这类计算应该会容易得多。
    猜你喜欢
    • 2019-02-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-01
    • 1970-01-01
    • 2020-11-21
    • 1970-01-01
    相关资源
    最近更新 更多