【发布时间】:2018-01-04 22:26:27
【问题描述】:
假设我每个执行程序有 36 个核心,每个节点有一个执行程序,并且每个节点有 3 个节点,每个节点有 48 个可用核心。我注意到的基本要点是,当我将每个任务设置为使用 1 个核心(默认值)时,我的 CPU 利用率约为 70%,每个执行程序将同时执行 36 个任务(如我所料) .但是,当我将配置更改为每个任务有 6 个内核 (--conf spark.task.cpus=6) 时,每个执行程序一次减少到 6 个任务(如预期的那样),但我的 CPU 利用率也下降到 10% 以下(意外)。我会假设 Spark 会知道如何在 6 个内核上并行化工作负载。
重要的实现细节是我在 DataFrame 的列上运行 UDF 函数,并将结果作为新列附加到该数据帧上。此 UDF 函数使用一个 @transient 对象,该对象提供了我正在使用的机器学习算法。此 UDF 函数不是聚合或合并操作的一部分,它只是对列的map 操作,如下所示:
def myUdf = udf { ... }
val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)
我原以为 Spark 会执行 6 个 myUdf 来一次处理 6 条记录,每个核心一个,但事实并非如此。有没有办法解决这个问题(无需向 Spark 项目提交 PR),或者至少有人可以解释为什么会发生这种情况?
预料到这个问题,我正在尝试增加每个任务的核心数量,以减少每个执行程序所需的 RAM 量。在这种情况下,一次执行太多任务会成倍增加 RAM 使用量。
【问题讨论】:
标签: scala performance apache-spark concurrency spark-dataframe