【发布时间】:2017-08-10 02:39:27
【问题描述】:
我最近发现,即使在 local[1] 模式下运行 spark 或使用具有 1 个执行器和 1 个核心的 Yarn 时,在 UDF 中添加并行计算(例如使用并行集合)也会显着提高性能。
例如在local[1] 模式下,Spark-Jobs 消耗尽可能多的 CPU(即,如果我有 8 个内核,则为 800%,使用top 测量)。
这看起来很奇怪,因为我认为 Spark(或 yarn)会限制每个 Spark 应用程序的 CPU 使用率?
所以我想知道为什么会这样,是否建议在 spark 中使用并行处理/多线程,还是应该坚持使用 spark 并行化模式?
这里是一个示例(在具有 1 个实例和 1 个核心的 yarn 客户端模式下测量的时间)
case class MyRow(id:Int,data:Seq[Double])
// create dataFrame
val rows = 10
val points = 10000
import scala.util.Random.nextDouble
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))}
val df = sc.parallelize(data).toDF().repartition($"id").cache()
df.show() // trigger computation and caching
// some expensive dummy-computation for each array-element
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d
val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum)
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum)
df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds
【问题讨论】:
-
当您在 Yarn 集群中运行时,您可以根据纱线配置限制核心的使用。
-
我认为 Spark 会限制 Spark 进程(即任务调度程序)创建的线程数,但它无法停止 scala 的并行集合以从分配给任务的线程创建新线程。
标签: scala apache-spark parallel-processing