Hadoop之MapReduce性能调优

    基于对这些组件的深入理解,用户可以很容易通过调整一些关键参数使作业运行效率达到最优,本文将分别从Hadoop管理员和用户角度介绍如何对Hadoop进行性能调优以满足各自的需求。


1 概述

Hadoop性能调优是一项工程浩大的工作,它不仅涉及Hadoop本身的性能调优,还涉及更加底层的硬件、操作系统和Java虚拟机等系统的调优。对这几个系统适当地进行调优均有可能给Hadoop带来性能提升。

 

HadoopJobTrackerTaskTracker

 

 

JVM

 

 

OS

 

 

Hardware(CPU Memory Network)

 

 

对于非Hadoop自身方面的性能调优,比如硬件、操作系统(IOScheduler、启用预读取机制、关闭Swap等)、Java虚拟机。本文将重点介绍如何通过调整Hadoop自带的一些参数使作业运行效率达到最优。总体来说,提高作业运行效率需要Hadoop管理员和作业拥有者共同的努力。其中,管理员负责为用户提供一个高效的作业环境,而用户则负责根据自己作业的特点让它尽可能的运行完成。

    在撰写本文之时,Apache Hadoop主要分为0.20.x、0.21.x、0.22.x和0.23.x四个系列,Cloudera Hadoop主要分为CDH3和CDH4两个系列,其中0.23.x和CDH4属于下一代MapReduce。

 

2 从管理员角度进行调优

2.1 硬件选择

Hadoop自身架构的基本特点决定了其硬件配置的选项。Hadoop采用了master和slave架构,其中,master(JobTracker和NameNode)维护了全局元数据信息,重要性远远大于slave(TaskTracker和DataNode)。在较低Hadoop版本中,master均存在单点故障问题,master的配置应远远好于各个slave(TaskTracker或者DataNode)。

 

2.2 操作系统参数调优

由于Hadoop自身的特点,它只适合于将Linux作为操作系统的生产环境。在实际应用场景中,管理员适当对linux内核参数进行优化,可在一定程度上提高作业的运行效率,比较有用的调整选项如下。

1、增大同时打开的文件描述符和网络连接上限。

在Hadoop集群中,由于涉及的作业和任务数目非常多,对于某个节点,由于操作系统内核在文件描述符和网络连接数目等方面的限制,大量的文件读写操作网络连接可能导致作业失败,因此,管理员在启动Hadoop集群时,可以使用ulimit命令将允许同时打开的文件描述符数目上限增大至一个合适的值,同时调整内核参数net.core.somaxconn至一个足够大的值。

此外,Hadoop RPC采用了epoll高并发库,如果你使用的Linux内核版本在2.6.28以上,你需要适当调整epoll的文件描述符上限。

2、关闭swap分区

在Linux中,如果一个进程的内存空间不足,那么,它会将内存中的部分数据暂时写入到磁盘上,当需要时,再将磁盘上的数据动态置换到内存中,通常而言,这种行为会大大降低进程的执行效率。在MapReduce分布式计算环境中,用户完全可以通过控制每个作业处理的数据量和每个任务运行过程中用到的各种缓冲区中,避免使用swap分区。具体方式是调整/etc/sysctl.conf文件中的vm.swappiness。

3、设置合理的预读取缓冲区大小

磁盘I/O性能的发展远远滞后于CPU和内存,因而成为现代计算机系统的一个主要瓶颈。预读取可以有效的减少磁盘的寻道次数和应用程序的I/O等待时间,是改进磁盘读I/O性能的重要优化手段之一。管理员可以使用Linux命令blockdev设置预读取缓冲区的大小,以提高Hadoop中大文件的顺序读的性能。当然,也可以职位Hadoop系统本身增加预读缓冲区大小。

4、文件系统选择与配置

Hadoop的I/O性能很大程度上依赖于Linux本地文件系统的读写性能。Linux中有很多种文件系统可供选择,比如ext3和ext4,不同的文件系统性能能有一定的差别。

