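【Question title】: How many executor processes run for each worker node in Spark?
【Posted】: 2016-10-10 19:23:10
【Question description】:

How many executors are launched on each worker node in Spark? Can I know the math behind it?

For example, I have 6 worker nodes and 1 master node. If I submit a job through spark-submit, what is the maximum number of executors that will be launched for the job?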

【Question discussion】:

    Tags: apache-spark apache-spark-standalone


    【Solution 1】:

    Piggybacking on @LiMuBei's answer...

    First, it is whatever you tell it:

    --num-executors 4
    

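    If dynamic allocation is used, this is how Spark decides for you.

    According to this article (http://jerryshao.me/architecture/2015/08/22/spark-dynamic-allocation-investigation/), Spark calculates the maximum number of executors it needs from the pending and running tasks as follows: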

     private def maxNumExecutorsNeeded(): Int = {
        // Ceiling division: enough executors to give every pending or running task a slot
        val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
        (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
     }
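To see the arithmetic in isolation, here is a minimal self-contained Scala sketch. It assumes, as in Spark 2.x's ExecutorAllocationManager, that tasksPerExecutor is spark.executor.cores / spark.task.cpus; the object name, method parameters and the numbers used are hypothetical stand-ins for the listener fields and the configuration.

```scala
// Hypothetical standalone restatement of maxNumExecutorsNeeded: in Spark the task counts
// come from the allocation listener and tasksPerExecutor from the configuration.
object MaxExecutorsNeeded {
  // Assumption: tasksPerExecutor = spark.executor.cores / spark.task.cpus (integer division)
  def tasksPerExecutor(executorCores: Int, taskCpus: Int): Int = executorCores / taskCpus

  // Ceiling division: the smallest executor count whose task slots cover all
  // pending and running tasks.
  def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int, slotsPerExecutor: Int): Int = {
    val numRunningOrPendingTasks = pendingTasks + runningTasks
    (numRunningOrPendingTasks + slotsPerExecutor - 1) / slotsPerExecutor
  }

  def main(args: Array[String]): Unit = {
    val slots = tasksPerExecutor(executorCores = 4, taskCpus = 1)
    // 10 pending + 3 running = 13 tasks with 4 slots each -> ceil(13 / 4) = 4 executors
    println(maxNumExecutorsNeeded(pendingTasks = 10, runningTasks = 3, slotsPerExecutor = slots))
  }
}
```

The `+ slotsPerExecutor - 1` term is the usual integer trick for rounding up, so a 13th task that does not fill a whole executor still gets one.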
    

    If the current target number of executors is greater than the number actually needed:

     // The target number exceeds the number we actually need, so stop adding new
     // executors and inform the cluster manager to cancel the extra pending requests
     val oldNumExecutorsTarget = numExecutorsTarget
     numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
     numExecutorsToAdd = 1
    
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
       client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
       logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
         s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
     }
     numExecutorsTarget - oldNumExecutorsTarget
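The branch above reduces to a small calculation. This simplified Scala sketch uses hypothetical names and replaces the cluster-manager call with a boolean flag; it shows how the new target and the returned (non-positive) delta come out.

```scala
// Hypothetical, simplified model of the "target too high" branch: the new target is the
// larger of what is needed and the configured minimum, and the result carries the change.
object LowerTarget {
  final case class Result(newTarget: Int, delta: Int, sendRequest: Boolean)

  def lowerTarget(oldTarget: Int, maxNeeded: Int, minNumExecutors: Int): Result = {
    val newTarget = math.max(maxNeeded, minNumExecutors)
    // Only contact the cluster manager when the target actually went down.
    Result(newTarget, newTarget - oldTarget, sendRequest = newTarget < oldTarget)
  }

  def main(args: Array[String]): Unit = {
    // 10 targeted, only 4 needed, floor of 2 -> target drops to 4 (delta -6)
    println(lowerTarget(oldTarget = 10, maxNeeded = 4, minNumExecutors = 2))
    // Needed count below the floor -> the floor wins (target 2, delta -8)
    println(lowerTarget(oldTarget = 10, maxNeeded = 1, minNumExecutors = 2))
  }
}
```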
    

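    If the current target exceeds the desired number, Spark tells the cluster manager to cancel the extra pending requests, since they are not needed. Executors that have already been allocated are brought down to a reasonable number later by the idle-timeout mechanism.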
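The idle-timeout scale-down can be pictured with the following Scala sketch. It assumes the relevant settings are spark.dynamicAllocation.executorIdleTimeout (60s by default) and spark.dynamicAllocation.minExecutors; the data shapes, the function name and the "most idle first" ordering are hypothetical choices for illustration, not Spark's internals.

```scala
// Hypothetical sketch of idle-timeout scale-down: an executor idle for at least the
// timeout becomes a removal candidate, but the pool never shrinks below the minimum.
object IdleTimeoutSketch {
  final case class ExecutorState(id: String, idleSeconds: Long)

  def executorsToRemove(
      executors: Seq[ExecutorState],
      idleTimeoutSeconds: Long,
      minNumExecutors: Int): Seq[String] = {
    val expired = executors.filter(_.idleSeconds >= idleTimeoutSeconds)
    // Keep at least minNumExecutors alive: remove at most (current - min) executors.
    val removable = math.max(executors.size - minNumExecutors, 0)
    // Illustrative choice: drop the longest-idle executors first.
    expired.sortBy(e => -e.idleSeconds).take(removable).map(_.id)
  }

  def main(args: Array[String]): Unit = {
    val pool = Seq(
      ExecutorState("1", 120), ExecutorState("2", 90),
      ExecutorState("3", 10), ExecutorState("4", 75))
    // Three executors exceed a 60 s timeout, but a floor of 2 allows removing only two.
    println(executorsToRemove(pool, idleTimeoutSeconds = 60, minNumExecutors = 2))
  }
}
```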

    If the current target is lower than the number of executors needed:

     val oldNumExecutorsTarget = numExecutorsTarget
    
     // There's no point in wasting time ramping up to the number of executors we already have, so
     // make sure our target is at least as much as our current allocation:
     numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
    
     // Boost our target with the number to add for this round:
     numExecutorsTarget += numExecutorsToAdd
    
     // Ensure that our target doesn't exceed what we need at the present moment:
     numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
    
     // Ensure that our target fits within configured bounds:
     numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
     val delta = numExecutorsTarget - oldNumExecutorsTarget
    
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
       numExecutorsToAdd = 1
       return 0
     }
     val addRequestAcknowledged = testing ||
       client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
     if (addRequestAcknowledged) {
       val executorsString = "executor" + { if (delta > 1) "s" else "" }
       logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
         s" (new desired total will be $numExecutorsTarget)")
       numExecutorsToAdd = if (delta == numExecutorsToAdd) {
         numExecutorsToAdd * 2
       } else {
         1
       }
       delta
     } else {
       logWarning(
         s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
       0
     }
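To see the exponential ramp-up produced by the branch above, this Scala sketch replays it round by round. It assumes every request is acknowledged, that requested executors register before the next round, and that the backlog (maxNeeded) stays constant; all names are hypothetical.

```scala
// Hypothetical, simplified replay of the ramp-up branch (no cluster-manager calls).
object RampUpSketch {
  final case class State(target: Int, toAdd: Int, current: Int)

  // One round; returns the new state and the delta that would be requested.
  def addRound(s: State, maxNeeded: Int, minNumExecutors: Int, maxNumExecutors: Int): (State, Int) = {
    val raised = math.max(s.target, s.current) + s.toAdd // at least what we have, plus this round's boost
    val capped = math.min(raised, maxNeeded)             // no more than the backlog needs
    val target = math.max(math.min(capped, maxNumExecutors), minNumExecutors) // configured bounds
    val delta = target - s.target
    if (delta == 0) (s.copy(target = target, toAdd = 1), 0) // no change: reset exponential growth
    else {
      val nextToAdd = if (delta == s.toAdd) s.toAdd * 2 else 1 // full boost granted: double it
      (s.copy(target = target, toAdd = nextToAdd), delta)
    }
  }

  def simulate(rounds: Int, maxNeeded: Int, minNumExecutors: Int, maxNumExecutors: Int): Seq[Int] = {
    var state = State(target = 0, toAdd = 1, current = 0)
    (1 to rounds).map { _ =>
      val (next, delta) = addRound(state, maxNeeded, minNumExecutors, maxNumExecutors)
      state = next.copy(current = next.target) // assume the requested executors are now running
      delta
    }
  }

  def main(args: Array[String]): Unit =
    // Backlog worth 20 executors, bounds [0, 100]
    println(simulate(rounds = 6, maxNeeded = 20, minNumExecutors = 0, maxNumExecutors = 100).mkString(", "))
}
```

Under these assumptions the requested deltas are 1, 2, 4, 8, then 5 (capped by the backlog of 20), then 0, which mirrors how numExecutorsToAdd doubles while each full boost is granted and resets to 1 otherwise.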
    

    【Discussion】:

      【Solution 2】:

      Two possible answers:

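      • If you specify the number of executors when calling spark-submit with --num-executors X, you should get the number you asked for. (In Spark 2.x, --num-executors is a YARN-only option; in standalone mode the count is governed by --total-executor-cores together with --executor-cores.)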
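      • If you don't specify it, Spark can use dynamic allocation, which launches more executors as they are needed; note that it has to be enabled with spark.dynamicAllocation.enabled, which defaults to false. You can configure its behavior, such as the maximum number of executors; see http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation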

      The number of executors on each worker node depends on the resources (cores and memory) available on that worker.
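For the question's 6-worker example, the per-worker count in standalone mode can be estimated with the Scala sketch below. It assumes Spark's documented standalone default that an executor takes all available cores on the worker when spark.executor.cores is unset; otherwise it packs executors by cores and spark.executor.memory. The worker size (16 cores, 64 GB), executor size (4 cores, 10 GB) and all names are hypothetical, and the model ignores other applications and spark.cores.max.

```scala
// Hypothetical, simplified model of standalone-mode executor packing on one worker.
object ExecutorsPerWorker {
  def executorsPerWorker(
      workerCores: Int,
      workerMemoryMb: Int,
      executorCores: Option[Int],
      executorMemoryMb: Int): Int =
    executorCores match {
      // spark.executor.cores unset: a single executor grabs all free cores on the worker
      case None => if (workerCores > 0 && workerMemoryMb >= executorMemoryMb) 1 else 0
      // Otherwise limited by whichever resource runs out first
      case Some(c) => math.min(workerCores / c, workerMemoryMb / executorMemoryMb)
    }

  def main(args: Array[String]): Unit = {
    val workers = 6 // worker nodes from the question; assumes no worker runs on the master host
    val perWorker = executorsPerWorker(16, 64 * 1024, Some(4), 10 * 1024)
    // 16 / 4 = 4 by cores, 65536 / 10240 = 6 by memory -> 4 per worker
    println(s"$perWorker per worker, ${perWorker * workers} in total")
  }
}
```

Here cores are the binding limit (4 per worker, 24 in total); with smaller workers, memory could become the limit instead.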

      【Discussion】:
