前言
在Hadoop集群中,按照集群规模来划分,规模可大可小,大的例如百度,据说有4000台规模大小的Hadoop集群,小的话,几十台机器组成的集群也都是存在的。但是不论说是大型的集群以及小规模的集群,都免不了出现节点故障的情况,尤其是超大型的集群,节点故障几乎天天发生,因此如何做到正确,稳妥的故障情况处理,就显得很重要了,这里提供一个在Hadoop集群中可以想到的办法,就是Decommission操作,节点下线操作,一般的情况是故障节点已经是一个dead节点,或是出现异常情况的节点。此时如若不处理,或许会影响到整个集群的性能。所以在这里分享一下Hadoop中的Decommision机制。
Hadoop节点Decommision操作
在分析相关源码之前,有必要了解一下,让一个数据节点下线的物理操作,操作步骤其实很简单,在以前老版本的Hadoop中,好像是可以通过hadoop dfsadmin带参数的形式执行,但是在最近新版的Hadoop中好像这类命令失效了,于是我在做测试的时候,用了一个更通用的办法来触发这一行为,就是把目标下线节点加入execlude文件中,就是拒绝接入Hadoop集群的节点名单,姑且可以理解为黑名单列表,对应的是include名单,默认这2个名单都没配,所以数据节点一启动,就会注册到namenode节点上。然后是再执行Hadoop 的refreshnode命令,此命令就会从对应的此配置文件中读取最新的数据节点信息,然后开始decommission操作,在50070的ui界面上就可以看到待下线节点的状态会从active状态变为decommision in progress,此时此数据节点的block将会被逐步的拷贝出去,最后随着操作的完成,最终状态就会被变为decommissioned,此时就可以正式下线此节点,用hadoop-demons.sh namenode stop即可。在执行过程的前后,可以执行hadoop fsck的方法观察block块的路径,判断block拷贝情况。
相关涉及类
为什么花了这么多的篇幅介绍,decommision操作呢,因为操作的顺序与实际代码的运行流程基本吻合,有很强的关联性。在下面具体的分析过程中将会逐步体现出来。下面简要列出相关的2个类。
1.DecommissionManager–decommission操作管理类,里面包含了decommission操作状态监控。
2.FSNamesystem–这是一个大的操作类,内部包含了许多模块的工作,包括之前介绍过的副本相关操作也是部分在此类中进行中转,与decommission主要的方法refreshNode()方法包含于此。
Decommision代码跟踪分析
在物理操作中,decommision操作的触发是因为添加了execlude文件,然后再输入refreshNode命令开始的,与此就会对应到了FSNamesystem的同名方法
- /**
- * Rereads the config to get hosts and exclude list file names.
- * Rereads the files to update the hosts and exclude lists. It
- * checks if any of the hosts have changed states:
- * 1. Added to hosts –> no further work needed here.
- * 2. Removed from hosts –> mark AdminState as decommissioned.
- * 3. Added to exclude –> start decommission.
- * 4. Removed from exclude –> stop decommission.
- * 重新从配置中读取节点列表,移除掉准备下线的列表等
- */
- public void refreshNodes(Configuration conf) throws IOException {
- checkSuperuserPrivilege();
- // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
- // Update the file names and refresh internal includes and excludes list
- if (conf == null)
- conf = new Configuration();
- //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性
- hostsReader.updateFileNames(conf.get(“dfs.hosts”,“”),
- conf.get(“dfs.hosts.exclude”, “”));
- hostsReader.refresh();
- ….
果然在这里会重新去读exclude,include文件数据,然后这里会遍历当前的数据节点,与配置中新增的节点进行匹配
- synchronized (this) {
- //遍历数据节点
- for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor node = it.next();
- // Check if not include.
- //判断数据节点是否在允许的主机列表内
- if (!inHostsList(node, null)) {
- //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线
- node.setDecommissioned(); // case 2.
- } else {
inHost系列的判断方法如下,inExclude与此方法同,不附上代码
- /**
- * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
- * 如何判断节点是否包含在允许接入列表中的判断方法,exclude列表是同样的道理
- */
- private boolean inHostsList(DatanodeID node, String ipAddr) {
- //从hostReader中读取最新的host列表
- Set<String> hostsList = hostsReader.getHosts();
- //利用主机名去判断
- return (hostsList.isEmpty() ||
- (ipAddr != null && hostsList.contains(ipAddr)) ||
- hostsList.contains(node.getHost()) ||
- hostsList.contains(node.getName()) ||
- ((node instanceof DatanodeInfo) &&
- hostsList.contains(((DatanodeInfo)node).getHostName())));
- }
判断完毕之后,会进行逻辑判断,如果节点在exclude名单中,代表准备下线,则修改其状态,如果是正在下线的节点,则无须操作。完整逻辑如下
- /**
- * Rereads the config to get hosts and exclude list file names.
- * Rereads the files to update the hosts and exclude lists. It
- * checks if any of the hosts have changed states:
- * 1. Added to hosts –> no further work needed here.
- * 2. Removed from hosts –> mark AdminState as decommissioned.
- * 3. Added to exclude –> start decommission.
- * 4. Removed from exclude –> stop decommission.
- * 重新从配置中读取节点列表,移除掉准备下线的列表等
- */
- public void refreshNodes(Configuration conf) throws IOException {
- checkSuperuserPrivilege();
- // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
- // Update the file names and refresh internal includes and excludes list
- if (conf == null)
- conf = new Configuration();
- //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性
- hostsReader.updateFileNames(conf.get(“dfs.hosts”,“”),
- conf.get(“dfs.hosts.exclude”, “”));
- hostsReader.refresh();
- synchronized (this) {
- //遍历数据节点
- for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor node = it.next();
- // Check if not include.
- //判断数据节点是否在允许的主机列表内
- if (!inHostsList(node, null)) {
- //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线
- node.setDecommissioned(); // case 2.
- } else {
- //入如果此节点是包含在不允许接入的列表名单中时
- if (inExcludedHostsList(node, null)) {
- //判断此时状态是否为还没开始下线操作,如果是开始decommission
- if (!node.isDecommissionInProgress() &&
- !node.isDecommissioned()) {
- startDecommission(node); // case 3.
- }
- } else {
- //如果是其他的情况,如果节点处于decommsion操作,则停止操作
- if (node.isDecommissionInProgress() ||
- node.isDecommissioned()) {
- stopDecommission(node); // case 4.
- }
- }
- }
- }
- }
- }
然后开始沿着decommision操作继续往里走,就是startDecommision方法
- /**
- * Start decommissioning the specified datanode.
- * 对指定节点开始进行decommission操作
- */
- private void startDecommission (DatanodeDescriptor node)
- throws IOException {
- if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- LOG.info(“Start Decommissioning node “ + node.getName());
- node.startDecommission();
- //设置节点decommison开始时间
- node.decommissioningStatus.setStartTime(now());
- //
- // all the blocks that reside on this node have to be
- // replicated.
- //检查此时的decommission操作状态
- checkDecommissionStateInternal(node);
- }
- }
在这个方法主要操作还是在于让节点启动相应的decommision下线操作,开始一波副本的拷贝工作了,在这里感觉第一次启动,就在这里判断decommision状态,个人感觉没有必要,一般操作不会很快结束,一般的decomission监控会有额外的线程周期性的监控此类操作。而这个线程进行检查的函数也是checkDecommissionStateInternal方法,他是如何进行检查判断的呢
- /**
- * Change, if appropriate, the admin state of a datanode to
- * decommission completed. Return true if decommission is complete.
- * decommision的状态检测是根据其上的副本量来衡量的
- */
- boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
- //
- // Check to see if all blocks in this decommissioned
- // node has reached their target replication factor.
- //
- if (node.isDecommissionInProgress()) {
- //调用副本进度判断函数
- if (!isReplicationInProgress(node)) {
- node.setDecommissioned();
- LOG.info(“Decommission complete for node “ + node.getName());
- }
- }
- if (node.isDecommissioned()) {
- return true;
- }
- return false;
- }
答案就在上面,通过剩余副本的拷贝情况,如果isReplicationInPogress返回FALSE代表了,已经全部完成拷贝工作了,状态就可以修改为decomissioned结束状态了,下面仔细看看这个isReplicationInProgress方法
- /**
- * Return true if there are any blocks on this node that have not
- * yet reached their replication factor. Otherwise returns false.
- * 如果当前数据节点block块的副本系数还没有满足期望的副本数值值,则表明还要添加复制请求
- */
- private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
- boolean status = false;
- int underReplicatedBlocks = 0;
- int decommissionOnlyReplicas = 0;
- int underReplicatedInOpenFiles = 0;
- //遍历此节点上的所有数据块
- for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
- final Block block = i.next();
- INode fileINode = blocksMap.getINode(block);
- if (fileINode != null) {
- NumberReplicas num = countNodes(block);
- //获取此数据块当前的副本数
- int curReplicas = num.liveReplicas();
- //获取此副本块的期望副本块数
- int curExpectedReplicas = getReplication(block);
- //如果期望副本块数大于当前副本块数,表明block还需要复制
- if (curExpectedReplicas > curReplicas) {
- // Log info about one block for this node which needs replication
- if (!status) {
- //做状态的修改,表明block还需要复制
- status = true;
- logBlockReplicationInfo(block, srcNode, num);
- }
- underReplicatedBlocks++;
- if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
- decommissionOnlyReplicas++;
- }
- if (fileINode.isUnderConstruction()) {
- underReplicatedInOpenFiles++;
- }
- if (!neededReplications.contains(block) &&
- pendingReplications.getNumReplicas(block) == 0) {
- //
- // These blocks have been reported from the datanode
- // after the startDecommission method has been executed. These
- // blocks were in flight when the decommission was started.
- //
- //添加新的副本复制请求
- neededReplications.add(block,
- curReplicas,
- num.decommissionedReplicas(),
- curExpectedReplicas);
- }
- }
- }
- }
- srcNode.decommissioningStatus.set(underReplicatedBlocks,
- decommissionOnlyReplicas, underReplicatedInOpenFiles);
- return status;
- }
原理很简单,对于待撤销数据节点上的每个block块,判断当前副本与期望副本数之间的差,如果不满足,就增强复制请求,至此,decommision这个核心流程就走通了.下面看看一个常驻的监控线程,毕竟他才是主要做监控进度这项任务的.
DecommissionManager
decommisionMannager,decommision操作管理器,所包含的变量很少
- /**
- * Manage node decommissioning.
- * 节点Decommission操作状态管理器
- */
- class DecommissionManager {
- static final Log LOG = LogFactory.getLog(DecommissionManager.class);
- //名字空间系统
- private final FSNamesystem fsnamesystem;
- DecommissionManager(FSNamesystem namesystem) {
- this.fsnamesystem = namesystem;
- }
- /** Periodically check decommission status. */
- //监控方法
- class Monitor implements Runnable {
- …
- }
关键看他内部的核心monitor runnable
- /** Periodically check decommission status. */
- //监控方法
- class Monitor implements Runnable {
- /** recheckInterval is how often namenode checks
- * if a node has finished decommission
- * 定期检查周期
- */
- private final long recheckInterval;
- /** The number of decommission nodes to check for each interval */
- private final int numNodesPerCheck;
- /** firstkey can be initialized to anything. */
- private String firstkey = “”;
- Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
- this.recheckInterval = recheckIntervalInSecond * 1000L;
- this.numNodesPerCheck = numNodesPerCheck;
- }
- /**
- * Check decommission status of numNodesPerCheck nodes
- * for every recheckInterval milliseconds.
- */
- public void run() {
- for(; fsnamesystem.isRunning(); ) {
- synchronized(fsnamesystem) {
- //调用check()方法
- check();
- }
- try {
- Thread.sleep(recheckInterval);
- } catch (InterruptedException ie) {
- LOG.info(“Interrupted “ + this.getClass().getSimpleName(), ie);
- }
- }
- }
for循环内持周期性的续调check()方法,如果系统没有结束的话,docheck方法又会调用之前提到的checkDecommission的方法
- private void check() {
- int count = 0;
- //遍历每个数据节点
- for(Map.Entry<String, DatanodeDescriptor> entry
- : new CyclicIteration<String, DatanodeDescriptor>(
- fsnamesystem.datanodeMap, firstkey)) {
- final DatanodeDescriptor d = entry.getValue();
- firstkey = entry.getKey();
- //如果数据节点正处于decommison操作的话,则做检查
- if (d.isDecommissionInProgress()) {
- try {
- //调用fsnamesystem的checkDecommissionStateInternal方法,此方法内部又会调用isReplicationInProgress进行副本的
- //情况判断
- fsnamesystem.checkDecommissionStateInternal(d);
- } catch(Exception e) {
- LOG.warn(“entry=” + entry, e);
- }
- if (++count == numNodesPerCheck) {
- return;
- }
- }
- }
- }
- }
他是每个数据节点遍历着判断.OK,希望通过我的分析,大家对Decommission有更多的了解,有所收获.
全部代码的分析请点击链接https://github.com/linyiqun/hadoop-hdfs,后续将会继续更新HDFS其他方面的代码分析。
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9182.html