Taking standalone-cluster mode as an example.

Here is the submit command:

./spark-submit --master spark://node1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi …/examples/jars/spark-examples_2.11-2.4.0.jar 100

The spark-submit script itself does little more than forward these arguments, via spark-class, to the org.apache.spark.deploy.SparkSubmit class, so execution starts in SparkSubmit's main method:

def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

In the code above, the command-line parameters passed by the spark-submit script arrive through main's args parameter and are handed to SparkSubmitArguments for parsing. SparkSubmit then matches on appArgs.action and dispatches to the submit, kill, or requestStatus operation accordingly.
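
For reference, SparkSubmitAction is a plain Scala Enumeration; paraphrased from the 2.x source (simplified):

private[deploy] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}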

Stepping into SparkSubmitArguments, let's look at how the parameters are parsed. The key code in SparkSubmitArguments is:

// Set parameters from command line arguments
  try {
    // Call parse to pick each option out of the command line
    parse(args.asJava)
  } catch {
    // On IllegalArgumentException, print the error and exit
    case e: IllegalArgumentException =>
      SparkSubmit.printErrorAndExit(e.getMessage())
  }
  // Merge in the default Spark properties; explicitly passed settings override the defaults
  mergeDefaultSparkProperties()
  // Drop entries from sparkProperties whose keys do not start with "spark."
  ignoreNonSparkProperties()
  // Load configuration from environment variables
  loadEnvironmentArguments()
  // Validate that the arguments are legal
  validateArguments()
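
Before looking at parse in detail, note the merge step above: mergeDefaultSparkProperties gives explicitly supplied settings precedence over the defaults loaded from spark-defaults.conf. A minimal sketch of that precedence rule (plain Scala, not Spark's code; the property values are just examples):

// Explicit settings win over defaults: the right-hand operand of ++ wins.
val defaults = Map("spark.master" -> "local[*]", "spark.submit.deployMode" -> "client")
val explicit = Map("spark.master" -> "spark://node1:7077")
val merged   = defaults ++ explicit
// merged("spark.master") == "spark://node1:7077"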

Here, the parse function of the parent class SparkSubmitOptionParser scans args for `--` options and their values and splits each into a name and a value; for example, `--master yarn-client` is parsed into the name `--master` and the value `yarn-client`. It then calls SparkSubmitArguments#handle(MASTER, "yarn-client") to process the pair:

override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case NAME =>
        name = value

      case MASTER =>
        master = value

      case CLASS =>
        mainClass = value

      case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
        }
        deployMode = value

...

}

This function is straightforward: based on opt and value, it assigns the corresponding member. Continuing the example, after parse calls handle("--master", "yarn-client"), the master member is set to yarn-client.
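
A compact, self-contained sketch of this parse/handle contract (illustrative only; the real SparkSubmitOptionParser additionally handles --opt=value syntax, unknown options, and trailing positional arguments):

// Walk the argument list, pair each "--opt" with the value that follows it,
// and let handle() assign the matching field.
object ParseSketch {
  var master: String = _
  var deployMode: String = _

  def handle(opt: String, value: String): Unit = opt match {
    case "--master"      => master = value
    case "--deploy-mode" => deployMode = value
    case _               => // ignored in this sketch
  }

  def parse(args: List[String]): Unit = args match {
    case opt :: value :: rest if opt.startsWith("--") =>
      handle(opt, value)
      parse(rest)
    case _ => // done
  }

  def main(args: Array[String]): Unit = {
    parse(List("--master", "yarn-client", "--deploy-mode", "client"))
    println(s"$master, $deployMode") // prints: yarn-client, client
  }
}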

It is worth mentioning that if no --class argument is given, SparkSubmit tries to derive the main class from the JAR:

// Try to set main class from JAR if no --class argument is given
    if (mainClass == null && !isPython && !isR && primaryResource != null) {
      val uri = new URI(primaryResource)
      val uriScheme = uri.getScheme()

      uriScheme match {
        case "file" =>
          try {
            val jar = new JarFile(uri.getPath)
            // Note that this might still return null if no main-class is set; we catch that later
            mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
          } catch {
            case e: Exception =>
              SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
          }
        case _ =>
          SparkSubmit.printErrorAndExit(
            s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
            "Please specify a class through --class.")
      }
    }
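
To check ahead of time which class spark-submit would infer, you can read the manifest the same way; a small sketch (the jar path is hypothetical, and as the comment in the Spark code notes, getValue returns null when no Main-Class attribute is set):

import java.util.jar.JarFile

// Hypothetical path; prints the Main-Class declared in the jar's manifest, if any.
val jar = new JarFile("/tmp/app.jar")
val mainClass = Option(jar.getManifest)
  .flatMap(m => Option(m.getMainAttributes.getValue("Main-Class")))
println(mainClass.getOrElse("no Main-Class attribute"))
jar.close()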

Note that the MASTER in case MASTER above is defined as --master in SparkSubmitOptionParser; MASTER and the other option constants are defined as follows:

protected final String CLASS = "--class";
  protected final String CONF = "--conf";
  protected final String DEPLOY_MODE = "--deploy-mode";
  protected final String DRIVER_CLASS_PATH = "--driver-class-path";
  protected final String DRIVER_CORES = "--driver-cores";
  protected final String DRIVER_JAVA_OPTIONS =  "--driver-java-options";
  protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
  protected final String DRIVER_MEMORY = "--driver-memory";
  protected final String EXECUTOR_MEMORY = "--executor-memory";
  protected final String FILES = "--files";
  protected final String JARS = "--jars";
  protected final String KILL_SUBMISSION = "--kill";
  protected final String MASTER = "--master";
  protected final String NAME = "--name";
  protected final String PACKAGES = "--packages";
  protected final String PACKAGES_EXCLUDE = "--exclude-packages";
  protected final String PROPERTIES_FILE = "--properties-file";
  protected final String PROXY_USER = "--proxy-user";
  protected final String PY_FILES = "--py-files";
  protected final String REPOSITORIES = "--repositories";
  protected final String STATUS = "--status";
  protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";

  // Options that do not take arguments.
  protected final String HELP = "--help";
  protected final String SUPERVISE = "--supervise";
  protected final String USAGE_ERROR = "--usage-error";
  protected final String VERBOSE = "--verbose";
  protected final String VERSION = "--version";

  // Standalone-only options.

  // YARN-only options.
  protected final String ARCHIVES = "--archives";
  protected final String EXECUTOR_CORES = "--executor-cores";
  protected final String KEYTAB = "--keytab";
  protected final String NUM_EXECUTORS = "--num-executors";
  protected final String PRINCIPAL = "--principal";
  protected final String QUEUE = "--queue";


SparkSubmit#main函数中有case SparkSubmitAction.SUBMIT => submit(appArgs),这句代码判断是否是提交参数并执行程序,如果匹配到SparkSubmitAction.SUBMIT,则调用submit(appArgs)方法,参数appArgs是SparkSubmitArguments类型,appArgs中包含了提交的各种参数,包括命令行传入以及默认的配置项。
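
For completeness, the other two actions are selected inside handle when --kill or --status is passed; paraphrased from SparkSubmitArguments in the 2.x line (simplified) before we follow the SUBMIT path:

case KILL_SUBMISSION =>
  submissionToKill = value
  action = SparkSubmitAction.KILL

case STATUS =>
  submissionToRequestStatusFor = value
  action = SparkSubmitAction.REQUEST_STATUS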


submit(appArgs):

private def submit(args: SparkSubmitArguments): Unit = {
    // Returns a four-element tuple
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            if (e.getStackTrace().length == 0) {
              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              exitFn(1)
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }
    // Whichever branch is taken, doRunMain is what actually runs; standalone-cluster
    // mode first tries the REST gateway and falls back to the legacy one
    if (args.isStandaloneCluster && args.useRest) {
      try {
        printStream.println("Running Spark using the REST application submission protocol.")
        doRunMain()
      } catch {
        case e: SubmitRestConnectionException =>
          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args)
      }
    } else {
      doRunMain()
    }
  }

This code first calls prepareSubmitEnvironment(args) to prepare the submission environment. The method returns a four-element tuple: the arguments for the child main class, the child classpath entries, a map of system properties, and the child main class itself. Once the environment is prepared, the child main class is launched; in standalone-cluster mode with the legacy gateway, childMainClass has already been set to org.apache.spark.deploy.Client inside prepareSubmitEnvironment. Note that "launched" here means its main method is invoked via reflection in the current JVM, not that a new process is forked. The invocation happens in runMain, shown below:

private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    // scalastyle:off println
    if (verbose) {
      printStream.println(s"Main class:\n$childMainClass")
      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      printStream.println("\n")
    }
    // scalastyle:on println

    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)
    // Walk the classpath entries and add each jar to the classpath
    // via the class loader chosen above
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    // Copy every entry in sysProps into the JVM-wide system properties
    for ((key, value) <- sysProps) {
      System.setProperty(key, value)
    }

    var mainClass: Class[_] = null

    try {
      // Load the class named by childMainClass
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        if (childMainClass.contains("thriftserver")) {
          // scalastyle:off println
          printStream.println(s"Failed to load main class $childMainClass.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        e.printStackTrace(printStream)
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          // scalastyle:off println
          printStream.println(s"Failed to load hive class.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    // Look up the main method of the class to be launched
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      // Invoke the main method via reflection, passing childArgs as its arguments
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }
  }

In the code above, Utils.classForName(childMainClass) resolves the class to execute, getMethod on mainClass retrieves its main method, and invoke on mainMethod runs it. Note that invoke is passed childArgs, which carries the configuration for the child main class.
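
The reflection pattern itself is ordinary Java reflection; here is a self-contained sketch of the same classForName/getMethod/invoke sequence (Target is a stand-in class invented for the sketch; compile both objects together):

import java.lang.reflect.Modifier

// Stand-in for the "child" main class.
object Target {
  def main(args: Array[String]): Unit =
    println("invoked with: " + args.mkString(", "))
}

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    // runMain uses Utils.classForName; plain Class.forName shows the same idea
    val mainClass  = Class.forName("Target")
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    // The whole Array[String] is passed as the single argument of main
    mainMethod.invoke(null, Array("--master", "spark://node1:7077"))
  }
}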

Note: the mainClass here and the mainClass in SparkSubmitArguments are different variables.
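
A quick way to watch all of this happen is to add --verbose (the VERBOSE option listed earlier) to the submission from the beginning of this article: SparkSubmit#main then prints the parsed appArgs, and runMain prints the resolved main class, its arguments, the system properties, and the classpath elements:

./spark-submit --verbose --master spark://node1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi …/examples/jars/spark-examples_2.11-2.4.0.jar 100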

The program therefore launches org.apache.spark.deploy.Client and runs its main method. What does the Client class do? Let's first see how it completes the call. Below is the Client object and its main method:

object Client {
  def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println
    // Create the SparkConf
    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)
    // Set the RPC ask timeout to 10 seconds
    conf.set("spark.rpc.askTimeout", "10")
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    // Create the RPC environment with RpcEnv.create
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    // Obtain the RpcEndpointRefs used to communicate with the Masters
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    // Register the ClientEndpoint
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    // Wait for the rpcEnv to terminate
    rpcEnv.awaitTermination()
  }
}

This completes SparkSubmit's task submission.