5、I/O调度器选择

管理员可以根据自己的应用特点启用最合适的I/O调度器。


2.3 JVM参数调优

由于Hadoop每个服务和任务均会运行在一个的JVM中,因此,JVM的一些重要参数也会影响Hadoop性能。管理员可以通过调整通过JVM FLAGS和JVM垃圾回收机制提高Hadoop性能。


2.4 Hadoop参数调优

 1、合理规划资源

A、合理设置合理的槽位数目

在Hadoop中,计算资源使用槽位(slot)表示的。Slot分为两种:Mapslot和Reduce slot。每种slot代表了一定量的资源,并且同种slot是同质的,也就是说,同种slot代表的资源量是相同的。管理员需要根据实际需要为TaskTracker配置一定数目的Map slot和Reduce slot数目,从而限制每个TaskTracker上并发的Map Task和Reduce Task数目。

槽位数目是在各个TaskTracker上的mapred-site.xml配置的,具体值为:mapreduce.tasktracker.map.tasks.maxium、mapreduce.tasktracker.reduce.tasks.maxium.

B、编写健康监测脚本

Hadoop允许管理员为每个TaskTracker配置一个节点健康状况监测脚本。TaskTracker中包含一个专门的线程周期性执行该脚本,并将脚本执行结果通过心跳机制汇报给JobTracker。一旦JobTracker发现某个TaskTracker的当前状况为不健康(比如内存或者CPU使用率过高),则会将其加入黑名单,从此不再为它分配新的任务(当前正在执行的任务仍会正常执行完毕),直到该脚本执行结果显示为健康。

需要注意的是,该机制只有Hadoop0.20.2以上版本中有。

2、调整心跳配置

A、调整心跳间隔

TaskTracker与JobTracker之间的心跳间隔大小应该适度。

如果太小,JobTracker需要处理高并发的心跳信息,势必造成不小的压力。

如果太大,则空闲的资源不能及时通知JobTracker(进而为之分配新的Task),造成资源空闲,进而降低系统吞吐率。

对于中小规模(300各节点以下)的Hadoop集群,缩短TaskTracker与JobTracker之间的心跳间隔可以明显提高系统吞吐率。

在Hadoop1.0以及更低的版本中,当节点集群规模小于300个节点时,心跳间隔将一直是3秒(不能修改)。这意味着,如果你的集群有10个节点,那么JobTracker平均每秒也只需要处理3.3(10/3)个心跳请求。如果你的集群有100个节点,那么JobTracker平均每秒也只需要处理33(100/3)个心跳请求。

对于一台普通的服务器,这样的负载过低,完全没有充分利用服务器资源。综上所述,对于中小规模的Hadoop集群,3秒的心跳间隔过大,管理员可根据需要适当减少心跳间隔。

具体配置:mapreduce.jobtracker.heartbeat.interval.min、

mapreduce.heartbeats.in.second

mapreduce.jobtracker.heartbeats.scaling.factor.

B、启用带外心跳

通常而言,心跳是由各个TaskTracker以固定时间间隔为周期发送给JobTracker的,心跳包含节点资源使用情况、各任务运行状态等信息。心跳机制是典型的pull-based模型。TaskTracker周期性通过心跳向JobTracker汇报信息,同时获取新分配的任务。这种模型使得任务分配过程存在较大延时:当TaskTracker出现空闲资源,它只能通过下一次心跳。

带外心跳不同于常规心跳,它是任务结束或者任务运行失败时触发的,能够在空闲资源时第一时间通知JobTracker,以便它能够迅速为空闲资源分配新的任务。

具体配置:mapreduce.tasktracker.outofband.heartbeat=true

3、磁盘块配置

Map Task中间结果要写到本地磁盘上,对于I/O密集型的任务来说,这部分数据会对本地磁盘造成很大压力,管理员可通过配置多块磁盘缓解写压力。当存在多块可用磁盘时,Hadoop将采用轮询的方式将不同Map Task的中间结果写到这些磁盘上,从而平摊负载。

