【发布时间】:2018-10-05 10:33:12
【问题描述】:
我刚从Flink CEP 开始,我来自Esper CEP 引擎。您可能(或不)知道,在Esper 中使用它们的语法(EPL),您可以轻松创建batch 或slide 窗口,将这些窗口中的事件分组并允许您可以将此事件与函数(avg、max、min、...)一起使用。
例如,使用以下模式,您可以创建一个 5 秒的批处理窗口,并计算您在该指定窗口中收到的所有 Stock 事件的属性 price 的平均值。
select avg(price) from Stock#time_batch(5 sec)
问题是我想知道如何在Flink CEP 上实现它。我知道Flink CEP 中的目标或方法可能有所不同,因此实现它的方式可能不像Esper CEP 中那么简单。
我查看了有关时间窗口的docs,但我无法与Flink CEP 一起实现此窗口。所以,给定以下代码:
DataStream<Stock> stream = ...; // Consume events from Kafka
// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
.where(
new SimpleCondition<Stock>() {
public boolean filter(Stock event) {
return event.getPrice() >= 0;
}
}
);
PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);
/**
CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
GREATER THAN A THREESHOLD, AN ALERT IS DETECTED
return avg(allEventsInWindow.getPrice()) > 1;
*/
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Stock, Alert>() {
@Override
public Alert select(Map<String, List<Stock>> pattern) throws Exception {
return new Alert(pattern.toString());
}
}
);
如何创建一个窗口,在该窗口中,我从收到的第一个事件开始计算 5 秒内以下事件的平均值。例如:
t = 0 seconds
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds (...end of batch window...)
Avg = 1.5 => Alert detected!
5 秒后的平均值为 1.5,并将触发警报。我该如何编码?
谢谢!
【问题讨论】:
标签: apache-flink complex-event-processing esper flink-cep