【Question title】: Apache Flink CEP: how to detect that an event did not occur within x seconds?
【Posted】: 2018-05-15 18:26:50
【Question】:

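For example, A should be followed by B within 10 seconds. I know how to detect that this DID happen (.next, .within), but I want to send an alert if B never occurs within the window.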

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing is required for exactly-once or at-least-once guarantees
//      env.enableCheckpointing(1000);

        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .build();

        final DataStream<String> inputStream = env
            .addSource(new RMQSource<String>(
                connectionConfig,               // config for the RabbitMQ connection
                "cep",                          // name of the RabbitMQ queue to consume
                true,                           // use correlation ids; can be false if only at-least-once is required
                new SimpleStringSchema()))      // deserialization schema to turn messages into Java objects
            .setParallelism(1);                 // non-parallel source is only required for exactly-once

        inputStream.print();

        Pattern<String, ?> simplePattern =
                Pattern.<String>begin("start")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("A");
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("B");
                        }
                    });

        PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern.within(Time.seconds(10)));
        OutputTag<String> timedout = new OutputTag<String>("timedout"){};
        SingleOutputStreamOperator<String> timedOutNotificationsStream = timedOutPatternStream.flatSelect(
            timedout,
            new TimedOut<String>(),
            new FlatSelectNothing<String>()
        );
        timedOutNotificationsStream.getSideOutput(timedout).print();

        env.execute("mynotification");
    }

    // Emits "LATE!" for every partial match (an "A" without a timely "B") that times out.
    // The type parameter is named T so that it no longer shadows java.lang.String.
    public static class TimedOut<T> implements PatternFlatTimeoutFunction<T, String> {
        @Override
        public void timeout(Map<String, List<T>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
            out.collect("LATE!");
        }
    }

    // Deliberately emits nothing for complete matches (A followed by B in time).
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {}
    }

Actual behavior:

publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)

publish "A"
(wait 10 seconds)
=> (no alert, but should be)

publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"

Expected behavior:

publish "A"
(wait 10 seconds)
=> "LATE!"

【Comments】:

    Tags: java apache-flink flink-streaming flink-cep


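    【Answer 1】:

    You can do this with timed-out patterns. Specify a pattern such as A followedBy B within 10 seconds, then check for the patterns that timed out, which means only A was seen. The Flink CEP documentation covers handling of timed-out patterns.

    For a complete example, see the training material, or go straight to the solution of the exercise.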


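    Edit: As of now, a timeout is only reported once a later event arrives (see the comments below); a Flink ticket tracks the effort to improve this.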
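
The behavior reported in the question (no alert after "A" alone, but "LATE!" as soon as a late "B" shows up) can be illustrated with a small model. This is a minimal sketch in plain Java, not Flink code: the class `EventDrivenTimeoutModel` and its methods are hypothetical names, and the assumption that expiry is checked only when an event arrives is taken from the observations in this thread, not from Flink's source. The model only understands events named "A" and "B".

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (NOT Flink code) of "A must be followed by B within a window",
 * under the assumption that timeouts are evaluated only when an event arrives.
 */
class EventDrivenTimeoutModel {
    private final long windowMillis;
    private Long pendingStart = null;              // timestamp of an unmatched "A", if any
    private final List<String> alerts = new ArrayList<>();

    EventDrivenTimeoutModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Feeds one event; expiry of a pending "A" is checked here and nowhere else. */
    void onEvent(String name, long timestampMillis) {
        // Step 1: the arriving event is the only thing that moves time forward.
        if (pendingStart != null && timestampMillis - pendingStart >= windowMillis) {
            alerts.add("LATE!");
            pendingStart = null;
        }
        // Step 2: normal pattern handling.
        if (name.equals("A")) {
            pendingStart = timestampMillis;        // start (or restart) a partial match
        } else if (name.equals("B") && pendingStart != null) {
            pendingStart = null;                   // matched in time, no alert
        }
    }

    List<String> alerts() {
        return alerts;
    }
}
```

With a 10,000 ms window, feeding "A" at 0 and "B" at 5,000 records no alert; feeding only "A" records no alert either, because nothing ever triggers the check; feeding "A" at 0 and "B" at 11,000 records one "LATE!". That matches the three cases listed under "Actual behavior" in the question.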

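    【Comments】:

    • "Timed Out Partial Patterns" is the term I needed. Thanks, much appreciated!
    • Actually, I think I may be doing something wrong. Please see my updated post. I can't get the "LATE!" notification until the B event is published. That is not what I want, because B may never occur (e.g. a lost package), and I still want to be alerted in that case.
    • @kayla I've updated my answer, but next time, instead of editing the question in a way that completely changes its meaning, please create a new question. Otherwise a good answer may end up being inaccurate.
    • Sorry for the inconvenience. So it sounds like what I'm asking for can't be done at the moment. It shouldn't be too big a problem, because if you imagine a shipping system, shipping events should always keep arriving unless the whole shipping system is down. Right?
    • I'm sorry, but I played with this some more and can't figure out how to make it work. It seems I can only handle "package is late", not "package is late/lost". Imagine packageId A and B. If I do input.keyBy("packageId"), these are checked separately, so I can't trigger "A is late" by sending an event for B to act as the "next" event. What I tried is: ship A -> (time window expires), and that's it. What I did not try is: ship A -> (time window expires) -> A shipped successfully. So the key never receives a final event, and a lost package can't be detected and reported?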
    【Answer 2】:

    Could you try the following solution?

    package com.nirav.modi.cep;
    
    import com.nirav.modi.dto.Event;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternFlatSelectFunction;
    import org.apache.flink.cep.PatternFlatTimeoutFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.util.List;
    import java.util.Map;
    
    public class EventNotOccur {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<Event> source = env.addSource(new SourceFunction<Event>() {
                @Override
                public void run(SourceContext<Event> ctx) throws Exception {
    
                    for (int i = 0; i < 1; i++) {
                        ctx.collect(new Event("A"));
                        Thread.sleep(5000);
                        ctx.collect(new Event("B"));
                        Thread.sleep(5000);
                        ctx.collect(new Event("A"));
                        Thread.sleep(15000);
                        ctx.collect(new Event("B"));
                        Thread.sleep(5000);
                        ctx.collect(new Event("B"));
                    }
                }
    
                @Override
                public void cancel() {
    
                }
            });
    
            Pattern<Event, ?> simplePattern =
                    Pattern.<Event>begin("start")
                            .where(new SimpleCondition<Event>() {
                                @Override
                                public boolean filter(Event event) {
                                    return event.getName().equals("A");
                                }
                            })
                            .next("end")
                            .where(new SimpleCondition<Event>() {
                                @Override
                                public boolean filter(Event event) {
                                    return event.getName().equals("B");
                                }
                            });
    
            source.print();
    
            PatternStream<Event> timedOutPatternStream = CEP.pattern(source, simplePattern.within(Time.seconds(10)));
    
            OutputTag<Event> timedout = new OutputTag<Event>("timedout") {
    
            };
    
            timedOutPatternStream.flatSelect(new PatternFlatSelectFunction<Event, String>() {
                @Override
                public void flatSelect(Map<String, List<Event>> pattern, Collector<String> out) throws Exception {
                    out.collect("Pattern Match...............");
                }
            }).print();
    
            SingleOutputStreamOperator<Event> longRides = timedOutPatternStream
                    .flatSelect(
                            timedout,
                            new EventTimeOut(),
                            new FlatSelectNothing<Event>()
                    );
    
            longRides.getSideOutput(timedout).print();
    
    
            env.execute("Flink Streaming Java API Skeleton");
    
        }
    
        // Emits the "start" event of every partial match that timed out.
        // Not generic: a type parameter named Event would shadow the imported Event class.
        public static class EventTimeOut implements PatternFlatTimeoutFunction<Event, Event> {
            @Override
            public void timeout(Map<String, List<Event>> map, long l, Collector<Event> collector) throws Exception {
                Event rideStarted = map.get("start").get(0);
                System.out.println("Time out Partial Event : " + rideStarted);
                collector.collect(rideStarted);
            }
        }
    
        public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
            @Override
            public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
    
                System.out.println("Flat select nothing: " + pattern.get("start").get(0));
                collector.collect(pattern.get("start").get(0));
    
            }
        }
    }
    

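    【Comments】:

    • This doesn't seem much different from the sample code in the question. It needs a subsequent event to trigger the timeout, right? So A->(10 seconds) won't work, only A->(10 seconds)->B.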
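
One way to avoid depending on a subsequent event is to check per-key deadlines from a clock rather than from incoming events. The sketch below is a Flink-free model of that idea and is not taken from either answer: `DeadlineTracker`, `onEvent`, and `onTick` are hypothetical names. In Flink, a comparable approach would presumably register a timer per key (for instance in a `KeyedProcessFunction`, via `ctx.timerService().registerProcessingTimeTimer(...)` and `onTimer`) instead of using CEP; that mapping is an assumption and is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Simplified model (NOT Flink code): per-key deadlines checked on clock ticks, not on events. */
class DeadlineTracker {
    private final long windowMillis;
    // key (e.g. a packageId) -> time by which "B" must have arrived
    private final Map<String, Long> deadlines = new HashMap<>();

    DeadlineTracker(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** "A" arms a deadline for the key; "B" disarms it. Other events are ignored. */
    void onEvent(String key, String name, long nowMillis) {
        if (name.equals("A")) {
            deadlines.put(key, nowMillis + windowMillis);
        } else if (name.equals("B")) {
            deadlines.remove(key);
        }
    }

    /** Called by a clock, independently of events; returns the keys whose deadline has passed. */
    List<String> onTick(long nowMillis) {
        List<String> late = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis >= entry.getValue()) {
                late.add(entry.getKey());
                it.remove();                       // report each missed deadline once
            }
        }
        return late;
    }
}
```

Because each key carries its own deadline, a lost package is reported on the first tick after its window expires, even if no further event for that key (or any other key) ever arrives. One limitation of this sketch: a "B" that arrives after the deadline but before the next tick disarms the key silently, so the tick interval bounds how precisely lateness is detected.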