4、设置合理的RPC Handler和HTTP线程数目

A、配置RPC Handler数目

JobTracker需要并发处理来自各个TaskTracker的RPC请求,管理员可根据集群规模和服务器并发处理能够调整RPC Handler数目,以使JobTracker服务能力最佳。

具体配置:

mapred.job.tracker.handler.count;

mapreduce.jobtracker.handler.count

B、配置HTTP线程数目

在Shuffle阶段,Reduce Task通过HTTP请求从各个TaskTracker上读取Map Task中间结果,而每个TaskTracker通过TaskTracker通过Jetty Server处理这些HTTP请求。管理员可适当调整Jetty Server的工作线程数以提高Jetty Server的并发处理能力。

具体配置:tasktracker.http.threads

Mapreduce.tasktracker.http.threads.

5、慎用黑名单机制

当一个作业运行结束时,它会统计在各个TaskTracker上失败的任务数目。

如果一个TaskTracker失败的任务数目超过一定值,则作业会将它加到自己的黑名单中。

如果一个TaskTracker被一定数目的作业加入黑名单,则JobTracker会将该TaskTracker加入系统黑名单,此后JobTracker不再为其分配新的任务,直到一定时间段内没出现失败任务。

当Hadoop集群规模较小时,如果一定数量的节点被频繁加入系统黑名单中,则会大大降低集群吞吐率和计算能力,因此建议关闭该功能。

6、启用批量任务调度

在Hadoop中,调度器是最核心的组件之一,它负责将系统中空闲的资源分配给各个任务。当前Hadoop提供了多种调度器,包括默认的FIFO调度器、Fair Scheduler、Capacity Scheduler等,调度器的调度效率直接决定了系统的吞吐率高低。通常而言,为了将空闲资源尽可能分配任务,Hadoop调度器均支持批量任务调度。即一次将所有空闲任务分配下去,而不是一次只分配一个。

7、选择合适的压缩算法

Hadoop通常用于处理I/ O密集型应用。对于这样的应用,Map Task会输出大量中间数据,这些数据的读写对用户是透明,如果能够支持中间数据压缩存储,则会明显提升系统的I/O性能。

当选择压缩算法时,需要考虑压缩比和压缩效率两个因素。

有的压缩算法有很好的压缩比,但是解压缩率很低。反之,有一些算法的压缩/解压缩效率很高,到那时压缩比很低。因此,一个优秀的压缩算法需要平衡压缩比和压缩效率两个因素。

具体配置:

mapred.compress.map.out 表示是否要压缩MapTask中间输出结果

mapred.map.output.compression.codec 表示采用的编码/×××

8、启用预读取机制

前面提到,预读取机制可以有效提高磁盘的I/O读性能。由于Hadoop是典型的顺序读系统,采用预读取机制可明显提高HDFS读性能和MapReduce作业执行效率,管理员可为MapReduce的数据拷贝和IFile文件读取启用预读取功能。

具体表

Hadoop版本号

配置参数

含义

默认值

Apache与CDH3以下

暂时未引入该机制

CDH3以上

mapred.tasktracker.shuffle.fadvise

是否启用Shuffle预读取机制

true

Mapred.tasktracker.shuffle.readhead.bytes

Shuffle预读取缓冲区大小

4MB

Mapred.ifile.readhead

是否启用Ifile预读取机制

true

Mapred.ifile.readhead.bytes

Ifile预读取缓冲区大小

4MB

 

3 从用户角度进行调优

3.1 应用程序编写规范

从用户角度来看,除作业配置参数以外,应用程序本身的编写方式对性能影响也是非常大的,在编写应用程序的过程中,谨记以下几条规则对提高作业性能是十分有帮助的。

1、设置Combiner

对于一大批MapReduce应用程序,如果可以设置一个Combiner,那么对于提高作业性能十分有帮助。Combiner可减少Map Task中间输出结果,从而减少各个Reduce Task的远程拷贝数据量,最终表现为Map Task和Reduce Task执行时间缩短。

