1 – MapReduce简介
Hadoop MapReduce 是一个分布式计算框架(也称为编程模型)。基于它编写的应用程序能够以一种可靠、容错的方式在大规模集群(数千个节点)上并行处理 TB 级别的海量数据集。
MapReduceTask 过程分为两个处理阶段:Map 阶段和 Reduce 阶段
。每个阶段都是以 <key, value> 键值对作为输入和输出,也可以自定义编写 map() 函数和 reduce() 函数。
网上一个比较形象的例子解释 MapReduce:
我们要数图书馆中的所有书。
你数 1 号书架,我数 2 号书架。这就是 “Map”。我们人越多,数书就越快。
现在我们到一起,把所有人的统计数加在一起。这就是 “Reduce”。
2 – MapReduce编程模型
MapReduce 作业的输入和输出类型:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
以词频统计为例进行说明,MapReduce 处理的数据流程如下:
mr_process.png
MapReduce 的工作流程包括 5 个步骤:
- Input – 读取文本文件;
- Splitting – 将文本按行拆分(可按空格、逗号、分号、行、换行符 (‘/n’) 等拆分),得到
K1
表示行数,V1
表示对应行的文本内容; - Mapping – 将每一行按空格并行拆分处理,排序得到
List(K2,V2)
,其中K2
表示每一个单词,由于是做词频统计,所以V2
的值为 1,代表出现 1 次; - Shuffling – 由于 Mapping 操作可能是在不同的机器上并行处理,所以需要通过 Shuffling 将相同 key 值的数据分发到同一个节点上去合并,然后排序,此时得到
K2
为每一个单词,List(V2)
为可迭代集合,V2
就是 Mapping 中的V2
; - Reducing – 该模型是统计每个单词出现的总次数,所以
Reducing
对List(V2)
进行归约求和操作,将这些数据元组组合成一个更小的元组集; - Final Result – 最终输出文件保存在文件系统(如:HDFS)上。
3 – MapReduce2工作机制
图片来自《Hadoop: The Definitive Guide》
1、run job
通过 Job 对象的 submit()
或者 waitForCompletion()
方法创建一个内部的 JobSummiter 实例,提交作业。waitForCompletion() 方法通过每秒循环轮询作业进度,如果发现与上次报告有改变,则把进度报告到控制台。
Job.submit()
– 将作业提交到集群并立即返回。Job.waitForCompletion()
– 将作业提交到集群并等待它完成。
2、get new application
向 ResourceManager 申请全局唯一 Application ID,RM 检查作业的输出说明、计算作业的输入分片。
3、copy job resources
将运行作业所需的资源(包括作业 JAR 文件、配置文件和输入分片)复制到一个以作业 ID 命名的 HDFS 共享目录中。
4、submit application
通过 submitApplication()
方法提交作业到 ResourceManager。
5a、start container
ResourceManager 收到 submitApplication()
消息后,便将请求传递给 YARN 调度器(Scheduler)。
5b、launch
调度器为其分配一个容器(Container),RM 在 NodeManager 的管理下在容器中启动 MRAppMaster 进程。
6、initialize job
ApplicationMaster 是一个 Java 应用程序,主类是 MRAppMaster,对作业进行初始化,创建多个薄记对象以跟踪作业进度。
7、retrieve input splits
ApplicationMaster 接受来自 HDFS 共享目录的、客户端计算的输入分片。对每一个分片创建一个 MapTask 对象以及由 mapreduce.job.reduces
参数设置 reduce 个数。任务 ID 在此时分配。
如果作业很小就会启动一个 JVM 运行 MapReduce 作业,称为 uberized 或者作为 uber 任务运行。通过设置 mapreduce.job.ubertask.enable
为 true 使用。
哪些是小作业?
当小于 10 个 mapper 且只有 1 个 reducer 且输入大小小于一个 HDFS 块的作业。
8、allocate resources
如果作业不适合作为 uber 任务运行, ApplicationMaster 就会为该作业中的所有 MapTask 和 ReduceTask 向 RM 请求容器。每个 mapper 和 reducer 都默认分配到 1024 MB 内存和 1 个虚拟内核。但 MapTask 完成 5% 时(mapreduce.job.reduce.slowstart.completedmaps,默认 0.05),ReduceTask 才会开始运行,以减少 ReduceTask 的等待时间。
分别通过以下 4 个参数来设置
mapreduce.map.memory.mb
mapreduce.reduce.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.cpu.vcores
9、start container
一旦 RM 的调度器为任务分配了容器,ApplicationMaster 就与 NM 通信启动容器。
9b、launch
该任务有主类 该任务由主类 YarnChild 的 Java 应用程序执行。
10、retrieve job resources
在运行任务之前,首先将任务需要的资源本地化,包括作业配置、JAR 文件和所有来自 HDFS 共享目录的文件。
11、run
运行 MapTask 或 ReduceTask。
12、进度和状态的更新
MapReduce 作业是长时间(从数秒到数小时)运行的批量作业。因此,能够得知关于作业进展的一些反馈很重要。例如,作业或任务的状态(运行中、成功完成、失败)、MapTask 和 ReduceTask 的进度、作业计数器的值、状态消息或描述等实时信息。
在作业运行期间,客户端每秒轮询一次 ApplicationMaster 以接收最新状态(轮询间隔通过 mapreduce.client.progressmonitor.pollinterval 设置,以毫秒为单位
)。客户端也可以使用 Job 的 getStatus() 方法得到一个 JobStatus 的实例,包含作业的所有状态信息。如下图所示:
图片来自《Hadoop: The Definitive Guide》
4 – MapReduce数据流
4.1 – 一个 Reduce 任务
一个 Reduce 任务的 MapReduce 数据流,通常是所有的 Map 任务的输出。
图片来自《Hadoop: The Definitive Guide》
4.2 – 多个 Reduce 任务
多个 Reduce 任务的 MapReduce 数据流,每个 Reduce 任务的输入都是来自多个 Map 任务(Map 任务和 Reduce 任务之间的数据流称之为 Shuffle)。
图片来自《Hadoop: The Definitive Guide》
4.3 – 无 Reduce 任务
无 Reduce 任务的 MapReduce 数据流,数据处理可以完全并行(即无需 Shuffle),唯一的非本地节点数据传输是 Map 任务将结果写入到 HDFS。
图片来自《Hadoop: The Definitive Guide》
5 – Map & Reduce
5.1 – Map
1、Map 概述
MapReduce 框架根据作业的 InputFormat 来:
- 检查作业输入的有效性(基于行的日志文件、二进制格式文件、数据库表等);
- 把输入文件切分成多个 InputSplit 实例,切片大小,默认等于 Block 大小(128MB);
- 通过 RecordReader 读取 InputSplit 转换为标准的 <key, value> 键值对,作为 Map 输出,直到读取完成;
- 一旦读取完成,这些键值对被发送到 Mapper 处理,并提供输出 <key, value> 键值对;
- Mapper 的输出称为中间输出,把输出作业结果写到文件系统(如:HDFS)上。
2、需要多少个 Map?
Map 的数量通常是由输入数据的总大小决定,即输入文件的总块(Block)数
。
Map 正常的并行规模大致是每个节点(Node)大约 10-100 个 Map
,对于 CPU 消耗较小的 MapTask 可以设到 300 个左右。由于每个任务初始化需要一定的时间,因此,Map 执行的时间至少超过 1分钟。
InputFormat 决定 Map 的数量:
Map = {(总数据大小)/(每个块(block)的大小))}
例如:如果数据总大小是 1TB,并且 InputSplit 大小是 128MB,那么:
Map = (1*1024*1024)/128 = 8192
5.2 – Reduce
1、Reduce 概述
MapReduce 框架根据作业的 OutputFormat 来:
- 检验作业的输出,例如:检查输出路径是否已经存在。
- 通过 RecordWriter 把输出作业结果写到文件系统(如:HDFS)上。
Reduce 有三个阶段:
- Shuffle – Reducer 的输入就是 Mapper 已经排好序的输出。
- Sort – 按照 key 值对 Reducer 的输入进行分组。Shuffle 和 Sort 是同时进行的。
- Reduce – 将键值对组合起来,并根据实现的业务逻辑输出结果写到文件系统(如:HDFS)上,且没有排序。
2、需要多少个 Reduce?
Reduce 的数量计算如下:
Reduce = <0.95 或 1.75> * (<节点数> * <每个节点的最大容器数>)
- 使用 0.95,所有 ReduceTask 会在 MapTask 完成时立刻启动,开始传输 Map 的输出结果。
- 使用 1.75,第一轮 ReduceTask 由速度更快的节点完成;第二轮 ReduceTask 启动,这样可以得到比较好的负载均衡的效果。
增加 Reduce 的数量会增加整个 MapReduce 框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。
上述比例因子略小于整数,是为了给 MapReduce 框架中的推测性任务(speculative-tasks) 或失败任务预留一些 Reduce 的资源。
6 – Combiner & Partitioner
6.1 – Combiner
1、Combiner 概述
Combiner 在 Map 阶段,对每一个 MapTask 所在的节点运行(局部汇总),是一个可选的操作,将同一个 key 值的中间结果合并。这里以词频统计为例:
aa bb cc dd ee aa ff bb cc dd ee ff
输入文件中共有 12 个 keys,Mapper 在遇到一个 aa 的 key 时就会记录为 1,但是这文件里 aa 可能会出现 n 次,那么 Mapper 输出文件冗余就会很多,因此在 Reduce 计算前对相同的 key 做一个合并操作,那么需要传输的数据量就会减少,传输效率就可以得到提升。
但并非所有场景都适合使用 Combiner,使用它的原则是 Combiner 的输出不会影响到 Reduce 计算的最终输入,例如:求总数,最大值,最小值时都可以使用 Combiner,但是做平均值计算则不能使用 Combiner。
因此,Combiner 的用途如下:
- 最大限度的减少 MapTask 和 ReduceTask 之间数据传输的时间;
- 减小 MapTask 和 ReduceTask 之间的数据传输量,以减轻 Shuffle 过程中网络带宽占用。
2、使用 Combiner 词频统计示例
想要使用 Combiner 功能只要在组装作业时,添加下面一行代码即可:
// 设置 Combiner
job.setCombinerClass(IntSumReducer.class);
输入文件中共有 12
个 keys。输入通过 Mapper 处理,相同的 12
个 keys 键值对作为输入发送到 Reducer。
① 不使用 Combiner 的情况:
图片来源:TutorialsCampus
② 使用 Combiner 的情况:
图片来源:TutorialsCampus
可以看到,使用 Combiner 的时候,需要传输到 Reducer 中的数据由 12 Keys
,降低到 10 Keys
,降低的幅度取决于 Keys 的重复率。利用 Combiner 来减少通过 Shuffle 传输的数据量。
6.2 – Partitioner
Partitioner (分区)用于划分 key 值空间(key space),也可以理解成分类器。
默认 Partitioner 是根据 Map 的输出结果按照 key 值的不同分别给对应的 Reduce。支持自定义实现。例如:将统计结果按照手机归属地,不同的省份输出到不同的文件中。
::: hljs-center
扫一扫,我们的故事就开始了。
:::
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/154104.html