Spark Job Submission Flow
To run a Spark application, you package it and then submit the package to Spark for execution with the spark-submit script, for example:
./bin/spark-submit examples/src/main/r/dataframe.R
1.1 To understand the whole flow, we first analyze the spark-submit script. Functionally it does one thing:
it invokes the SparkSubmit class via spark-class:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
1.2 The submission then enters the SparkSubmit.scala class and runs its main method, which does two things:
a: it wraps the arguments passed in into the appArgs variable;
b: it matches on the action type; we take SUBMIT as the example case, as sketched below.
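Here is that main method, abridged from the Spark 2.x sources (exact details vary across versions):

def main(args: Array[String]): Unit = {
  // a: wrap the raw command-line arguments in a SparkSubmitArguments object
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  // b: dispatch on the requested action; we follow the SUBMIT branch
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}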
1.3 Once the action is determined to be SUBMIT, the submit() method is called. Its doc comment sums up the job:
a: Submit the application using the provided parameters.
b: This runs in two steps. First, we prepare the launch environment by setting up the appropriate classpath, system properties, and application arguments for running the child main class, based on the cluster manager and the deploy mode. Second, we use this launch environment to invoke the main method of the child main class.
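Stripped of proxy-user and standalone-REST fallback handling, submit() in Spark 2.x reduces to roughly the following sketch (prepareSubmitEnvironment and its four-element result are from the real source; the elisions are mine):

private def submit(args: SparkSubmitArguments): Unit = {
  // Step 1: build the launch environment -- classpath, system properties,
  // child arguments, and the child main class -- from the cluster manager
  // and the deploy mode
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  // ... proxy-user and standalone-REST fallback handling elided ...

  // Step 2: use the launch environment to invoke the child class's main method
  runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}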
1.4 submit() then calls the runMain() method, which uses reflection to invoke the application class you wrote, completing the job submission:
/**
 * Run the main method of the child class using the provided launch environment.
 *
 * Note that this main class will not be the one provided by the user if we're
 * running cluster deploy mode or python applications.
 */
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println

  // Choose a class loader; user jars win if spark.driver.userClassPathFirst is set
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  // Add the application jars to the classpath
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  // Propagate the launch-environment properties as system properties
  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      e.printStackTrace(printStream)
      if (childMainClass.contains("thriftserver")) {
        // scalastyle:off println
        printStream.println(s"Failed to load main class $childMainClass.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      e.printStackTrace(printStream)
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        // scalastyle:off println
        printStream.println(s"Failed to load hive class.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }

  // Look up the main(Array[String]) method via reflection
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  // The main method must be static
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }

  @tailrec
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }

  try {
    // Invoke the main method (static, hence the null receiver)
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      findCause(t) match {
        case SparkUserAppException(exitCode) =>
          System.exit(exitCode)
        case t: Throwable =>
          throw t
      }
  }
}
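To make the reflection step concrete, here is a minimal, self-contained sketch of the same load-check-invoke pattern. DemoApp, its class name, and the arguments are hypothetical stand-ins, not Spark code:

import java.lang.reflect.Modifier

// Hypothetical stand-in for the user application; in SparkSubmit the class
// name arrives as childMainClass (the user's --class, or an internal wrapper
// class in cluster deploy mode and for Python applications)
object DemoApp {
  def main(args: Array[String]): Unit =
    println(s"DemoApp invoked with: ${args.mkString(", ")}")
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    val childMainClass = "DemoApp"             // hypothetical
    val childArgs = Seq("--input", "data.txt") // hypothetical

    // Load the class by name on the context class loader,
    // as Utils.classForName does inside runMain
    val mainClass = Class.forName(childMainClass, true,
      Thread.currentThread.getContextClassLoader)

    // Find main(Array[String]) and insist that it is static, as runMain does
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      "The main method in the given main class must be static")

    // Static method, hence the null receiver; the Array[String] is passed
    // as the single argument of main
    mainMethod.invoke(null, childArgs.toArray)
  }
}

A Scala object compiles to a class with a static forwarder for main, which is why the isStatic check passes for an ordinary object with a main method; scala.App subclasses get the separate SPARK-4170 warning because their delayed initialization does not always behave well when main is invoked reflectively like this.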