2、选择合理的Writable类型

在MR模型中,Map Task和Reduce Task的输入和输出数据类型为Writable类型。Hadoop本身已经提供了很多Writable实现,包括基础类型和对象类型。为应用程序处理的数据类型选择合适的Writable类型可大大提升性能。

比如,处理整型数据时,直接采用IntWritable比先以Text类型读取再转换成整型要高效。

如果输出的整型大部分可用一个或者两个字节保存,那么可以直接采用VIntWritable或者VlongWritable。它们采用了变长整型编码方式,可大大减少输出数据量。


3.2 作业级别参数调优

1、规划合理的任务数目

一个作业的任务数目对作业运行时间有重要的影响。如果一个作业的任务数目过多(这意味着每个任务处理数据很少,执行时间很短),则任务启动时间所占比例将会大大增加;反之一个作业的任务数目过少(这意味着每个任务处理数据很多,执行时间很长),则可能产生过多的溢写数据影响任务执行性能,且任务失败后重新计算代价过大。

在Hadoop中,每个Map Task处理一个Input Split。Input Split的划分方式是由用户自定义的InputFormat决定的,默认情况下,由以下三个配置参数决定:

mapred.min.split.size:Input Split的最小值(在mapred-site.xml配置)

mapred.max.split.size:Input Split的最大值(在mapred-site.xml配置)

dfs.block.size:HDFS中一个block大小

对于Reduce Task而言,每个作业的Reduce Task数目通常由用户决定。用户可以根据估量的Map Task输出数据量设置Reduce Task数目,以防止每个Reduce Task的数据量过大造成大量写磁盘操作。

2、增加文件副本数

如果一个作业并执行的任务数量非常多,那么这些任务共同的输入文件可能成为瓶颈。为防止多个任务并行读取一个文件内容造成瓶颈,用户可以根据需要增加输入文件的副本数量。

用户可以在客户端配置文件hdfs-site.xml中增dfs.replication选项修改文件副本数。
   3、启用推测执行机制

推测执行是Hadoop对拖后腿任务的一种优化机制。当一个作业的某些任务运行速度明显慢于同作业的其他任务时,Hadoop会在另一个节点上为慢任务启动一个备份任务,这样,两个任务同时处理一份数据,而Hadoop最终会将优先完成的那个任务的结果作为最终结果,并将另外一个任务杀掉。

4、设置失败容忍度

Hadoop允许设置作业级别和任务级别的失败容忍度。

作业级别的失败容忍是指Hadoop允许每个作业有一定比例的任务运行失败,这部分任务对应的输入数据将被忽略;

任务级别的失败容忍是指Hadoop允许任务运行失败后再次在另外节点上尝试运行,如果一个任务经过若干次尝试运行后仍然运行失败,那么Hadoop才会最终认为该任务运行失败。

5、适当打开JVM重用功能

为了实现任务隔离,Hadoop将每个任务放到一个单独的JVM执行,而对于执行时间较短的任务,JVM启动和关闭将占用很大比例的时间,为此,用户可启用JVM重用功能,这样,一个JVM可以连续启动多个同类型任务。

6、设置任务超时时间

在一些特殊情况下,一个任务可能因为某种原因阻塞了,这回拖慢整个作业的执行进度,甚至可能导致作业无法作业运行结束。针对这种情况,Hadoop增加了任务超时机制。如果一个任务在一定时间间隔内没有汇报进度,则TaskTracker会主动将其杀死,从而在另外一个节点上重新启动执行。

7、合理使用DistributedCache

