spark的任务调度
在上次我们讲了 Spark Job的提交,我们提到,当rdd触发action操作之后,会调用SparkContext的runJob方法,最后调用的DAGScheduler.handleJobSubmitted方法完成整个job的提交。然后DAGScheduler根据RDD的lineage进行Stage划分,再生成TaskSet,由TaskScheduler向集群申请资源,最终在Woker节点的Executor进程中执行Task,下面就来先看看生成stages的调度阶段;sparkcontext 为程序的运行入口,在初始化的时候,会分别创建DAGScheduler作业调度和taskScheduler任务调度,两级调度模块,其中作业调度模块基于任务阶段的高层调度模块。
DAGScheduler 作业调度是基于任务阶段的高层调度模块它的作用可以归纳以下几点:
1:接收用户提交的job;
2:将job按类型划分不同的stages(根据shuffle boundaries划分stages),每个stage都只有一个shuffle dependencies on oher stages;dagschedule记录哪些RDD和stages被物化;并在每个stage内产生一系列的task,并把这些task封装为taskset
3:决定每个Task的最佳位置(任务在数据所在的节点上运行),并结合当前的缓存情况;将TaskSet提交给TaskScheduler;
spark 的stages
stages是DAGScheduler根据RDD的lineage进行Stage划分,划分的依据是shuffle boundary;存在两种stages
第一种:ShuffleMapStage, in which case its tasks’ results are input for another stage
其实就是,非最终stage, 后面还有其他的stage, 所以它的输出一定是需要shuffle并作为后续的输入。 这种Stage是以Shuffle为输出边界,其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出 其输出可以是另一个Stage的开始。 ShuffleMapStage的最后Task就是ShuffleMapTask。 一个Job里可能有该类型的Stage,也可以能没有该类型Stage。
第二种:ResultStage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc)
ResultStage, 没有输出, 而是直接产生结果或存储。这种Stage是直接输出结果,其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出。 ResultStage的最后Task就是ResultTask,在一个Job里必定有该类型Stage。一个Job含有一个或多个Stage,但至少含有一个ResultStage。
上一节说到
handleJobSubmitted方
法中划
分stages 下面
源码可以从下面进入
handleJobSubmitted方
法中划
分stages 下面
源码可以从下面进入
下面进入newResultStage()方法的源码生成finalStage,如下:
如上面源码可以看出,在创建resultStage之前,先要生成parentStage,下面进入生成parentStages方法源码如下:
下 面进入getShuffleMapStage()方法源码,经过此方法生成shuffleMapStage
总结可得:stage创建过程也就是DAG的过程,并是递归进行的,所以在创建后一个Stage时,必然保证其直接父Stage已经创建(未创建,便会递归调用getParentStage来创建父Stage),知道递归的最底层,也就是遇到了DAG中的最左边的RDD,此时会为这个时候的ShuffleDependency创建Stage0,并跳出底层递归,然后逐层创建stage,并返回给上一层,知道最后来到顶层也就是Result Stage,创建Result Stage,完成DAG的创建。
下面来了解submitStage(finalStage)方法源码:
下面进入submitMissingTask()方法中,此方法把stage和jobId作为参数提交,进入源码如下可以看到,把stage转为为task
如上图源码,最后封装为TaskSet,并调用taskScheduler类中的submitTasks方法,到此提交阶段进入到了任务调度类中,taskSchedule中怎么提交,见下节;
下面就以上做了总结;
当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。Stage的划分是以ShuffleDependency为依据的,也就是说当某个RDD的运算需要将数据进行Shuffle时,这个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的Stage,由此为依据划分 Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。
具体提交一个Stage时,首先判断该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个父Stage的结果不可用,则迭代尝试提交父Stage。 所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交
每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet, 这个TaskSet最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个TaskSet的生命周期,对于 DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过 TaskSetManager调度具体的Task到对应的Executor节点上进行运算
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9397.html