MR编程模型
MR编程模型主要分为五个步骤:输入、映射、分组、规约、输出。
-
输入(InputFormat):
主要包含两个步骤—数据分片、迭代输入数据分片(getSplits):数据分为多少个splits,就有多少个map task; 单个split的大小,由设置的split.minsize和split.maxsize决定; 公式为 max{minsize, min{maxsize, blocksize}}; hadoop2.7.3之前blocksize默认64M,之后默认128M。 决定了单个split大小之后,就是hosts选择,一个split可能包含多个block(将minsize设置大于128M); 而多个block可能分布在多个hosts节点上(一个block默认3备份,如果4个block就可能在12个节点),getsplits会选择包含数据最多的一部分hosts。 由此可见,为了让数据本地话更合理,最好是一个block一个task,也就是说split大小跟block大小一致。 getSplits会产生两个文件 job.split:存储的主要是每个分片对应的HDFS文件路径,和其在HDFS文件中的起始位置、长度等信息(map task使用,获取分片的具体位置); job.splitmetainfo:存储的则是每个分片在分片数据文件job.split中的起始位置、分片大小和hosts等信息(主要是作业初始化时使用,用于map task的本地化)。 迭代输入:迭代输入一条条的数据,对于文本数据来说,key就是行号、value当前行文本。
- 映射(map):正常的map操作,将一对kv映射成为另外一对kv
- 分组(partition):
按照设置的reduce个数来进行分组,getPartitions共三个参数:k、v、partitionnum;
默认按照HashPartition,如果需要全排序,也可以设置TotalOrderPartitioner,它会采样一部分数据排序后设置R-1(R是reduce个数)个分割点,保证map task生成的R个文件的文件与文件之间的数据都是有序的,reduce只需要对单个文件内部再排序即可。 - 规约(reduce):reduce做聚合处理。
- 输出(OutputFormat):
一件事情是检查输出目录是否存在,如果存在则报错;
另一件事情是将数据输出到临时目录。
作业提交及初始化
- 作业提交与初始化大概分为4个步骤:执行提交、client上传文件到hdfs、client与JobTracker通信提交任务、JobTracker通知TaskScheduler初始化任务。
- JobClient与JobTracker的通信过程如下两所示
- 作业提交时序图
第一步:JobClient先跟JobTracker交互获取到一个jobid;
第二步:JobClient与HDFS交互创建输出目录;
第三步:与HDFS交互上传任务运行所以来的文件(配置文件、jar包等)
第四步:JobClient调用getSplits,与HDFS交互生成分片信息并写到分片文件中;
第五步:与jobtracker交互提交任务。 - JobTracker收到任务提交请求后会先生成一个JobInProgress对象,这个对象会管理和监控这个job的整个运行状况;之后JobTracker再告诉TaskSchduler进行作业初始化。
- 作业初始话大致过程如下
JobTracker与TaskTracker
- JobTracker主要负责作业的运行时管理,以三级树的方式进行管理:首先会给作业初始化一个对象JobInProgress,初始化后每个task有个TaskInProgress,每个task对应多个TaskAtempt。其中一个TA成功则此TI成功,所有TI成功则此job成功
JobTracker将很多数据以KV形式存储在map中,比如jobs存储的是jobid和JobInProgress的映射;
JobTracker通过接收TaskTracker的心跳请求,并发出应答来监控和管理作业运行过程,在应答中会下达各种命令:运行新task、杀死task等等 - TaskTracker:在每台机器上会启动一个TaskTracker进程,不断地向JobTracker发送心跳,汇报当前节点的资源使用情况、当前节点的task运行情况,并根据JobTracker在应答中的指令执行具体命令
TaskTracker会为每个task启动一个JVM(可重用,但是仅限于重用同类型任务)
TaskTracker启动一个新任务
第一步:先进行作业本地化,某个作业在TaskTracker上的第一个task会进行作业本地化,也就是把作业运行依赖的文件、jar包从hdfs下载到本地。(为避免多个task同时进行作业本地化,会对本地化操作加锁);
第二步:创建任务临时目录;
第三步:启动JVM,并在JVM运行任务(部分情况JVM可复用);
Map Task内部运行过程
map task总共可以五个过程:read、map、collect、splill、conbine。
Read:从数据源读入一条条数据;
map:将数据传给map函数,变成另外一对KV
collect阶段:
主要是map处理完的数据,先放入内存的环形缓冲区中,待环形缓冲区的值超过一定比例的时候再执行下一步的spill到磁盘;
collect()内部会调用getPartition来进行分区,而环形缓冲区则存储的是K、V和partition号
这里采用的两级索引结构,主要是排序时在同一个partition内排序,所以先排partition,再排partition内部数据。
kvindices中记录的分区号、key开始的位置、value开始的位置,也就是一对儿KV在kvindices中占用3个int,kvoffsets只记录一对KV在kvindices中的偏移地址,所以只需要一个int,所以二者按1:3的大小分配内存。
spill过程:
环形缓存区中内存数据在超过一定阈值后会spill到磁盘上,在splill到磁盘上之前会先在内存中进行排序(快速排序);
之后按分区编号分别写到临时文件,同一个分区编号后面会有个数字,表示第几次溢写,conbine:对多个文件合并,多伦递归,没轮合并最小的n个文件。
Reduce Task内部运行过程
reduce总共可分为以下几个阶段:shuffle、merge、sort、reduce、write
shuffle:从JobTracker中获取已完成的map task列表以及输出位置,通过http接口获取数据;
merge:shuffle拉去的数据线放入内存,内存不够再放入磁盘,会有一个线程不断地合并内存和磁盘中的数据
sort:reduce从不同的map task中拉取到多个有序文件,然后再做一次归并排序,则每个reduce获取到文件就都是有序的了
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/190799.html