【问题标题】:Apache Flink CEP pattern detection with javaApache Flink CEP 模式检测与 java
【发布时间】:2019-04-09 08:28:00
【问题描述】:

我想做;
从 CEP 开始,使用 map 结构中包含的任何 arraylist 元素,然后继续使用我已经开始的其余 arraylist 元素。
地图和图案结构:

final Map< Integer,ArrayList<String>> deger = new HashMap<Integer,ArrayList<String>>();
        deger.put(1,new ArrayList<String>(Arrays.asList("h:1","l:1","g:0")));
        deger.put(2,new ArrayList<String>(Arrays.asList("h:1","l:1","g:1")));
        deger.put(3,new ArrayList<String>(Arrays.asList("h:2","l:3","g:1")));
        deger.put(4,new ArrayList<String>(Arrays.asList("h:0","l:2","g:2")));

 for(int i=1;i<deger.size()+1;i++) {
            temp1.add(deger.get(i));
        }

Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                new SimpleCondition<String>() {
//                    @Override
                    public boolean filter(String value) throws Exception {

                        for (ArrayList<String> aa: temp1){
                            for (String dd : aa)
                                if(value.equals(dd)){ 
                                    return true;
                                }
                        }
                        return false;
                    }
                }
        ).followedBy("middle").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.equals(temp1.get(1));
                    }
                }
        ).followedBy("end").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.equals(temp1.get(2));
                    }
                }
        );

我的目标是警告地图中的数组列表元素,但数组列表元素的顺序并不重要,因为其中有流。我想继续处理这个数组的剩余元素,我可以在其中返回信息当我从这里的任何数组开始时这个数组。例如:

Incoming data = "l:1","h:1","g:0"
my pattern = "h:1","l:1","g:0" 
Start -> l:1 find
Middle -> g:0 or h:1 | h:1 find
End -> g:0 find -> alarm

【问题讨论】:

  • 嘿,temp1 到底是什么?
  • CEP在某种程度上是基于时间序列的。如果您不在乎时间,为什么不直接使用循环来查找元素?
  • 我在模式函数的正上方添加了 temp1
  • 我实时工作,所以时间很重要
  • 好的,澄清一下。您想发出警报,如果列表中的任何模式以任何可能的顺序出现,但所有元素必须属于同一个列表?因此,如果 "h:1","l:1","g:0" 以任何顺序出现但如果 "h:1","l:1","g:2" 那么这里就不会有警报了吗?

标签: java apache-flink complex-event-processing


【解决方案1】:
 public static  Integer temp1;
    public static  Map<Integer,ArrayList<String>> temp2 = new HashMap<>();     
final Map< Integer,ArrayList<String>> deger = new HashMap<>();
            deger.put(1,new ArrayList<>(Arrays.asList("h:1","g:1","s:0")));
            deger.put(2,new ArrayList<>(Arrays.asList("h:1","g:1","g:0")));
            deger.put(3,new ArrayList<>(Arrays.asList("h:1","c:0","g:0")));
            deger.put(4,new ArrayList<>(Arrays.asList("h:1","s:1","g:0")));


            Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String value) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryStart : deger.entrySet()) {
                                if(entryStart.getValue().contains(value) && !temp2.containsKey(entryStart.getKey())){
                                        ArrayList<String> newList = new ArrayList<String>();
                                        newList.addAll(entryStart.getValue());
                                        newList.remove(value);
                                        temp2.put(entryStart.getKey(),newList);
                                        flag = true;
                                }
                            }
                            return flag;
                        }
                    }
            ).followedBy("middle").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String middle) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryMiddle : temp2.entrySet()) {
                                if(entryMiddle.getValue().contains(middle) && entryMiddle.getValue().size() == 2){
                                    ArrayList<String> newListMiddle = new ArrayList<String>();
                                    newListMiddle.addAll(entryMiddle.getValue());
                                    newListMiddle.remove(middle);
                                    temp2.put(entryMiddle.getKey(),newListMiddle);
                                    flag = true;
                                }
                            }
                            return flag;
                        }
                    }
            ).followedBy("end").where(
                    new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String end) throws Exception {
                            flag = false;
                            for(Map.Entry<Integer, ArrayList<String>> entryEnd : temp2.entrySet()) {
                                if(entryEnd.getValue().contains(end) && entryEnd.getValue().size() == 1){
                                    flag = true;
                                    temp1 = entryEnd.getKey();
                                }
                            }
                            if (flag)
                                temp2.remove(temp1);
                            return flag;
                        }
                    }
            );

            PatternStream<String> patternStream = CEP.pattern(stream_itemset_ham,pattern);

            DataStream<String> result = patternStream.select(
                    new PatternSelectFunction<String, String>() {
                        @Override
                        public String select(Map<String, List<String>> map) throws Exception {
                            ArrayList<String> NewList= new ArrayList<>();
                            NewList.addAll(deger.get(temp1));
                            String found = "Found";
                            for (String list_element : NewList)
                                found += " " + list_element ;
                            return found;
                        }
                    }
            );
            result.print();

我从您的问题中了解到可以提供这种解决方案。

【讨论】:

  • 这个方案基本上会匹配所有模式的元素的每一个组合,不管它是否真的是一个模式。我的意思是,如果你有以下输入流"h:1", "d:1", "g:1" , "b:9" , "s:0", "z:0", "g:0",它仍然会找到两个模式:h:1 g:1 g:0h:1 g:1 s:0,但输入中没有模式。
  • 嗨多米尼克,实际上,你也是对的。在我的代码中找到模式很重要,但找到元素并不重要。如果您有不同的解决方案,可以分享给我吗?
  • 嘿,我已经添加了答案:)
  • IterativeCondition 不知道是这样的事情。我学过。谢谢多米尼克。
【解决方案2】:

所以目前 AFAIK Flink 不支持开箱即用的无序模式,所以基本上我看到了两种解决这个问题的方法:

1) 您可以创建要搜索的所有可能模式,并简单地合并所有结果数据流。

2) 正如这篇文章所建议的FlinkCEP: Can I reference an earlier event to define a subsequent match?,您可以尝试使用IterativeCondition,这将允许您访问之前已经匹配的元素,所以基本上您必须定义匹配列表中所有可能元素的模式和然后只需检查最后一个条件是否所有三个都属于同一个列表。如果是,则找到该模式。

【讨论】:

    猜你喜欢
    • 2022-08-19
    • 2016-12-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-10
    • 2019-02-18
    • 1970-01-01
    相关资源
    最近更新 更多