1:最简单的过程:
Map – Reduce
2:定制了partitioner以将map的结果写到相应的分区,以供对应的reducer下载:
Map – Partition – Reduce
3:增加了在本地先进性一次reduce(本地优化),减少后期网络的传输量
Map – Combine(本地reduce) – Partition – Reduce
一般说来,一个完整的MapReduce过程可以分为以上3中提到的4个步骤,下面以Hadoop自带的MaxTemperature为例,说明四个步骤的工作:
假设输入文件的内容为:
File1:
1949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9
+01111+99999999999
1950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9
+00001+99999999999
1949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9
+00781+99999999999
004301199099999
1950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9
+00221+99999999999
004301199099999
1950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9
-00111+99999999999
1950032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9
+00081+99999999999
1949051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9
+07001+99999999999
1950032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9
+00091+99999999999
004301199099999
1949051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9
+08021+99999999999
004301199099999
1949051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9
-00451+99999999999
一:Map
a. mapper用Job的setMapperClass指定
b.map的输入通过Job的setInputFormat指定,默认为TextInputFormat(每行为一条记录,以偏移为key,行 内容为value),还可以指定为KeyValueTextInputFormat [在org.apache.hadoop.mapred中](每行为一条记录,分隔符之前为key,之后为value,默认的分隔 符是/t,通过mapred.textinputformat.seperator指定),SequenceFileInputFormat(键和值由用 户指定,序列文件为hadoop专用的二进制压缩格式),还有NLineInputFormat(与Textnputormat相同,但每个分片一定由N 行,N默认为1,通过,mapred.line.input.format.linespermap设定[在Eclipse的Hadoop参数的Edit hadoop location中的高级参数设置中])
c.map的输出与输入格式类似,反推即可,例如对应SequenceFileInputFormat,输出类为SequenceFileOutputFormat
处理输入数据,将数据按照用户想要的<key,value>形式collect起来
在经过这个步骤后,现在的其中一个Map输出数据为以下形式:<1949,111>,<1950,0>,<1949,78>,<1950,22>,<1950,-11>
二:Combine
combiner是在本地进行的一个reduce的过程,其目的是提高hadoop的效率。
直接将数据交给下一个步骤处理,这个例子中存在三个以1950为键的记录,所以在下一个步骤中需要处理三条<1950,0>,<1950,22>,<1950,-11>记录,如果先做一次combine,则只需处理一次<1950, 22>的记录,这样做的一个好处就是,当数据量很大时,减少很多开销。(直接将partition后的结果交给reduce处理,由于 tasktracker并不一定分布在本节点,过多的冗余记录会影响IO,与其在reduce时进行处理,不如在本地先进性一些优化以提高效率)
三:Partition
Partitioner相当与一个分发器,将Map处理后的输出分发给Reduce去执行。得到map给的记录后,他们该分配给哪些reducer来处理呢?hadoop采用的默认的派发方式(默认的partition实现)是根据散列值来派发的,但是实际中,这并不能很高效或者按照我们要求的去执行任务。例如,经过默认partition处理后,一个节点的reducer分配到了20条记录,另一个却分配道了10W万 条,试想,这种情况效率如何。又或者,我们想要处理后得到的文件按照一定的规律进行输出,假设有两个reducer,我们想要最终结果中part- 00000中存储的是”h”开头的记录的结果,part-00001中存储其他开头的结果,这些默认的partitioner是做不到的。所以需要我们自 己定制partition来根据自己的要求,选择记录的reducer。
自定义partitioner很简单,只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job的setPartitionerClass指定一下即可
备注:hadoop的partition过程会根据数据格式,选择相应的比较规则对数据进行排序
四:Reduce
reduce就是所谓的归并阶段,此阶段实际上包含三个小的阶段:
1、shuffle
分配了Reduce任务的节点,通过网络使用HTTP的方式将排序后的输出结果从每一个运行Map节点的相应分区上复制过来。
2、sort
MapReduce框架根据键合并(由于执行Map的不同节点可能会产生相同的key-value)、排序Reduce节点的输入数据。
shuffle阶段和sort阶段同时发生,例如当输出获取到的同时,它们也会被合并。
secondarySort(二次排序):
为了在value迭代器返回的值上进行二次排序,应用需要用使用第二个键扩展原来的键,同时定义一个分组比较器。这些键使用所有的键进行排序,但是分组的时候使用分组比较器来决定哪些键值对放在
放在一起,然后发送给同一个Reduce调用来归并。分组比较器指定通过Job.setGroupingComparatorClass(Class)设定,而排序的顺序通过Job.setSortComparatorClass(Class)控制。
举例如下:
例如,假设你想查找网页的副本个数同时用最简单的方式标记他们的url。你的job将会设置如下:
- Map 输入 Key : url
- Map 输入 Value : document
- Map 输出 Key: document checksum, url pagerank
- Map 输出Value:url
- Partitioner: 根据checksum进行派发
- OutputKeyComparator: 通过checksum,根据pagerank降序排列
- OutputValueGroupingComparator: 根据checksum进行分组,然后Partitioner据此进行Reduce任务的派发。
3、reduce
在这个阶段,我们的数据是以<key,list(value1,value2…)>的方式存储的,对于每一个数据都会调用reduce方法。 以MaxTemperature的例子来说,此时的记录应该是File 1的Reduce前(表述不确切): <1950,22>,File 2的Reduce前(表述不确切): <1950,09>,那么Reduce后的结果为<1950,22>。
此时,只要根据需求,处理key和values即可
最终结果:
1949 111
1950 22
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9804.html