当用户的应用程序需要一个外部文件(比如数据字典、配置文件等)时,通常需要使用DistributedCache将文件分发到各个节点上。一般情况下,得到外部文件有两种方法:一种是外部文件与应用程序jar包一起放到客户端,当提交作业由客户端上传到HDFS的一个目录下,然后通过DistributedCache分发到各个节点上。另一种方法是事先将外部文件直接放到HDFS上。从效率上讲,第二种方法比第一种更高效。第二中方式不仅节省了客户端上传文件的时间,还隐含着告诉DistributedCache,请将文件下载到各节点的public级别共享目录上,这样后续作业可重用已经下载好的文件,不必重复下载,即一次下载,终身受益。

8、合理控制Reduce Task的启动时机

在MR计算模型中,由于Reduce Task依赖于Map Task的执行结果,因此,从运算逻辑上讲,Reduce Task应晚于Map Task启动。

在Hadoop中,合理控制Reduce Task启动时机不仅可以加快作业运行速度,而且可提高系统资源利用率。如果ReduceTask启动过早,则可能由于Reduce Task长时间占用Resourceslot造成slot Hoarding现象,从而降低资源利用率;反之,如果Reduce Task启动过晚,则会导致Reduce Task获取资源延迟,增加了作业运行时间。

9、跳过坏记录

Hadoop是用于处理海量数据的,对于大部分数据密集型应用而言,丢弃一条或者几条数据对最终结果的影响不大,正因为如此,Hadoop为用户提供了跳过坏记录的功能。当一条或者几条数据导致任务运行失败时,Hadoop可自动识别并跳过这些坏记录。

10、提高作业优先级

所有Hadoop作业调度器进行任务调度时均会考虑作业优先级这一因素。一个作业的优先级越高,它能够获取的资源(slot)也越多。需要注意的是,通常而言,在生产环境中,管理员已经按照作业重要程度对作业进行了分级,不同重要程度的作业允许配置的优先级不同,用户不可以擅自进行调整。

Hadoop提供了5种作业优先级,分别是VERY_HIGH、HIGH、NORMAL、LOW和VERY_LOW。

3.3 任务级别参数调优

1、Map Task调优

Map Task的输出结果将被暂时存放到一个环形缓冲区中,这个缓冲区的大小由参数“io.sort.mb”指定,默认是100MB,该缓冲区主要由两部分组成:索引和实际数据。

默认情况下,索引占整个buffer的比例为io.sort.record.percent,5%,剩下的空间全部存放数据,当且仅当满足以下任意一个条件时,才会触发一次flush,生成一个临时文件:

A、索引空间使用率比例(io.sort.spill.percent)为0.8;

B、数据空间使用率比例(io.sort.spill.percent)为0.8;

合理调整io.sort.spill.percent值,可减少中间文件数目,提高任务执行效率,举例说明,如果你的key/value非常小,则可以适当调整io.sort.spill.percent值,以防止索引空间优先达到使用上限触发flush。考虑到每条数据记录需要占用索引大小为16B,因此,建议io.sort.spill.percent=16/(16+R),其中R为平均每条记录的长度。

2、Reduce Task调优

Reduce Task会启动多个拷贝线程从每个Map Task上读取相应的中间结果,具体的线程数目由《mapred.reduce.parallel.copies》(默认为5)指定。

对于每个待拷贝的文件,如果文件大小小于一定阀值A,则将其放到内存中,否则以文件的形式存放到磁盘上,

如果内存文件满足一定条件D,则会将这些数据写入磁盘,而当磁盘上文件数目达到io.sort.factor(默认是10),进行一次合并。

阀值A为:heapsize*{mapred.job.shuffle.input.buffer.percent}*0.25

其中,heapsize是通过参数“mapred.child.java.opts”指定的,默认是200MB;mapred.job.shuffle.input.buffer.percent默认大小是0.7。

条件D为以下两个条件任意中一个:

内存使用率达到mapred.job.shuffle.merge.percent的0.06时;

内存中文件数目超过(mapred.inmem.merge.percent)1000时;

 

 

 

原创文章,作者:carmelaweatherly,如若转载,请注明出处:https://blog.ytso.com/191403.html

(0)
上一篇 2021年11月14日
下一篇 2021年11月14日

相关推荐

发表回复

登录后才能评论