Take standalone-cluster mode as an example.
Here is the submission command:
./spark-submit --master spark://node1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi …/examples/jars/spark-examples_2.11-2.4.0.jar 100
The spark-submit script itself is a thin shell wrapper around the SparkSubmit class.
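In the Spark 2.x distribution the script is only a few lines long; after resolving SPARK_HOME it essentially reduces to a single exec line (the find-spark-home bootstrapping is omitted here, so treat this as a sketch of the 2.4 script):

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

spark-class then assembles the JVM command line and launches the SparkSubmit class.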
Execution then enters the main method of SparkSubmit:
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
In the code above, the command-line arguments passed by the spark-submit script arrive through main's args parameter and are handed to SparkSubmitArguments for parsing. Finally, the action field of appArgs is matched to dispatch to the submit, kill, or requestStatus operation.
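For a plain submission like the one at the top of this section, neither --kill nor --status is present, so action resolves to SUBMIT. In the Spark 2.x source this default is applied inside SparkSubmitArguments.loadEnvironmentArguments, roughly as:

  // Action should be SUBMIT unless otherwise specified
  action = Option(action).getOrElse(SUBMIT)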
Step into SparkSubmitArguments to see how the arguments are parsed. The key code in SparkSubmitArguments is:
// Set parameters from command line arguments
try {
  // Call parse to extract each option from the command line
  parse(args.asJava)
} catch {
  // On IllegalArgumentException, print the error and exit
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}
// Merge in the default Spark properties; values passed on the command line override the defaults
mergeDefaultSparkProperties()
// Remove entries from sparkProperties whose keys do not start with "spark."
ignoreNonSparkProperties()
// Load configuration from environment variables
loadEnvironmentArguments()
// Validate that the arguments are legal
validateArguments()
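Taken together, these calls give the arguments a clear precedence: explicit command-line options beat entries from the properties file (--properties-file, defaulting to conf/spark-defaults.conf), which in turn beat environment variables. For example, loadEnvironmentArguments resolves master roughly like this (simplified from the Spark 2.x source):

  master = Option(master)
    .orElse(sparkProperties.get("spark.master"))
    .orElse(env.get("MASTER"))
    .orNull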
The parse call above is inherited from the parent class SparkSubmitOptionParser; it scans args for -- options and splits each into a name and a value. For example, --master yarn-client is split into the name --master and the value yarn-client, after which SparkSubmitArguments#handle(MASTER, "yarn-client") is invoked to process the pair.
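parse accepts both the --name value form and the --name=value form. A minimal Scala sketch of that splitting step (the real logic lives in the Java class SparkSubmitOptionParser and also handles option aliases):

  val eqSeparatedOpt = "(--[^=]+)=(.+)".r

  def split(arg: String, nextArg: String): (String, String) = arg match {
    case eqSeparatedOpt(name, value) => (name, value)   // e.g. --master=yarn-client
    case name                        => (name, nextArg) // e.g. --master yarn-client
  }

Each resulting (name, value) pair is then fed to handle, which looks like this: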
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
    case NAME =>
      name = value
    case MASTER =>
      master = value
    case CLASS =>
      mainClass = value
    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value
    ...
  }
}
This function is straightforward: based on opt and value, it assigns the corresponding member. Continuing the earlier example, after parse calls handle("--master", "yarn-client"), the master member is set to yarn-client inside handle.
It is worth noting that if no --class argument is given, SparkSubmitArguments tries to read the main class from the JAR:
// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
  val uri = new URI(primaryResource)
  val uriScheme = uri.getScheme()
  uriScheme match {
    case "file" =>
      try {
        val jar = new JarFile(uri.getPath)
        // Note that this might still return null if no main-class is set; we catch that later
        mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
      } catch {
        case e: Exception =>
          SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
      }
    case _ =>
      SparkSubmit.printErrorAndExit(
        s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
        "Please specify a class through --class.")
  }
}
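The same manifest lookup is easy to reproduce in isolation. Below is a small self-contained sketch (the object name is made up for illustration) that prints a jar's Main-Class attribute, or null if none is set:

  import java.util.jar.JarFile

  object PrintMainClass {
    def main(args: Array[String]): Unit = {
      // args(0) is the path to a jar, e.g. spark-examples_2.11-2.4.0.jar
      val jar = new JarFile(args(0))
      try {
        // getManifest returns null for jars without META-INF/MANIFEST.MF
        val manifest = jar.getManifest
        val mainClass =
          if (manifest != null) manifest.getMainAttributes.getValue("Main-Class") else null
        println(s"Main-Class: $mainClass")
      } finally {
        jar.close()
      }
    }
  }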
Note that the MASTER in case MASTER is defined in SparkSubmitOptionParser as --master. MASTER and the other option constants are defined as follows:
protected final String CLASS = "--class";
protected final String CONF = "--conf";
protected final String DEPLOY_MODE = "--deploy-mode";
protected final String DRIVER_CLASS_PATH = "--driver-class-path";
protected final String DRIVER_CORES = "--driver-cores";
protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
protected final String DRIVER_MEMORY = "--driver-memory";
protected final String EXECUTOR_MEMORY = "--executor-memory";
protected final String FILES = "--files";
protected final String JARS = "--jars";
protected final String KILL_SUBMISSION = "--kill";
protected final String MASTER = "--master";
protected final String NAME = "--name";
protected final String PACKAGES = "--packages";
protected final String PACKAGES_EXCLUDE = "--exclude-packages";
protected final String PROPERTIES_FILE = "--properties-file";
protected final String PROXY_USER = "--proxy-user";
protected final String PY_FILES = "--py-files";
protected final String REPOSITORIES = "--repositories";
protected final String STATUS = "--status";
protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
// Options that do not take arguments.
protected final String HELP = "--help";
protected final String SUPERVISE = "--supervise";
protected final String USAGE_ERROR = "--usage-error";
protected final String VERBOSE = "--verbose";
protected final String VERSION = "--version";
// Standalone-only options.
// YARN-only options.
protected final String ARCHIVES = "--archives";
protected final String EXECUTOR_CORES = "--executor-cores";
protected final String KEYTAB = "--keytab";
protected final String NUM_EXECUTORS = "--num-executors";
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";
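Applied to the submission command at the top of this section, parse therefore triggers calls such as handle(MASTER, "spark://node1:7077"), handle(DEPLOY_MODE, "cluster") and handle(CLASS, "org.apache.spark.examples.SparkPi"); the remaining tokens (the application jar and 100) are not -- options, so they end up as the primary resource and the application arguments via handleUnknown and handleExtraArgs.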
Back in SparkSubmit#main, the line case SparkSubmitAction.SUBMIT => submit(appArgs) checks whether this is a submission; when SparkSubmitAction.SUBMIT matches, submit(appArgs) is called. The appArgs parameter has type SparkSubmitArguments and carries all of the submission parameters, both those passed on the command line and the default configuration items.
submit(appArgs):
private def submit(args: SparkSubmitArguments): Unit = {
  // Returns a 4-tuple
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          if (e.getStackTrace().length == 0) {
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }
  // doRunMain is invoked whether or not this is standalone-cluster mode
  if (args.isStandaloneCluster && args.useRest) {
    try {
      printStream.println("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // If the master is not a REST server, fall back to the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  } else {
    doRunMain()
  }
}
This code first calls prepareSubmitEnvironment(args) to prepare the submission environment. The method returns a 4-tuple: the arguments for the child main class, the child classpath entries, a map of system properties, and the child main class itself. Once the environment is ready, the child main class is launched; in standalone-cluster mode over the legacy gateway that class is org.apache.spark.deploy.Client, because prepareSubmitEnvironment resolves childMainClass to org.apache.spark.deploy.Client. Note that despite the "child" prefix, no new process is forked at this point: runMain invokes the class's main method by reflection inside the current JVM.
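The branch of prepareSubmitEnvironment that picks the child main class for standalone-cluster mode looks roughly like this (simplified from the Spark 2.x source, with unrelated details elided):

  if (args.isStandaloneCluster) {
    if (args.useRest) {
      childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
      ...
    } else {
      // In legacy standalone cluster mode, use Client as a wrapper around the user class
      childMainClass = "org.apache.spark.deploy.Client"
      ...
    }
  }

The concrete execution happens in runMain, shown below: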
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  // Walk the classpath entries
  for (jar <- childClasspath) {
    // Add each jar dependency to the classpath through the loader
    addJarToClasspath(jar, loader)
  }
  for ((key, value) <- sysProps) {
    // Push every entry of sysProps into the global System properties
    System.setProperty(key, value)
  }
  var mainClass: Class[_] = null
  try {
    // Resolve the main class to launch
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      e.printStackTrace(printStream)
      if (childMainClass.contains("thriftserver")) {
        // scalastyle:off println
        printStream.println(s"Failed to load main class $childMainClass.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      e.printStackTrace(printStream)
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        // scalastyle:off println
        printStream.println(s"Failed to load hive class.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
  }
  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }
  // Look up the main method of the class to launch
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  @tailrec
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }
  try {
    // Invoke the main method by reflection, passing childArgs as its arguments
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      findCause(t) match {
        case SparkUserAppException(exitCode) =>
          System.exit(exitCode)
        case t: Throwable =>
          throw t
      }
  }
}
In the code above, Utils.classForName(childMainClass) resolves and returns the main class to execute, getMethod on mainClass retrieves its main method, and invoke on mainMethod finally runs it. Note that invoke is passed the childArgs argument, which carries the configuration through to the launched class.
Note: the mainClass here is a different variable from the mainClass in SparkSubmitArguments.
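The reflective launch is plain JVM machinery and can be reproduced outside Spark. Here is a minimal self-contained sketch (Hello and ReflectiveLaunch are made-up names) that mirrors the classForName, getMethod, and invoke sequence:

  import java.lang.reflect.Modifier

  // Stand-in for the class named by childMainClass
  object Hello {
    def main(args: Array[String]): Unit = println("Hello, " + args.mkString(" "))
  }

  object ReflectiveLaunch {
    def main(args: Array[String]): Unit = {
      // Resolve the class by name, as Utils.classForName does
      val mainClass = Class.forName("Hello")
      // Look up the static main(String[]) method, as runMain does
      val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
      require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
      // Scala passes the array as one reflective argument, just like childArgs.toArray above
      mainMethod.invoke(null, Array("from", "reflection"))
    }
  }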
The program thus launches the org.apache.spark.deploy.Client class and runs its main method. So what does Client do? Let's first see how the call is carried out. Here are the Client object and its main method:
object Client {
  def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println
    // Create the SparkConf object
    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)
    // Set the RPC ask timeout to 10 seconds
    conf.set("spark.rpc.askTimeout", "10")
    Logger.getRootLogger.setLevel(driverArgs.logLevel)
    // Create the RPC environment with RpcEnv.create
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    // Obtain the RpcEndpointRefs for communicating with the Masters
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    // Register the ClientEndpoint
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    // Block until the rpcEnv terminates
    rpcEnv.awaitTermination()
  }
}
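From here the registered ClientEndpoint takes over: in its onStart it wraps the user's driver in a DriverDescription and asks the Master to launch it, roughly like this (simplified from the Spark 2.x source, with details elided):

  override def onStart(): Unit = {
    driverArgs.cmd match {
      case "launch" =>
        // The driver runs inside DriverWrapper so that a Worker can launch and supervise it
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
        ...
        val driverDescription = new DriverDescription(driverArgs.jarUrl, driverArgs.memory,
          driverArgs.cores, driverArgs.supervise, command)
        asyncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))
      case "kill" =>
        val driverId = driverArgs.driverId
        asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
    }
  }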
This concludes the SparkSubmit side of task submission.