-
Spark中最重要的机制有那些?
1.RDD,2.Spark调度机制,3Shuffle过程
-
什么是RDD?
可以这么说,你懂了RDD,基本上就可以对Hadoop和Spark的一半给吃透了,那么到底是RDD
RDD(弹性分布式数据集)首先体现数据集,RDD是对原始数据的封装,该种数据结构内部可以对数据进行逻辑分区,其次分布式体现是并行计算以及需要解决容错问题,也就是根据依赖,找到第一层RDD,最后根据RDD编号与分区编号,可以唯一确定该分区对应的块编号,就能从存储介质中提取出分区对应的数据。在就是弹性,RDD在可以不改变内部存储数据记录的前提下,去调整并行计算单元的划分结构(这个可能就是Stage)
-
基本概念
(1)应用程序:用户构建的Spark应用程序,包含驱动程序(一个Driver功能的代码)和在集群的多个工作结点上运行的Executor代码。
(2)驱动程序:包含main入口函数并在main函数内实例化SparkContext对象的应用程序称为驱动应用程序。不说了,直接上代码如下:
Var logFile=”YOUR_SPARK_HOME/README.md”//本地文件目录
val conf=new SparkConf().setAppName(“Simple Application”);//给Application命名
val sc=new SparkContext(conf);(3)Master(ClusterManager)管理者整个集群,目前Spark主要支持三种类型:Standlone模式,Mesos模式,Yarn模式。
(4)Worker节点:运行Worker守护进程的集群结点。
(5)任务执行器(Executor):一个Worker节点上可能有多个Executor, 每个Executor都拥有固定的核心数量和堆栈大小。
(6)作业(job)::包含多个Task(任务)组成的并行计算(并排的那些分区)),往往由Spark的action触发产生。在Spark中通过runJob方法向Spark集群中提交Job
(7)阶段(Stage):每个job会因为RDD之间的依赖关系被拆分成多个Task集合,其名称称为Stage,每一个Task集合,也可以叫TaskSet(任务集)
补充:
每个Application中可能有多个job,相互独立。
每个Worker可以起一个或多个Executor。
每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task。
每个Task执行的结果就是生成了目标RDD的一个partiton。 -
依赖于并行计算如何理解?
4.1分区是并行计算的基本单位:一个原始数据分成了10个分区,那么就可以同时并行这个10分区,是不是可以这样去理解?不一定,如果都是窄依赖,没有问题,但其中会涉及到宽依赖,这其中就会产生分区与分区之间的数据进行交叉,反正不像同时完这10个分区数据这么快。
4.2每个分区内数据的计算当成一个并行任务,每个并行任务包含一个计算链,每一个CPU核心就去执行这些计算连。直接,简单,干脆,不玩虚的,上代码理解计算链:
rdd.map(line=>line.length).filter().等等之类的。
如果这些计算链之间都是独立的,而且互不影响,那么我们可以并行计算。我们可以将这些链条之间的关系定义为窄依赖(一对一依赖和范围依赖)
-
RDD为什么要划分Stage,怎么划分stage?
如果子RDD一个分区内的数据依赖于多个父RDD中分区的数据,这个叫做宽依赖,或者叫做Shuffle依赖,那么如果有多个子RDD,每个子RDD都依赖多个父RDD中分区的数据,我们是不是要想办法把RDD数据保存起来,提供给这些子分区计算使用,否则是不是每个分区都要重新计算多个父RDD数据,也在这个地方开始划分Stage的原因。凡是遇到宽依赖,就划分stage。
-
Spark如何管理资源?
-
Spark集群管理器分为三种,Standlone模式,Mesos模式,Yarn模式。这是重点,但又不是很重要,所以这地方不是十分了解,也没有多大损失。
-
Spark内部如何调度?
DAGScheduler是面向Stage的任务调度器,负责接收Spark应用提交的Job,根据RDD的依赖关系划分Stage,并提交Stage给TaskScheduler
TaskScheduler是面向Task的任务调度器,它接受DAGScheduler提交过来的TaskSets,然后把一个个Task提交到Work结点运行,每个Executor运行什么Task也是在此处分配的。
最重要的就是这张图了:
-
(1)任何的Spark应用程序都包含Driver和Executor代码。Spark应用程序首先在Driver初始化SparkContext。因为SparkContext是Spark应用程序通往集群的唯一途径。在SparkContext里面包含了两个调度器,一个是DAGScheduler和TaskScheduler,在创建SparkContext对象的同时也自动创建了这两个类。
(2)SparkContext初始化完成后,首先根据Spark的相关配置,想Cluster Master申请所需要的资源,然后在各个Worker结点初始化相应的Executor。Executor初始化完成后,Driver将通过对Spark应用程序中的RDD代码进行解析,生成相应的RDD graph(RDD图),该图描述了RDD的相关信息及彼此之间的依赖关系。即是图中第一个部分,这些RDD Objects
(3)RDD图构建完毕后,Driver将提交给DAGScheduler进行解析。DAGScheduler在解析RDD图的过程中,当遇到Action算子后将进行逆向解析,根据RDD之间的依赖关系,以及是否存在Shuffle,将RDD图解析成一系列具有先后依赖关系的Stage。Stage以shuffle进行划分,即如果两个RDD之间存在依赖关系,DAGScheduler将会在这RDD之间拆分为两个Stage进行执行,且只有前一个Stage执行完毕之后,才执行后一个Stage。
(4)DAGScheduler将划分的一系列的Stage(TaskSet),按照Stage的先后顺序依次提交给底层的调度器TaskScheduler执行。
(5)TaskScheduler接收到DAGScheduler的stage任务后,将会在集群环境中构建一个TaskSetManager实例来管理Stage(TaskSet)的生命周期。
(6)TaskSetManager将会把相关的计算代码,数据资源文件等发送到相应的Executor上,并在相应的Executor上启动线程池执行。
(7)在Task执行的过程中,可能有部分应用程序涉及到I/0的输入输出,在每个Executor由相应的BlockManager进行管理,相关BlockManager的信息将会与Driver中的Blocktracker进行交互和同步。
(8)在TaskThreads执行的过程中,如果存在运行错误,或其他影响的问题导致失败,TaskSetManager将会默认尝试3次,尝试均失败后将上报TaskScheduler,TaskScheduler如果解决不了,在上报DAGScheduler,DAGScheduler将根据各个Worker结点的运行情况重新提交到别Executor中执行。
(9)TaksThread执行完毕后,将把执行的结果反馈给TaskSetManager,TaskSetManager反馈给TaskScheduler,TaskScheduler在上报DAGScheduler,DAGScheduler将根据是否还存在待执行的的Stage,将继续循环迭代提交给TaskScheduler去执行。
(10)待所有的Stage都执行完毕后,将会最终达到应用程序的目标,或者输出到文件,或者在屏幕显示等,Driver的本次运行过程结束,等待用户的其他指令或者关闭。
(11)在用户显示关闭SparkContext,整个运行过程结束,相关的资源或被释放,或被回收。
Spark这种运行形式有利于不同Application之间的资源调度,同时也就意味着不同的Application无法做到相互通信和信息交互。
Driver负责所有任务调度,所以他应该尽可能地靠近Worker结点,能在同一个网络中最后了。
10.Shuffle是怎么个过程?
只有当Shuffle依赖中父RDD所有分区的数据被计算和存储完毕后,子RDD才会开始拉取需要的分区数据。这里将整个数据传输的过程称为Spark的Shuffle过程。在Shuffle过程中,把一个分区数据计算完毕到数据被写入到磁盘的过程,称为Shuffle写过程。对应的,在子RDD某个分区计算的过程中,把所需的数据从父RDD拉取过来的过程,称为Shuffle读过程。
不论是Spark还是Hadoop,在对待shuffle的过程中有着诸多类似,一些概念可以直接套用,例如shuffle过程中,提供数据的一端称作map端, map端生成的任务称为mapper.对应的,接受数据的一端称作reduce端,reduce端每个拉取数据的任务称为reducer。Shuffle过程的本质是将map端获得的数据使用分区器进行划分,并将数据发送给对应的reducer的过程。
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/195565.html