【问题标题】:Why is each Spark Task not utilizing all allocated cores?为什么每个 Spark 任务没有使用所有分配的内核?
【发布时间】:2018-01-04 22:26:27
【问题描述】:

假设我每个执行程序有 36 个核心,每个节点有一个执行程序,并且每个节点有 3 个节点,每个节点有 48 个可用核心。我注意到的基本要点是,当我将每个任务设置为使用 1 个核心(默认值)时,我的 CPU 利用率约为 70%,每个执行程序将同时执行 36 个任务(如我所料) .但是,当我将配置更改为每个任务有 6 个内核 (--conf spark.task.cpus=6) 时,每个执行程序一次减少到 6 个任务(如预期的那样),但我的 CPU 利用率也下降到 10% 以下(意外)。我会假设 Spark 会知道如何在 6 个内核上并行化工作负载。

重要的实现细节是我在 DataFrame 的列上运行 UDF 函数,并将结果作为新列附加到该数据帧上。此 UDF 函数使用一个 @transient 对象,该对象提供了我正在使用的机器学习算法。此 UDF 函数不是聚合或合并操作的一部分,它只是对列的map 操作,如下所示:

def myUdf = udf { ... }

val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)

我原以为 Spark 会执行 6 个 myUdf 来一次处理 6 条记录,每个核心一个,但事实并非如此。有没有办法解决这个问题(无需向 Spark 项目提交 PR),或者至少有人可以解释为什么会发生这种情况?

预料到这个问题,我正在尝试增加每个任务的核心数量,以减少每个执行程序所需的 RAM 量。在这种情况下,一次执行太多任务会成倍增加 RAM 使用量。

【问题讨论】:

    标签: scala performance apache-spark concurrency spark-dataframe


    【解决方案1】:

    spark.task.cpus为每个任务分配的核心数。它用于为单个任务分配多个内核,以防用户代码是多线程的。如果您的 udf 不使用多个(不会在单个函数调用中生成多个线程)线程,那么核心就被浪费了。

    一次处理 6 条记录

    分配 6 个核心,spark.task.cpus 设置为 1。如果要限制节点上的任务数,请减少每个节点提供的核心数。

    本质上,Spark 可以通过将记录拆分到每个任务(根据分区)并确定每个执行器可以处理多少个并发任务来自行确定如何同时拆分多个记录上的 UDF 映射。但是,Spark 不能自动拆分每个任务每个核心的工作。为了在每个任务中使用多个内核,需要编写 UDF 中的代码,该代码将在每个任务一次(按顺序)一条记录上执行,以将该 UDF 中的计算并行化在一条记录上。

    【讨论】:

    • 另一种选择是转换为 RDD 并使用mapPartitions,然后将保存任务/分区项的集合转换为并行集合,并将您的转换映射到并行集合中的每个项。
    猜你喜欢
    • 1970-01-01
    • 2011-08-07
    • 1970-01-01
    • 1970-01-01
    • 2014-07-01
    • 1970-01-01
    • 2016-12-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多