【问题标题】:How to find Sum at Each partition in Spark如何在 Spark 的每个分区中求和
【发布时间】:2019-11-01 19:11:27
【问题描述】:

我已经创建了类并使用该类来创建 RDD。我想计算每个分区的 LoudnessRate (类成员)的总和。这个总和稍后将用于计算每个分区的平均 LoudnessRate。 我尝试了以下代码,但它不计算总和并返回 0.0。 我的代码是

    object sparkBAT {
      def main(args: Array[String]): Unit = {
        val numPartitions = 3
        val N = 50
        val d = 5
        val MinVal = -10
        val MaxVal =  10
        val conf = new SparkConf().setMaster(locally("local")).setAppName("spark Sum")
        val sc = new SparkContext(conf)

        val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
        val rdd = sc.parallelize(ba, numPartitions)

        var arrSum =Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
        rdd.mapPartitionsWithIndex((k,iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
        arrSum foreach println
      }
    }


    class BAT (dim:Int, min:Double, max:Double) extends Serializable {    
      val random = new Random()
      var position      : List[Double]      =   List.fill(dim) (random.nextDouble() * (max-min)+min )
      var velocity      :List[Double]       =   List.fill(dim)( math.random)
      var PulseRate     : Double            =   0.1
      var LoudnessRate  :Double             =   0.95
      var frequency     :Double             =   math.random
      var fitness       :Double             =   math.random
      var BestPosition  :List[Double]       =   List.fill(dim)(math.random)
      var BestFitness   :Double             =   math.random 
    }

【问题讨论】:

  • 您正在修改执行程序 JVM 中的 arrSum 并在驱动程序 JVM 中打印其值。您可以将迭代器映射到单例迭代器并使用collect 将值移动到驱动程序。另外,不要使用iterator.map 来产生副作用,iterator.foreach 就是为此而生的
  • @ollik1 这应该是一个答案(如果您添加代码)。
  • @ollik1 你能加代码吗?
  • @AlexeyRomanov 和 yari,很公平,添加了代码作为答案

标签: scala apache-spark rdd partitioning


【解决方案1】:

根据要求将我的评论更改为答案。原评论

您正在 executor JVM 中修改 arrSum 并在 dirver JVM 中打印其值。您可以将迭代器映射到单例迭代器并使用 collect 将值移动到驱动程序。另外,不要将 iterator.map 用于副作用,iterator.foreach 就是为此而设计的。

这里是一个示例 sn-p 如何做到这一点。首先创建一个带有两个分区的 RDD,0 -> 1,2,31 -> 4,5。当然,您在实际代码中不需要这个,但由于 sc.parallelize 行为会根据环境而变化,这将始终创建统一的 RDD 来重现:

object DemoPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case num: Int => num
  }
}
val rdd = sc
  .parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
  .partitionBy(DemoPartitioner)
  .map(_._2)

然后是真正的技巧:

val sumsByPartition = rdd.mapPartitionsWithIndex {
  case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)

输出:

Map(0 -> 6, 1 -> 9)

【讨论】:

  • 这会根据我的需要给出所需的结果。还有一个问题,这是计算总和的最佳方法还是我们可以进行更多优化?关于速度
  • 我想说在大多数情况下,它几乎是计算每个分区总和的最快方法(这本身并不是使用 Spark 的常见操作)。执行者之间没有洗牌,这通常是缓慢的部分,只有极简数据通过网络发送到驱动程序。
【解决方案2】:

问题在于您使用的是在驱动程序中声明并在执行程序中更新的 arrSum(常规集合)。每当您这样做时,您都需要使用累加器。

This 应该有帮助

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-12
    • 2021-12-21
    • 1970-01-01
    • 2017-04-27
    • 1970-01-01
    • 2020-02-11
    相关资源
    最近更新 更多