【问题标题】:Spark performance - how to parallelize large loops?Spark 性能 - 如何并行化大循环?
【发布时间】:2015-12-08 07:11:27
【问题描述】:

我有一个包含 8000 个循环的 Spark 应用程序,它在 5 个节点的集群上运行。每个节点有 125GB 内存和 32 个内核。相关代码如下所示:

for (m <- 0 until deviceArray.size) { // there are 1000 device 
  var id = deviceArray(m)

  for (t <- 1 to timePatterns) { // there are 8 time patterns
     var hrpvData = get24HoursPVF(dataDF, id, t).cache()

  var hrpvDataZI = hrpvData.zipWithIndex

  var clustersLSD = runKMeans(hrpvData, numClusters, numIterations)

  var clusterPVPred = hrpvData.map(x => clustersLSD.predict(x))
  var clusterPVMap = hrpvDataZI.zip(clusterPVPred)

  var pvhgmRDD = clusterPVMap.map{r => (r._2, r._1._2)}.groupByKey

  var arrHGinfo = pvhgmRDD.collect 

  // Post process data 
  // .....

  hrpvData.unpersist()
  }
}

函数调用get24HoursPVF()为k-means准备特征向量,大约需要40秒。每个循环大约需要 50 秒才能完成使用集群。我的数据大小为 2 到 3 GB(从表中读取)。给定 8000 个循环,运行此 Spark 应用程序的总时间是不可接受的 (8000x50s)。

由于每个设备都是独立的,有没有办法并行化 8000 次迭代?或者如何利用集群来解决总运行时间长的问题? Scala Future 不会工作,因为它只是同时提交作业,但 Spark 不会同时运行这些作业。

【问题讨论】:

    标签: performance scala parallel-processing apache-spark


    【解决方案1】:

    除了 for 循环之外,您的代码中有两个 Spark 中最慢的 API 调用 - groupByKeycollect

    groupByKey 几乎不应该使用,而是查看reduceByKey,查看此Databricks blog 了解更多详细信息。

    collect 将该 RDD 中的所有数据传输到驱动节点上的一个数组中,除非这是少量数据,否则会对性能产生相当大的影响。

    在 for 循环中,我不是特别熟悉您要执行的操作,但在

    var hrpvData = get24HoursPVF(dataDF, id, t).cache()
    

    您正在为每个 id 和 t 值构建和缓存一个新数据帧。我不知道为什么你不能在一开始就构建一个包含每个 id 和 t 变体的数据帧,然后在整个数据帧上运行你的 zipWithIndex、map 等?

    【讨论】:

    • 感谢您的 cmets。我同意 groupByKey 应该被 reduceByKey 取代,但在这种情况下,与其他耗时的部分相比,groupByKey 上的时间可以忽略不计。 Collect() 在这里传输少量数据。我需要使用每个时间模式在每个设备上运行 k-means 聚类,这就是我认为我必须为每个 id 和 t(以及因此为特征向量)构建和缓存新数据帧的原因。
    • 你的评论让我再次思考为什么我不能建立一个单一的大数据框。其实我可以。至少通过这种方式,我能够将函数调用 get24HoursPVF() 移到 8000 个循环之外,尽管我仍然需要运行 k-means 聚类 8000 次。所以它会减少运行时间。再次感谢!
    • 准确地说,groupByKey 并不比任何需要改组类似数量数据的操作更昂贵。这意味着按照这个逻辑你应该避免joincoogrouppartitionBy或任何类似的方法。
    • 除了 join、cogroup 等的巨大差异之外,没有一个简单的不洗牌替代方案
    • 如果你需要 group 而不是 group 后跟 some-reduction,那么 groupBy key 也别无选择。 Don't use groupByKey if you mean reduceByKeyAvoid GroupByKey 更多。并且 groupByKey 没有 no-shuffle 替代方案 - 它只有 -ess-to-shuffle 替代方案。
    猜你喜欢
    • 1970-01-01
    • 2012-10-17
    • 2013-09-02
    • 1970-01-01
    • 1970-01-01
    • 2021-08-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多