【问题标题】:Dynamic Pivot using Storm使用 Storm 的动态枢轴
【发布时间】:2014-01-31 09:16:08
【问题描述】:

我在 BigData DB(在我的例子中是 Cassandra)中有列名称为 col1、col2、col3、val1、val2 的行

在 SQL 方法中,我也可以按 col1,col2 或 col2,col1 或任何其他可能的方式进行分组。这样我可以很容易地形成树状层次结构。

但是现在我们使用 Cassandra 来存储不支持分组依据的数据。所以我们想使用 Storm 进行分组和聚合。 我们写了一些示例代码进行聚合和分组,但我们无法形成是否可以实现的意见。

数据看起来像这样

col1,col2,col3,val1,val2
------------------------
a1,b1,c1,10,20
a1,b1,c2,11,13
a1,b2,c1,9,15
a1,b2,c3,13,88
a2,b1,c1,30,44
a2,b3,c2,22,33
a4,b4,c4,99,66

就像在 excel pivot 中一样,我想构建层次结构 root->child1->child2->child3-val1,val2 如果我的层次结构是 col1->col2->col3,它可能看起来像这样

a1          {43,136}
    --b1        {21,33}
        --c1    10,20
        --c2    11,13
    --b2        {22,103}
        --c1    9,15
        --c3    13,88
a2          {52,77}
    --b1        {30,44}
        --c1    30,44
    --b3        {22,33}
    --c2    22,33
a4          {99,66}
    --b4        {99,66}
        --c4    99,66

我想为用户提供重新排列层次结构元素的功能,例如 col3->col1->col2(或其他东西,它是动态的) 在这种情况下,数据将如下所示

c1          {49,79}
    --a1        {19,35}
        --b1    10,20
        --b2    9,15
    --a2        {30,44}
        --b1    30,44
c2          {11,13}
    --a1        {11,13}
        --b1    11,13
    --a2        {22,33}
        --b3    22,33
c3          {13,88}
    --a1        {13,88}
        --b2    13,88
c4          {99,66}
    --a4        {99,66}
        --b4    99,66

我的三叉戟代码的几行看起来像这样,它没有按预期工作。

topology.newStream("aggregation", spout)
.groupBy(new Fields("col1","col2","col3","val1","val2"))
.aggregate(new Fields("val1","val2"), new Sum(), new Fields("val1sum","val2sum"))
.each(new Fields("col1","col2","col3","val1sum","val2sum"), new Utils.PrintFilter());

为了进行上述转换,我想在有或没有 Trident API 支持的情况下使用 Storm。 谁能指导我如何实现它?非常感谢任何程序创意。

【问题讨论】:

    标签: java cassandra pivot apache-storm trident


    【解决方案1】:

    您应该在 groupBy 中仅包含维度(您的 col1、col2 和 col3)而不是度量(您的 val1、val2)。 当您需要聚合多个度量时,您需要使用 chainedAgg() 构造。 以下是您的用例更改后的拓扑代码:

                topology.newStream("aggregation", spout)
        .groupBy(new Fields("col1","col2"))
        .chainedAgg()
        .aggregate(new Fields("val1"), new Sum(), new Fields("val1sum"))
        .aggregate(new Fields("val2"), new Sum(), new Fields("val2sum"))
        .chainEnd()
        .each(new Fields("col1","col2","val1sum", "val2sum"), new Utils.PrintFilter());
    

    正如您所料,它会产生以下输出!

    PartitionId=0, [a1, b1, 21, 33]

    PartitionId=0, [a1, b2, 22, 103]

    PartitionId=0, [a4, b4, 99, 66]

    PartitionId=0, [a2, b1, 30, 44]

    PartitionId=0, [a2, b3, 22, 33]

    干杯!

    MK

    【讨论】:

      猜你喜欢
      • 2017-09-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-06-26
      • 1970-01-01
      相关资源
      最近更新 更多