本篇文章给大家分享的是有关怎样分析MapReduce,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
今天详细阐述一下MapReduce。鉴于Hadoop1.X已过时,Hadoop3.X目前用的还不多,企业中目前大量运用的还是Hadoop2.X,所以以下都是基于Hadoop2.X版本的MapReduce(后续要讲的HDFS和Yarn也是)。
1. MRAppMaster:负责整个程序过程调度及状态协调
2. MapTask:负责map阶段整个数据处理流程
3. ReduceTask:负责reduce阶段整个数据处理流程
这里笔者还是要强调一点:MapTask和ReduceTask是进程级别,这一点很重要!
MapReduce处理数据主要分为两个阶段:map和reduce,对应到上图分别对应的处理实例就是MapTask和ReduceTask。数据处理先进内存然后刷磁盘,虽然有溢出比限制,但是笔者强调,落磁盘至少一次,通过上图以及接下来的讲解明白了MapReduce的整个处理流程、细节也就能掌握shuffle阶段都干了什么。
切片也就是把文件切成一个个block块,但是此处的切片是逻辑切片而非物理切片。切片的逻辑可以查看接口InputFormat<K, V>的getSplits方法,通过它的一个实现类FileInputFormat看看切片的默认实现机制,直接看源码:
这里咱们主要关注文件可切分的部分,通过分析源码,在FileInputFormat中,默认切片机制如下:
1. 简单的按照文件的内容长度进行切片
2. 切片大小,默认等于block大小
3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
因此默认情况下,切片大小等于blocksize。但是,不论怎么调参数,都不能让多个小文件“划入”一个split, 会影响性能, 后续讲HDFS时会说明一下小文件的问题
了解完切片机制之后,初学者容易陷入一个误区,就是比如我配置blocksize为128M,那么我一个文件就会按照128M等比例切分,切到最后不足128M部分单独作为一个切片,但笔者强调这是要分情况的。其实细心的小伙伴会看到我源码截图中的注释部分,关键的参数SPLIT_SLOP为1.1,同样以blocksize为128M为例,假如对于一个130M的可切分文件会产生几个block块呢?很显然130 < 128*1.1,就产生一个切片为130M的block,所以多看源码很重要。
1)MapTask并行度决定机制
2)ReduceTask并行度决定机制
ReduceTask设置方式就很简单了,可以直接手动设置:job.setNumReduceTasks(4);,默认值是1,手动设置为4。ReduceTask的并行度同样影响整个任务的执行效率,如果数据分布不均匀,就有可能产生数据倾斜。
注意:ReduceTask设置方式就很简单了,可以直接手动设置:数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask设置方式就很简单了,可以直接手动设置:。尽量不要运行太多的ReduceTask。对大多数任务来说,最好reduce的个数最多和集群中的reduce持平,或者比集群的reduce slots小。这个对于小集群而言,尤其重要。
并发数的选择受多方面因素影响,比如运算节点的硬件配置、运算任务的类型:CPU密集型还是IO密集型、运算任务的数据量,这个还是要根据实际情况而定。
3)map|reduce端核心组件
在map输出数据溢出到磁盘之前调用。默认根据key.hashcode%reduce数量(HashPartition),可以自定义分区组件
慎用:
调用次数不一定,不能影响核心业务逻辑,如对数据求平均值:
2 5 6:加入combiner,2+5+6/3=13/3
4 3 :加入combiner,4+3/2=7/2,最终(13/3+7/2)/2 = 47/12
不加combiner组件:2+5+6+4+3/5 = 4
c) 分组
public class ItemidGroupingComparator extends WritableComparator {
//传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
protected ItemidGroupingComparator() {
super(Order.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
//强转
Order aBean = (Order) a;
Order bBean = (Order) b;
//比较两个bean时,指定只比较bean中itemId
return aBean.getItemId().compareTo(bBean.getItemId());
}
}
MapReduce通过DistributedCache,可以将job指定的文件,在job执行前,先行分发到task执行的机器上,并提供相关机制对cache文件进行管理。但需要注意:需要分发的文件,必须提前放到hdfs上;需要分发的文件在任务运行期间最好是只读的;不建议分发较大的文件,影响性能
。主要用来分发第三方库、多表数据join时小表数据简便处理等。可以在自定实现Mapper类,重写setup方法中进行分布式缓存处理。
以上就是怎样分析MapReduce,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
原创文章,作者:bd101bd101,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/223287.html