这篇文章主要讲解了“Hadoop Job提交相关知识点分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Hadoop Job提交相关知识点分析”吧!
Configuration类是用来访问hadoop的配置参数的。
Configuration类首先会通过静态代码段加载hadoop的配置文件core-default.xml和和core-site.xml,相关代码如下:
<span >static{ //print deprecation warning if hadoop-site.xml is found in classpath ClassLoader cL = Thread.currentThread().getContextClassLoader(); if (cL == null) { cL = Configuration.class.getClassLoader(); } if(cL.getResource("hadoop-site.xml")!=null) { LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " + "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, " + "mapred-site.xml and hdfs-site.xml to override properties of " + "core-default.xml, mapred-default.xml and hdfs-default.xml " + "respectively"); } addDefaultResource("core-default.xml"); addDefaultResource("core-site.xml"); } </span>
defaultResources是一个ArrayList,用来保存默认的配置文件路径。如果一个默认的配置文件路径不在defaultResource里面,就添加进去,这个逻辑是在
addDefaultResource方法中实现的。
properties是一个Properties对象,保存从配置文件中解析出来的配置属性,如果多个配置文件有相同的key,后者会覆盖前者的值。
JobConf类用来配置Map/Reduce作业信息的,继承自Configuration类。
JobConf类首先会通过静态代码段加载mapred-default.xml和mapred-site.xml配置属性文件。
DEFAULT_MAPRED_TASK_JAVA_OPTS=“-Xmx200m”,默认情况下Map/Reduce任务的JAVA命令行选项指定的JAVA虚拟机最大内存是200M。
JobClient类是用户与JobTracker交互的主要接口,通过它可以提交jobs,追踪job的进度,访问task组件的日志,查询集群的状态信息等。
提交job是通过runJob方法实现的,相关代码如下:
<span >public static RunningJob runJob(JobConf job) throws IOException { JobClient jc = new JobClient(job); RunningJob rj = jc.submitJob(job); try { if (!jc.monitorAndPrintJob(job, rj)) { LOG.info("Job Failed: " + rj.getFailureInfo()); throw new IOException("Job failed!"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return rj; }</span>
首先创建一个JobClient对象,此对象在构造函数中会根据JobConf对象去连接JobTracker。
JobClient与JobTracker通信是通过jobSubmitClient操作的,jobSubmitClient是JobSubmissionProtocol类型的动态代理类,是通过如下方法产生的:
<span > private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); }</span>
getProxy方法的关键是Invoker类,Invoker类实现了InvocationHandler接口,主要有两个成员变量,remoteId是Client.ConnectionId类型,保存连接地址和用户的ticket,客户端连接服务器由<remoteAddress,protocol,ticket>唯一标识。从这里我们也可以看到一些配置属性值,默认的rpcTimeout是0,
ipc.client.connection.maxidletime客户端连接的最大空闲时间是10s,
ipc.client.connect.max.retries客户端同服务器建立连接时的最大重试次数是10,
ipc.client.tcpnodelay是否开启Nagle算法(对TCP/IP进行拥塞控制),如果开启,会减少延迟,但是会增加小数据报,默认是false。client是Client类,用于IPC通信。client会通过ClientCache类来缓存,如果缓存中没有,会新建一个Client,否则原client计数加1。Invoker类主要的方法是invoke方法,invoke方法的功能是调用client的方法然后返回结果。动态代理类代理的对象是Client对象。
submitJobInternal方法是真正用来提交job的,具体步骤如下:
1、初始化staging目录,staging目录根目录是由mapreduce.jobtracker.staging.root.dir配置的,默认是/tmp/hadoop/mapred/staging,具体到某个用户的staging目录是$ROOT/userName/.staging。
2、从JobTracker那里取得新的job id,job id从1开始递增。
3、获得提交job的目录submitJobDir=用户的staging目录/jobid,并且将这个目录设置成mapreduce.job.dir的值。
4、copyAndConfigureFiles拷贝和初始化文件,首先从配置属性mapred.submit.replication取得replication值,默认为10。然后判断submitJobDir目录是否存在,如果存在拋异常;否则创建submitJobDir目录;取得job的分布式缓存文件路径=submitJobDir/files;取得job的分布式缓存存档路径=submitJobDir/archives;取得job的分布式缓存libjars路径=submitJobDir/libjars;如果命令行参数有tmpfiles,则将这些文件拷贝到分布式缓存文件路径下,同时将这个路径加入到分布式缓存中;如果命令行参数有tmpjars,则将这些文件拷贝到分布式缓存libjars路径下,同时将这个路径加入到分布式缓存中;如果命令行参数有tmparchives,则将这些文件拷贝到分布式缓存存档路径下,同时将这个路径加入到分布式缓存中;根据mapred.jar属性取得jar包的路径,如果没有指定job的名字,那么将使用jar包的名字作为job名字;取得job jar的存储路径=submitJobDir/job.jar;将用户指定的jar包拷贝到job jar的存储路径;设置工作目录,默认是配置属性mapred.working.dir指定的值。
5、取得job配置文件的路径submitJobFile=submitJobDir/job.xml;设置
mapreduce.job.submithostaddress为本机ip地址,设置
mapreduce.job.submithost为本机主机名。
6、为job创建输入分区,这是由writeSplits方法完成的。以old api为例,首先调用InputFormat的getSplits方法得到一个InputSplit分区数组,FileInputFormat类的getSplits方法实现过程如下:
通过listStatus方法取得输入文件路径列表,过滤掉_和.开头的路径以及根据设置的mapred.input.pathFilter.class过滤;
在JobConf中设置mapreduce.input.num.files为输入文件数;
计算出所有输入文件的总大小totalSize,目标分区大小goalSize=totalSize/numSplits(由mapred.map.tasks配置,默认为1),最小分区大小minSize=mapred.min.split.size配置和1之间的较大值,对于每一个输入文件,如果这个文件的长度不等于0并且是可切分的,计算分区大小splitSize=Math.max(minSize,Math.min(goalSize,blockSize)),blockSize为HDFS存储文件的块大小,对于每一个分区大小,计算对其贡献最大的主机数组(根据机架以及块的字节大小确定),然后将这个分区加入到分区列表;然后根据分区大长度从大到小对分区列表进行排序;然后将分区列表写入到分区文件,分区文件名=submitJobDir/job.split,分区文件的存储格式:SPL字节信息,分区版本号,{InputSplit类名,InputSplit类信息}+;SplitMetaInfo数组记录每个分区信息在文件中的偏移,主机信息和长度;将分区Meta信息SplitMetaInfo数组写入到文件submitJobDir/job.splitmetainfo。
7、JobConf设置mapred.map.tasks为分区数。
8、根据mapred.job.queue.name获得job提交的队列的名字,默认是default,然后根据这个队列名字获得访问控制列表。
9、将重新配置过的JobConf写入到submitJobDir/job.xml文件。
10、将jobid,submitJobDir信息传给JobTracker正式提交job,并通过NetworkedJob对象跟踪job的状态。
monitorAndPrintJob方法监控job的运行并且实时打印job的状态。
感谢各位的阅读,以上就是“Hadoop Job提交相关知识点分析”的内容了,经过本文的学习后,相信大家对Hadoop Job提交相关知识点分析这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
原创文章,作者:bd101bd101,如若转载,请注明出处:https://blog.ytso.com/230302.html