【问题标题】:Global Windows in Flink using Custom Triggers vs Flink CEP Pattern APIFlink 中的全局窗口使用自定义触发器与 Flink CEP 模式 API
【发布时间】:2018-03-01 10:41:41
【问题描述】:

我是 Apache Flink Api 的新手,我正在尝试了解它提供的不同窗口。

我有一个事件流,例如:

    device_id,trigger_id,event_time,messageId
    1,START,1520433909396,1
    1,TRACKING,1520433914398,2
    1,TRACKING,1520433919398,3
    1,STOP,1520433924398,4
    1,START,1520433929398,5
    1,TRACKING,1520433934399,6
    1,TRACKING,1520433939399,7
    1,TRACKING,1520433944399,8
    1,STOP,1520433949399,9

其中 trigger_id 可以是一个指标,例如:start、tracking、stop

我想做的是基于 device_id 对所有传入事件进行分组,并根据 trigger_id 定义一个窗口。即从开始到停止对所有事件进行分组,然后进行一些计算,例如:平均值、最大值等。

这可以定义为 GlobalWindow 和基于 trigger_id 的自定义触发器,并使用自定义驱逐器来驱逐事件列表每次 停止触发到达。

另一种选择是使用 Flink CEP。我已经定义了以下模式

DataStream<String> input = env.readTextFile("events.csv");

    // create event stream
    DataStream<Event> events = input.map(new LineToEvent());
    DataStream<Event> waterMarkedStreams = events.assignTimestampsAndWatermarks(new EventAssigner());

    Pattern<Event, Event> tripPattern =
            Pattern.<Event>begin("start",  AfterMatchSkipStrategy.noSkip())
                    .where(START_CONDITION)
                    .followedBy("middle").where(MIDDLE_CONDITION).oneOrMore()
                    .followedBy("end").where(END_CONDITION);
    PatternStream<Event> patternStream = CEP.pattern(waterMarkedStreams, tripPattern);

    DataStream<String> result = patternStream.select(
            new PatternSelectFunction<Event, String>() {
                @Override
                public String select(Map<String, List<Event>> pattern) throws Exception {

                    StringBuilder builder = new StringBuilder();
                    builder.append(pattern.get("start").get(0).getMessageId()).append(",");
                    List<Event> vals = pattern.get("middle");
                    for (Event e: vals) {
                        builder .append(e.getMessageId()).append(",");
                    }
                    builder.append(pattern.get("end").get(0).getMessageId()).append(",");
                    return builder.toString();
                }
            });

    result.print();

所有条件都是实现 SimpleCondition 的静态内部类

但是,该模式匹配事件流上所有可能的解决方案......

    1> 1,2,3,4,
    1> 1,2,3,6,9,
    2> 1,2,4,
    2> 5,6,7,8,9,
    3> 1,2,3,6,7,8,9,
    3> 5,6,7,9,
    4> 1,2,3,6,7,9,
    4> 5,6,9,

pattern 有 Evictor 的概念吗?你怎么能只保留特定的事件集。即

1,2,3,4,
5,6,7,8,9,

【问题讨论】:

  • 感谢您的提示,我们会​​调查它
  • 我看过这个例子。并创建了一个似乎有效的自定义触发器。我还查看了 Flink CEP,想知道我是否可以做类似的事情。我已经更新了我的问题

标签: apache-flink flink-cep


【解决方案1】:

是的,您应该可以使用 CEP 执行此操作。您需要做的不仅仅是定义模式,还必须将模式应用于流,然后选择匹配的序列并使用它们来发出一些结果。文档中有fairly complete example

【讨论】:

  • 我已经更新了我的问题。不仅仅包括我使用的模式。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多