【问题标题】:Aggregation of multiple values using scala/spark使用 scala/spark 聚合多个值
【发布时间】:2016-07-02 12:07:03
【问题描述】:

我是 spark 和 scala 的新手。我想总结 RDD 中存在的所有值。下面是例子。 RDD 是键值对,假设在做一些连接和转换后,RDD 的输出有 3 条记录,如下所示,其中 A 是键:

(A, List(1,1,1,1,1,1,1))
(A, List(1,1,1,1,1,1,1))
(A, List(1,1,1,1,1,1,1))

现在我想将每条记录的所有值与其他记录中的相应值相加,所以输出应该像

(A, List(3,3,3,3,3,3,3))

谁能帮我解决这个问题。有没有可能使用 scala 实现这一点的方法?

提前致谢

【问题讨论】:

  • 我尝试将它们全部分组,然后根据位置添加元素....但无法获得所需的结果

标签: scala apache-spark


【解决方案1】:

一种天真的方法是reduceByKey

rdd.reduceByKey(
  (xs, ys) => xs.zip(ys).map { case (x, y) => x + y }
)

但它的效率相当低,因为它会在每次合并时创建一个新的List

您可以通过使用例如带有可变缓冲区的aggregateByKey 来改进它:

rdd.aggregateByKey(Array.fill(7)(0)) // Mutable buffer 
  // For seqOp we'll mutate accumulator 
  (acc, xs) => {
    for {
      (x, i) <- xs.zipWithIndex
    } acc(i) += x
    acc
  },
  // For performance you could modify acc1 as above
  (acc1, acc2) => acc1.zip(acc2).map { case(x, y) => x + y }
).mapValues(_.toList)

也应该可以使用DataFrames,但默认情况下,最近的版本会单独安排聚合,因此如果不调整配置​​,可能不值得。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-02
    • 1970-01-01
    • 2019-02-13
    • 1970-01-01
    相关资源
    最近更新 更多