The previous post covered how a Spark application is submitted; this one walks through how a Spark job is submitted, using a collect job as the example. As described in the previous section, the runMain method of the spark-submit class uses reflection to invoke the main method of the user-supplied jar, and that main method initializes the SparkContext. Taking collect as the last action in the jar, the flow starts in the collect method, where the SparkContext calls runJob (code below):
/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
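For context, a minimal driver program that ends in such a collect action might look like the sketch below; the application name, master URL and data are placeholder values, not taken from the original post.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver: the collect() call at the end is the action that
// triggers the job submission flow described in this post.
object CollectJobExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("collect-job-example").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val doubled = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2)
    val result: Array[Int] = doubled.collect()   // collect() internally calls sc.runJob(...)
    println(result.take(5).mkString(", "))
    sc.stop()
  }
}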
SparkContext is the entry point of the program. During its initialization it creates two scheduling layers: the DAGScheduler for job scheduling and the TaskScheduler for task scheduling, where the job scheduling layer is the higher-level module that schedules in terms of task stages. The job submission flow begins with a call to SparkContext's runJob method; a small conceptual sketch of the two layers follows.
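To make the two-level scheduling relationship concrete, here is a small self-contained sketch; DagSchedulerStub and TaskSchedulerStub are simplified stand-ins for the real DAGScheduler and TaskScheduler, not Spark source.

// Conceptual sketch only: models how the stage-level scheduler delegates to the
// task-level scheduler; it is not the real Spark code.
object TwoLevelSchedulingSketch {

  // stands in for the task-level scheduler (TaskScheduler)
  class TaskSchedulerStub {
    def submitTasks(stageName: String, numTasks: Int): Unit =
      println(s"launching $numTasks tasks for $stageName")
  }

  // stands in for the stage-level scheduler (DAGScheduler)
  class DagSchedulerStub(taskScheduler: TaskSchedulerStub) {
    def runJob(numPartitions: Int): Unit = {
      // the real DAGScheduler would first split the job into stages here
      taskScheduler.submitTasks("ResultStage 0", numPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    // SparkContext creates both schedulers during its initialization
    val dagScheduler = new DagSchedulerStub(new TaskSchedulerStub)
    // an action such as collect() eventually reaches this call via sc.runJob
    dagScheduler.runJob(numPartitions = 4)
  }
}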
Next, the DAGScheduler's runJob method is called; a simplified sketch of this step is shown below.
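The original post points at the runJob source here; since it is not reproduced, the following is a simplified, self-contained sketch of the pattern this method follows: submit the job, then block on a JobWaiter until it completes. Names such as JobWaiter mirror Spark, but the bodies are simplifications.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Simplified sketch of the runJob -> submitJob -> wait-on-JobWaiter pattern;
// this is illustrative code, not the actual DAGScheduler source.
object DagSchedulerRunJobSketch {

  // stands in for org.apache.spark.scheduler.JobWaiter
  final class JobWaiter {
    private val promise = Promise[Unit]()
    def jobFinished(): Unit = promise.trySuccess(())
    def awaitResult(): Unit = Await.result(promise.future, Duration.Inf)
  }

  // submitJob registers the job and returns immediately with a waiter;
  // here we complete the waiter at once just to keep the sketch runnable
  def submitJob(partitions: Seq[Int]): JobWaiter = {
    val waiter = new JobWaiter
    waiter.jobFinished()
    waiter
  }

  // runJob is synchronous from the caller's point of view: it submits the job
  // and then blocks until the waiter reports completion
  def runJob(partitions: Seq[Int]): Unit = {
    val waiter = submitJob(partitions)
    waiter.awaitResult()
    println(s"job over ${partitions.size} partitions finished")
  }

  def main(args: Array[String]): Unit = runJob(0 until 4)
}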
runJob then calls the DAGScheduler's submitJob() method, sketched below:
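Again the source is not quoted in the post, so here is a minimal stand-in for what submitJob does: validate the partitions, allocate a job id and post a JobSubmitted event onto the event loop. The event type and queue used here are simplified placeholders.

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Minimal stand-in for submitJob; illustrative only.
object SubmitJobSketch {

  // stands in for the DAGSchedulerEvent hierarchy
  sealed trait SchedulerEvent
  final case class JobSubmitted(jobId: Int, partitions: Seq[Int]) extends SchedulerEvent

  private val nextJobId = new AtomicInteger(0)
  // stands in for eventProcessLoop's internal event queue
  private val eventQueue = mutable.Queue.empty[SchedulerEvent]

  def submitJob(numPartitions: Int): Int = {
    require(numPartitions > 0, "a job needs at least one partition")
    val jobId = nextJobId.getAndIncrement()                        // assign a new job id
    eventQueue.enqueue(JobSubmitted(jobId, 0 until numPartitions)) // eventProcessLoop.post(JobSubmitted(...))
    jobId
  }

  def main(args: Array[String]): Unit = {
    val id = submitJob(4)
    println(s"submitted job $id, events queued: ${eventQueue.size}")
  }
}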
submitJob in turn enters the eventProcessLoop.post() method, which places this submission event into the event queue (sketch below).
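The EventLoop base class itself is not quoted in the post; the sketch below is a simplified, self-contained version of the pattern: post() only enqueues the event, and a dedicated thread takes events off the queue and hands them to onReceive.

import java.util.concurrent.LinkedBlockingQueue

// Simplified version of the EventLoop pattern used by eventProcessLoop;
// the real org.apache.spark.util.EventLoop adds error handling and lifecycle hooks.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()

  // a single dedicated thread drains the queue and dispatches events
  private val eventThread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (!Thread.currentThread().isInterrupted) {
          onReceive(queue.take())
        }
      } catch {
        case _: InterruptedException => // stop() was called, exit quietly
      }
    }
  }
  eventThread.setDaemon(true)

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()

  // post() does nothing but put the event into the queue
  def post(event: E): Unit = queue.put(event)

  // subclasses (like the DAG scheduler's event loop) decide how to handle events
  protected def onReceive(event: E): Unit
}

object EventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new SimpleEventLoop[String]("dag-scheduler-event-loop") {
      override protected def onReceive(event: String): Unit = println(s"received: $event")
    }
    loop.start()
    loop.post("JobSubmitted")
    Thread.sleep(200) // give the event thread time to process the event
    loop.stop()
  }
}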
eventProcessLoop is an instance of a private class that DAGScheduler defines, which extends the EventLoop class. Its onReceive method calls doOnReceive, which matches on the event type and, for a job submission, finally calls handleJobSubmitted; that method is where the stages are generated, and stage generation is covered in the next section. The class is shown below:
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  // onReceive is overridden here
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      // delegate to doOnReceive
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // a job submission ends up in this case
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      // hand off to handleJobSubmitted
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
The dagScheduler.handleJobSubmitted method called at the end completes the job submission; the division into stages is performed in this method and will be covered in the next section.