Spark Basics, Part 2: The Spark Job Submission Flow in Detail

The Spark job submission flow

  To run a Spark application, you package it and submit it to Spark with the spark-submit script. The submit command looks like this:

./bin/spark-submit examples/src/main/r/dataframe.R
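A fuller invocation names the main class, master URL, and deploy mode explicitly; the master URL and the jar path/version below are illustrative, not fixed values:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://host:7077 \
  --deploy-mode client \
  examples/jars/spark-examples_2.11-2.4.5.jar 100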

1.1 To understand the whole flow, let's first analyze the spark-submit script. Its main job boils down to one thing:

The script invokes the SparkSubmit class:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

1.2 Submission then enters the SparkSubmit.scala class, whose main method runs. It does two things:

       a: the command-line arguments passed in are wrapped into the appArgs variable;

       b: the action type is pattern-matched; we follow the SUBMIT case as our example (see the abridged sketch below).

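The relevant part of SparkSubmit.main, abridged from the Spark 2.x source (verbose logging omitted):

  def main(args: Array[String]): Unit = {
    // a: wrap the command-line arguments
    val appArgs = new SparkSubmitArguments(args)
    // b: dispatch on the requested action; we follow the SUBMIT case
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }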

   1.3 Once the action is determined to be SUBMIT, the submit() method is called. Its doc comment lays out the plan:

         a: Submit the application using the provided parameters.

         b: This runs in two steps. First, we prepare the launch environment by setting up
       the appropriate classpath, system properties, and application arguments for
         running the child main class based on the cluster manager and the deploy mode.
       Second, we use this launch environment to invoke the main method of the child
       main class.

    In other words, submit() runs in two steps: first it prepares the launch environment from the configured classpath, system properties, and application arguments; second, it uses that launch environment to invoke the child main class's main method.
      

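Abridged from the Spark 2.x source (proxy-user handling and the standalone-REST fallback path are omitted here), submit() is exactly those two steps:

  private def submit(args: SparkSubmitArguments): Unit = {
    // Step 1: build the launch environment based on the cluster manager and deploy mode
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
    // Step 2: invoke the child main class using that environment
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  }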

1.4 Finally, the runMain() method is called

      It uses reflection to invoke the user-written application's main method, completing the job submission. The (lightly annotated) source:

      

  /** 
   * Run the main method of the child class using the provided launch environment. 
   * 
   * Note that this main class will not be the one provided by the user if we're 
   * running cluster deploy mode or python applications. 
   */ 
  private def runMain( 
      childArgs: Seq[String], 
      childClasspath: Seq[String], 
      sysProps: Map[String, String], 
      childMainClass: String, 
      verbose: Boolean): Unit = { 
    // scalastyle:off println 
    if (verbose) { 
      printStream.println(s"Main class:/n$childMainClass") 
      printStream.println(s"Arguments:/n${childArgs.mkString("/n")}") 
      printStream.println(s"System properties:/n${sysProps.mkString("/n")}") 
      printStream.println(s"Classpath elements:/n${childClasspath.mkString("/n")}") 
      printStream.println("/n") 
    } 
    // scalastyle:on println 
 
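    // Choose the driver-side class loader: child-first when spark.driver.userClassPathFirst is true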
    val loader = 
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) { 
        new ChildFirstURLClassLoader(new Array[URL](0), 
          Thread.currentThread.getContextClassLoader) 
      } else { 
        new MutableURLClassLoader(new Array[URL](0), 
          Thread.currentThread.getContextClassLoader) 
      } 
    Thread.currentThread.setContextClassLoader(loader) 
    // Add each jar on the child classpath to the class loader 
    for (jar <- childClasspath) { 
      addJarToClasspath(jar, loader) 
    } 
    // Propagate the launch environment as JVM system properties 
    for ((key, value) <- sysProps) { 
      System.setProperty(key, value) 
    } 
 
    var mainClass: Class[_] = null 
 
    try { 
      mainClass = Utils.classForName(childMainClass) 
    } catch { 
      case e: ClassNotFoundException => 
        e.printStackTrace(printStream) 
        if (childMainClass.contains("thriftserver")) { 
          // scalastyle:off println 
          printStream.println(s"Failed to load main class $childMainClass.") 
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.") 
          // scalastyle:on println 
        } 
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS) 
      case e: NoClassDefFoundError => 
        e.printStackTrace(printStream) 
        if (e.getMessage.contains("org/apache/hadoop/hive")) { 
          // scalastyle:off println 
          printStream.println(s"Failed to load hive class.") 
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.") 
          // scalastyle:on println 
        } 
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS) 
    } 
 
    // SPARK-4170 
    if (classOf[scala.App].isAssignableFrom(mainClass)) { 
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") 
    } 
    // Look up the main method via reflection 
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) 
    // The main method must be static 
    if (!Modifier.isStatic(mainMethod.getModifiers)) { 
      throw new IllegalStateException("The main method in the given main class must be static") 
    } 
 
    @tailrec 
    def findCause(t: Throwable): Throwable = t match { 
      case e: UndeclaredThrowableException => 
        if (e.getCause() != null) findCause(e.getCause()) else e 
      case e: InvocationTargetException => 
        if (e.getCause() != null) findCause(e.getCause()) else e 
      case e: Throwable => 
        e 
    } 
    try { 
      // Invoke the application's main method via reflection 
      mainMethod.invoke(null, childArgs.toArray) 
    } catch { 
      case t: Throwable => 
        findCause(t) match { 
          case SparkUserAppException(exitCode) => 
            System.exit(exitCode) 
 
          case t: Throwable => 
            throw t 
        } 
    } 
  }
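To make the reflective launch concrete, here is a minimal, self-contained sketch; DemoApp and its arguments are made up for illustration, but the lookup, static check, and invoke mirror runMain above:

  import java.lang.reflect.Modifier

  // Hypothetical stand-in for a user-submitted application
  object DemoApp {
    def main(args: Array[String]): Unit =
      println(s"DemoApp started with: ${args.mkString(", ")}")
  }

  object ReflectiveLauncher {
    def main(args: Array[String]): Unit = {
      // Locate the class by name, as runMain does with childMainClass
      val mainClass = Class.forName("DemoApp")
      val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
      // A Scala top-level object gets a static forwarder, so this check passes
      if (!Modifier.isStatic(mainMethod.getModifiers)) {
        throw new IllegalStateException("The main method in the given main class must be static")
      }
      // Pass the argument array as the single parameter of main
      mainMethod.invoke(null, Array("hello", "spark"))
    }
  }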



Original article by Maggie-Hunter. If reposting, please credit the source: https://blog.ytso.com/tech/bigdata/9399.html
