【问题标题】:Siddhi CEP: Aggregate the String values of grouped events in a batch time windowSiddhi CEP:在批处理时间窗口中聚合分组事件的字符串值
【发布时间】:2018-04-18 20:21:32
【问题描述】:

我正在使用 Siddhi 来减少系统中存在的事件数量。为此,我声明了一个批处理时间窗口,它根据 target_ip 对所有事件进行分组。

from Events#window.timeBatch(30 sec)
select id as meta_ID, Target_IP4 as target_ip
group by Target_IP4
insert into temp;

我想要的结果是每个 target_ip 的单个事件和 meta_ID 参数值作为形成事件的不同事件的串联。

问题在于前面的查询生成的事件与不同的 meta_ID 值一样多。例如,我得到了

  1. “id_10”、“target_1”
  2. “id_11”、“target_1”

我想拥有

  1. "id_10,id_11", "target_1"

我知道我的查询中缺少一些聚合方法,我在 Siddhi 中看到了很多聚合函数,包括具有 str:concat 方法的 siddhi-execution-string 扩展,但我不知道如何使用它来聚合 meta_ID 值。有什么想法吗?

【问题讨论】:

    标签: aggregate wso2cep wso2-das siddhi


    【解决方案1】:

    您可以编写如下所示的执行计划,以实现您的要求:

    define stream inputStream (id string, target string);
    
    -- Query 1
    from inputStream#window.timeBatch(30 sec)
    select *
    insert into temp;
    
    -- Query 2
    from temp#custom:aggregator(id, target) 
    select *
    insert into reducedStream;
    

    在这里,custom:aggregator 是您必须实现的自定义流处理器扩展。实现时可以按照[1]进行。

    让我解释一下事情是如何运作的:

    查询 1 每 30 秒生成一批事件。换句话说,我们使用 Query 1 来创建一批事件。

    因此,在每 30 秒间隔结束时,这批事件将被馈送到 custom:aggregator 流处理器中。当流处理器接收到输入时,它的 process() 方法将被触发。

    @Override
        protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
            //implement the aggregation & grouping logic here
    }
    

    这批事件在 streamEventChunk 中。在实现 process() 方法时,您可以遍历 streamEventChunk 并为每个目的地创建一个事件。您需要在 process() 方法中实现此逻辑。

    [1]https://docs.wso2.com/display/CEP420/Writing+a+Custom+Stream+Processor+Extension

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-05
      • 1970-01-01
      • 1970-01-01
      • 2012-06-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多