大数据梦工厂(0008 – MapReduce中Shuffle和排序机制解析)
1 – Shuffle
MapReduce 确保每个 Reducer 的输入都是按 key 排序。系统执行排序,将 Map 的输出作为输入传给 Reducer 的过程称为 Shuffle。
图片来自《Hadoop: The Definitive Guide》
2 – Map 端
Map 函数输出时,并不是直接写到磁盘,而是利用缓冲的方式写到内存并进行预排序。
其过程如下:
1、 每个 Map 任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲的大小为 100MB(mapreduce.task.io.sort.mb
),一旦缓冲区的内容达到 80%(mapreduce.map.sort.spill.percent
),便有一个后台线程将写入缓冲区的内容按照轮询的方式溢出(spill)写到磁盘指定目录(mapreduce.cluster.local.dir
)。溢出的过程中 Map 任务一边溢出一边继续写入缓冲区,如果缓冲区被写满,Map 任务就会阻塞直到后台线程写磁盘过程结束。
2、 在写磁盘之前,线程根据数据最终要传的 Reducer 把数据划分到相应的分区(Partition)。每个分区按 key 在内存中排序。
3、 如果有 Combiner 函数,则在分区排序后的输出上运行局部聚合操作,以减轻 Shuffle 过程中网络负载的压力。
4、 每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file
),因此在 Map 任务写完其最后一个输出后,会有多个溢出文件。最终会被合并成一个已分区且已排序的输出文件。如果 Map 输出的数据比较多,产生的小文件会很多,影响系统性能,因此需要进行合并,通过(mapreduce.task.io.sort.factor,默认为 10
)设置一次最多可以合并的文件个数。
5、 Map 输出到磁盘的过程中,可以设置压缩,默认关闭 Map 输出压缩。为加快写磁盘速度,节约磁盘空间,并且减少传给 Reducer 的数据量,通过设置(mapreduce.map.output.compress
)为 true 开启压缩,使用的压缩库由(org.apache.hadoop.io.compress.DefaultCodec
)指定(Hadoop 的压缩 codec 格式见下文
)。
以上便是 Map 任务输出过程的主要步骤,输出到磁盘后,Reducer 通过 HTTP 服务获取输出文件的分区数据。用于文件分区的工作线程的数据由(mapreduce.shuffle.max.threads
)控制,该设置针对每一个 NodeManager,而不是每个 Map 任务。默认值为 0 表示最大线程设置为机器中可用处理器数的 2 倍。
3 – Reduce 端
Reduce 端在 Shuffle 阶段主要分为:Copy、Merge、Sort 以及 Reduce 阶段。
3.1 – Copy 阶段
ReduceTask 从各个 MapTask 上远程复制数据,因此只要有 MapTask 完成,ReduceTask 就开始复制其输出。复制的过程可以使多线程并发进行,并发数由(mapreduce.reduce.shuffle.parallelcopies,默认为 5
)设置。
Map 任务成功完成后,通过心跳机制通知 ApplicationMaster,Reducer 中的一个线程定期查询 ApplicationMaster,以获取完成的 MapTask 输出的主机位置,从而去对应的主机复制数据,直到获得所有的输出位置。
由于第一个 Reducer 可能会失败,因此主机并没有在第一个 Reducer 检索到 Map 输出时就立即从磁盘上删除它们。相反,主机会等待作业完成,直到 ApplicationMaster 告知它删除 Map 输出。
如果 Map 输出相对较小,会被复制到 ReduceTask 的 JVM 内存(缓冲区大小比例由 mapreduce.reduce.shuffle.input.buffer.percent 控制,默认为 70%
),否则,Map 输出被复制到磁盘。一旦内存缓冲区达到阈值(mapreduce.reduce.shuffle.merge.percent,默认为 66%
)或达到 Map 输出文件数阈值(mapreduce.reduce.merge.inmem.threshold,默认 1000
),则将内存的数据合并后溢出写到磁盘。如果设置了 Combiner 函数,则在写入磁盘前调用 Combiner 函数以减少写入磁盘的数据量。
3.2 – Merge 阶段
在远程复制数据的同时,随着磁盘上溢写文件的增多,ReduceTask 启动两个后台线程对内存和磁盘上的文件进行合并(可能需要多次合并),每次合并的文件数由(mapreduce.task.io.sort.factor,默认为 10
)控制。为了合并,压缩的 Map 输出都必须在内存中被解压缩。
3.3 – Sort 阶段
用户编写 reduce()
函数输入数据是按 key 聚集的一组数据,进行快速排序(字典顺序排序)。由于各个 MapTask 已经实现对其输出处理结果进行了局部排序,因此,ReduceTask 只需要对所有数据进行一次归并排序即可。
3.4 – Reduce 阶段
最后一次合并排序(可以来自内存和磁盘片段)的时候,直接把数据写入到 reduce() 函数,从而省略了一次磁盘的往返读写过程。最后输出直接写到文件系统(如:HDFS
)上。
注意事项:
- ReduceTask=0,表示没有 Reduce 阶段,输出文件个数 Map 个数一致。
- ReduceTask=1,默认值,所以输出文件个数为 1 个。
- 具体有多少个 ReduceTask,需要根据集群性能而定。
- 如果数据分布不均匀,可能会在 Reduce 阶段产生数据倾斜。
4 – 参数调优
在上面介绍 Shuffle 过程时,已经提过相关参数来提高 MapReduce 的性能。在 mapred-default.xml
配置文件修改,如下进行统一整理说明:
4.1 – Map 端参数调优
属性名称 | 默认值 | 描述 |
---|---|---|
mapreduce.task.io.sort.mb | 100 | Map 输出时所使用的内存缓冲区大小,单位:MB |
mapreduce.map.sort.spill.percent | 0.80 | Map 输出溢写到磁盘的内存阈值百分比(0.8 或 80%) |
mapreduce.cluster.local.dir | ${hadoop.tmp.dir}/mapred/local | 溢出文件指定本地磁盘的目录 |
mapreduce.task.io.sort.factor | 10 | 排序文件一次最多合并的流数量 |
mapreduce.map.output.compress | false | 是否压缩 Map 输出 |
mapreduce.map.output.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | Map 输出的压缩编解码器 |
mapreduce.shuffle.max.threads | 0 | Map 输出到 Reducer 的每个 NM 的工作线程。0 表示使用机器中可用处理器数的 2 倍 |
Hadoop 的压缩 codec 格式如下: | 压缩格式 | HadoopCompressionCodec |
---|---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec | |
gzip | org.apache.hadoop.io.compress.GzipCodec | |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec | |
LZO | com.hadoop.compression.lzo.LzopCodec | |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec | |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
1、减少溢写(Spill)次数
通过评估 Map 输出大小,增加 mapreduce.task.io.sort.mb
的值来减少溢写磁盘的次数,从而减少磁盘 I/O。可参考 MapReduce 计数器(SPILLED_RECORDS
)计算在作业运行整个阶段中溢写磁盘的记录数。
2、减少合并(Merge)次数
通过调整 mapreduce.task.io.sort.factor
的值,增大 Merge 的文件数目,减少 Merge 的次数,从而缩短 MapReduce 的处理时间。
3、合理修改运行任务内存
运行 Map 任务和 Reduce 任务的 JVM 内存大小由 mapreduce.map.java.opts
和 mapreduce.reduce.java.opts
设置。
4.2 – Reduce 端参数调优
属性名称 | 默认值 | 描述 |
---|---|---|
mapreduce.reduce.shuffle.parallelcopies | 5 | 把 Map 输出复制到 Reduce 的线程数 |
mapreduce.task.io.sort.factor | 10 | 排序文件一次最多合并的流数量 |
mapreduce.reduce.shuffle.input.buffer.percent | 0.70 | Shuffle 的复制阶段,分配给 Map 输出的缓冲区占堆空间的百分比 |
mapreduce.reduce.shuffle.merge.percent | 0.66 | Map 输出缓冲区的阈值使用百分比,超过将进行合并输出和溢写磁盘 |
mapreduce.reduce.merge.inmem.threshold | 1000 | 当 Map 输出文件数超过该阈值,进行合并输出和溢写磁盘,0 或更小的数表示没有阈值限制 |
mapreduce.reduce.input.buffer.percent | 0.0 | 在 reduce 过程中,内存保存 Map 输出的空间占整个堆空间的比例。默认情况下,Reduce 任务开始前,所有的 Map 输出合并到磁盘,以便为 reduce 提供尽可能多的内存。 |
1、合理设置 Map 和 Reduce 个数
如果都设置较小,会导致 Task 等待,延长处理时间;如果设置太多,会导致 Map 和 Reduce 任务间竞争资源,造成处理超时等错误。
2、合理设置 Map 和 Reduce 共存
通过调整 mapreduce.job.reduce.slowstart.completedmaps
(默认0.05),表示至少 Map 任务完成 5% 时,Reduce 任务才会开始运行,以减少 Reduce 任务的等待时间。
3、合理设置 Reduce 端的缓冲区
当数据达到一个阈值时,缓冲区中的数据就会写入磁盘,Reduce 从磁盘中读取所有数据,增加网络宽带负载。通过调整 mapreduce.reduce.input.buffer.percent
参数为 1.0(或一个更低的值,但要大于 0.0),Reduce 会直接读取保留指定比例的缓冲区中的数据,以提升性能。
::: hljs-center
扫一扫,我们的故事就开始了。
:::
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/161833.html