HDFS源码分析(四)—–节点Decommission机制详解大数据

前言

在Hadoop集群中,按照集群规模来划分,规模可大可小,大的例如百度,据说有4000台规模大小的Hadoop集群,小的话,几十台机器组成的集群也都是存在的。但是不论说是大型的集群以及小规模的集群,都免不了出现节点故障的情况,尤其是超大型的集群,节点故障几乎天天发生,因此如何做到正确,稳妥的故障情况处理,就显得很重要了,这里提供一个在Hadoop集群中可以想到的办法,就是Decommission操作,节点下线操作,一般的情况是故障节点已经是一个dead节点,或是出现异常情况的节点。此时如若不处理,或许会影响到整个集群的性能。所以在这里分享一下Hadoop中的Decommision机制。

Hadoop节点Decommision操作

在分析相关源码之前,有必要了解一下,让一个数据节点下线的物理操作,操作步骤其实很简单,在以前老版本的Hadoop中,好像是可以通过hadoop dfsadmin带参数的形式执行,但是在最近新版的Hadoop中好像这类命令失效了,于是我在做测试的时候,用了一个更通用的办法来触发这一行为,就是把目标下线节点加入execlude文件中,就是拒绝接入Hadoop集群的节点名单,姑且可以理解为黑名单列表,对应的是include名单,默认这2个名单都没配,所以数据节点一启动,就会注册到namenode节点上。然后是再执行Hadoop 的refreshnode命令,此命令就会从对应的此配置文件中读取最新的数据节点信息,然后开始decommission操作,在50070的ui界面上就可以看到待下线节点的状态会从active状态变为decommision in progress,此时此数据节点的block将会被逐步的拷贝出去,最后随着操作的完成,最终状态就会被变为decommissioned,此时就可以正式下线此节点,用hadoop-demons.sh namenode stop即可。在执行过程的前后,可以执行hadoop fsck的方法观察block块的路径,判断block拷贝情况。

相关涉及类

为什么花了这么多的篇幅介绍,decommision操作呢,因为操作的顺序与实际代码的运行流程基本吻合,有很强的关联性。在下面具体的分析过程中将会逐步体现出来。下面简要列出相关的2个类。

1.DecommissionManager–decommission操作管理类,里面包含了decommission操作状态监控。

2.FSNamesystem–这是一个大的操作类,内部包含了许多模块的工作,包括之前介绍过的副本相关操作也是部分在此类中进行中转,与decommission主要的方法refreshNode()方法包含于此。

Decommision代码跟踪分析

在物理操作中,decommision操作的触发是因为添加了execlude文件,然后再输入refreshNode命令开始的,与此就会对应到了FSNamesystem的同名方法

[java] 
view plain
copy
print
?

  1. /** 
  2.    * Rereads the config to get hosts and exclude list file names. 
  3.    * Rereads the files to update the hosts and exclude lists.  It 
  4.    * checks if any of the hosts have changed states: 
  5.    * 1. Added to hosts  –> no further work needed here. 
  6.    * 2. Removed from hosts –> mark AdminState as decommissioned.  
  7.    * 3. Added to exclude –> start decommission. 
  8.    * 4. Removed from exclude –> stop decommission. 
  9.    * 重新从配置中读取节点列表,移除掉准备下线的列表等 
  10.    */  
  11.   public void refreshNodes(Configuration conf) throws IOException {  
  12.     checkSuperuserPrivilege();  
  13.     // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.  
  14.     // Update the file names and refresh internal includes and excludes list  
  15.     if (conf == null)  
  16.       conf = new Configuration();  
  17.     //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性  
  18.     hostsReader.updateFileNames(conf.get(“dfs.hosts”,“”),   
  19.                                 conf.get(“dfs.hosts.exclude”“”));  
  20.     hostsReader.refresh();  
  21. ….  

果然在这里会重新去读exclude,include文件数据,然后这里会遍历当前的数据节点,与配置中新增的节点进行匹配

[java] 
view plain
copy
print
?

  1. synchronized (this) {  
  2.       //遍历数据节点  
  3.       for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();  
  4.            it.hasNext();) {  
  5.         DatanodeDescriptor node = it.next();  
  6.         // Check if not include.  
  7.         //判断数据节点是否在允许的主机列表内  
  8.         if (!inHostsList(node, null)) {  
  9.           //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线  
  10.           node.setDecommissioned();  // case 2.  
  11.         } else {  

inHost系列的判断方法如下,inExclude与此方法同,不附上代码

[java] 
view plain
copy
print
?

  1. /**  
  2.    * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode. 
  3.    * 如何判断节点是否包含在允许接入列表中的判断方法,exclude列表是同样的道理 
  4.    */  
  5.   private boolean inHostsList(DatanodeID node, String ipAddr) {  
  6.     //从hostReader中读取最新的host列表  
  7.     Set<String> hostsList = hostsReader.getHosts();  
  8.     //利用主机名去判断  
  9.     return (hostsList.isEmpty() ||   
  10.             (ipAddr != null && hostsList.contains(ipAddr)) ||  
  11.             hostsList.contains(node.getHost()) ||  
  12.             hostsList.contains(node.getName()) ||   
  13.             ((node instanceof DatanodeInfo) &&   
  14.              hostsList.contains(((DatanodeInfo)node).getHostName())));  
  15.   }  

判断完毕之后,会进行逻辑判断,如果节点在exclude名单中,代表准备下线,则修改其状态,如果是正在下线的节点,则无须操作。完整逻辑如下

[java] 
view plain
copy
print
?

  1. /** 
  2.    * Rereads the config to get hosts and exclude list file names. 
  3.    * Rereads the files to update the hosts and exclude lists.  It 
  4.    * checks if any of the hosts have changed states: 
  5.    * 1. Added to hosts  –> no further work needed here. 
  6.    * 2. Removed from hosts –> mark AdminState as decommissioned.  
  7.    * 3. Added to exclude –> start decommission. 
  8.    * 4. Removed from exclude –> stop decommission. 
  9.    * 重新从配置中读取节点列表,移除掉准备下线的列表等 
  10.    */  
  11.   public void refreshNodes(Configuration conf) throws IOException {  
  12.     checkSuperuserPrivilege();  
  13.     // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.  
  14.     // Update the file names and refresh internal includes and excludes list  
  15.     if (conf == null)  
  16.       conf = new Configuration();  
  17.     //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性  
  18.     hostsReader.updateFileNames(conf.get(“dfs.hosts”,“”),   
  19.                                 conf.get(“dfs.hosts.exclude”“”));  
  20.     hostsReader.refresh();  
  21.     synchronized (this) {  
  22.       //遍历数据节点  
  23.       for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();  
  24.            it.hasNext();) {  
  25.         DatanodeDescriptor node = it.next();  
  26.         // Check if not include.  
  27.         //判断数据节点是否在允许的主机列表内  
  28.         if (!inHostsList(node, null)) {  
  29.           //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线  
  30.           node.setDecommissioned();  // case 2.  
  31.         } else {  
  32.            //入如果此节点是包含在不允许接入的列表名单中时  
  33.           if (inExcludedHostsList(node, null)) {  
  34.             //判断此时状态是否为还没开始下线操作,如果是开始decommission  
  35.             if (!node.isDecommissionInProgress() &&   
  36.                 !node.isDecommissioned()) {  
  37.               startDecommission(node);   // case 3.  
  38.             }  
  39.           } else {  
  40.             //如果是其他的情况,如果节点处于decommsion操作,则停止操作  
  41.             if (node.isDecommissionInProgress() ||   
  42.                 node.isDecommissioned()) {  
  43.               stopDecommission(node);   // case 4.  
  44.             }   
  45.           }  
  46.         }  
  47.       }  
  48.     }   
  49.         
  50.   }  

然后开始沿着decommision操作继续往里走,就是startDecommision方法

[java] 
view plain
copy
print
?

  1. /** 
  2.    * Start decommissioning the specified datanode.  
  3.    * 对指定节点开始进行decommission操作 
  4.    */  
  5.   private void startDecommission (DatanodeDescriptor node)   
  6.     throws IOException {  
  7.   
  8.     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {  
  9.       LOG.info(“Start Decommissioning node “ + node.getName());  
  10.       node.startDecommission();  
  11.       //设置节点decommison开始时间  
  12.       node.decommissioningStatus.setStartTime(now());  
  13.       //  
  14.       // all the blocks that reside on this node have to be   
  15.       // replicated.  
  16.       //检查此时的decommission操作状态  
  17.       checkDecommissionStateInternal(node);  
  18.     }  
  19.   }  

在这个方法主要操作还是在于让节点启动相应的decommision下线操作,开始一波副本的拷贝工作了,在这里感觉第一次启动,就在这里判断decommision状态,个人感觉没有必要,一般操作不会很快结束,一般的decomission监控会有额外的线程周期性的监控此类操作。而这个线程进行检查的函数也是checkDecommissionStateInternal方法,他是如何进行检查判断的呢

[java] 
view plain
copy
print
?

  1. /** 
  2.    * Change, if appropriate, the admin state of a datanode to  
  3.    * decommission completed. Return true if decommission is complete. 
  4.    * decommision的状态检测是根据其上的副本量来衡量的 
  5.    */  
  6.   boolean checkDecommissionStateInternal(DatanodeDescriptor node) {  
  7.     //  
  8.     // Check to see if all blocks in this decommissioned  
  9.     // node has reached their target replication factor.  
  10.     //  
  11.     if (node.isDecommissionInProgress()) {  
  12.       //调用副本进度判断函数  
  13.       if (!isReplicationInProgress(node)) {  
  14.         node.setDecommissioned();  
  15.         LOG.info(“Decommission complete for node “ + node.getName());  
  16.       }  
  17.     }  
  18.     if (node.isDecommissioned()) {  
  19.       return true;  
  20.     }  
  21.     return false;  
  22.   }  

答案就在上面,通过剩余副本的拷贝情况,如果isReplicationInPogress返回FALSE代表了,已经全部完成拷贝工作了,状态就可以修改为decomissioned结束状态了,下面仔细看看这个isReplicationInProgress方法

[java] 
view plain
copy
print
?

  1. /** 
  2.    * Return true if there are any blocks on this node that have not 
  3.    * yet reached their replication factor. Otherwise returns false. 
  4.    * 如果当前数据节点block块的副本系数还没有满足期望的副本数值值,则表明还要添加复制请求 
  5.    */  
  6.   private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {  
  7.     boolean status = false;  
  8.     int underReplicatedBlocks = 0;  
  9.     int decommissionOnlyReplicas = 0;  
  10.     int underReplicatedInOpenFiles = 0;  
  11.       
  12.     //遍历此节点上的所有数据块  
  13.     for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {  
  14.       final Block block = i.next();  
  15.       INode fileINode = blocksMap.getINode(block);  
  16.   
  17.       if (fileINode != null) {  
  18.         NumberReplicas num = countNodes(block);  
  19.         //获取此数据块当前的副本数  
  20.         int curReplicas = num.liveReplicas();  
  21.         //获取此副本块的期望副本块数  
  22.         int curExpectedReplicas = getReplication(block);  
  23.         //如果期望副本块数大于当前副本块数,表明block还需要复制  
  24.         if (curExpectedReplicas > curReplicas) {  
  25.           // Log info about one block for this node which needs replication  
  26.           if (!status) {  
  27.             //做状态的修改,表明block还需要复制  
  28.             status = true;  
  29.             logBlockReplicationInfo(block, srcNode, num);  
  30.           }  
  31.           underReplicatedBlocks++;  
  32.           if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {  
  33.             decommissionOnlyReplicas++;  
  34.           }  
  35.           if (fileINode.isUnderConstruction()) {  
  36.             underReplicatedInOpenFiles++;  
  37.           }  
  38.   
  39.           if (!neededReplications.contains(block) &&  
  40.             pendingReplications.getNumReplicas(block) == 0) {  
  41.             //  
  42.             // These blocks have been reported from the datanode  
  43.             // after the startDecommission method has been executed. These  
  44.             // blocks were in flight when the decommission was started.  
  45.             //  
  46.             //添加新的副本复制请求  
  47.             neededReplications.add(block,   
  48.                                    curReplicas,  
  49.                                    num.decommissionedReplicas(),  
  50.                                    curExpectedReplicas);  
  51.           }  
  52.         }  
  53.       }  
  54.     }  
  55.     srcNode.decommissioningStatus.set(underReplicatedBlocks,  
  56.         decommissionOnlyReplicas, underReplicatedInOpenFiles);  
  57.   
  58.     return status;  
  59.   }  

原理很简单,对于待撤销数据节点上的每个block块,判断当前副本与期望副本数之间的差,如果不满足,就增强复制请求,至此,decommision这个核心流程就走通了.下面看看一个常驻的监控线程,毕竟他才是主要做监控进度这项任务的.

DecommissionManager

decommisionMannager,decommision操作管理器,所包含的变量很少

[java] 
view plain
copy
print
?

  1. /** 
  2.  * Manage node decommissioning. 
  3.  * 节点Decommission操作状态管理器 
  4.  */  
  5. class DecommissionManager {  
  6.   static final Log LOG = LogFactory.getLog(DecommissionManager.class);  
  7.     
  8.   //名字空间系统  
  9.   private final FSNamesystem fsnamesystem;  
  10.   
  11.   DecommissionManager(FSNamesystem namesystem) {  
  12.     this.fsnamesystem = namesystem;  
  13.   }  
  14.   
  15.   /** Periodically check decommission status. */  
  16.   //监控方法  
  17.   class Monitor implements Runnable {  
  18. …  
  19. }  



关键看他内部的核心monitor runnable

[java] 
view plain
copy
print
?

  1. /** Periodically check decommission status. */  
  2.   //监控方法  
  3.   class Monitor implements Runnable {  
  4.     /** recheckInterval is how often namenode checks 
  5.      *  if a node has finished decommission 
  6.      * 定期检查周期 
  7.      */  
  8.     private final long recheckInterval;  
  9.     /** The number of decommission nodes to check for each interval */  
  10.     private final int numNodesPerCheck;  
  11.     /** firstkey can be initialized to anything. */  
  12.     private String firstkey = “”;  
  13.   
  14.     Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {  
  15.       this.recheckInterval = recheckIntervalInSecond * 1000L;  
  16.       this.numNodesPerCheck = numNodesPerCheck;  
  17.     }  
  18.   
  19.     /** 
  20.      * Check decommission status of numNodesPerCheck nodes 
  21.      * for every recheckInterval milliseconds. 
  22.      */  
  23.     public void run() {  
  24.       for(; fsnamesystem.isRunning(); ) {  
  25.         synchronized(fsnamesystem) {  
  26.           //调用check()方法  
  27.           check();  
  28.         }  
  29.     
  30.         try {  
  31.           Thread.sleep(recheckInterval);  
  32.         } catch (InterruptedException ie) {  
  33.           LOG.info(“Interrupted “ + this.getClass().getSimpleName(), ie);  
  34.         }  
  35.       }  
  36.     }  

for循环内持周期性的续调check()方法,如果系统没有结束的话,docheck方法又会调用之前提到的checkDecommission的方法

[java] 
view plain
copy
print
?

  1. private void check() {  
  2.       int count = 0;  
  3.       //遍历每个数据节点  
  4.       for(Map.Entry<String, DatanodeDescriptor> entry  
  5.           : new CyclicIteration<String, DatanodeDescriptor>(  
  6.               fsnamesystem.datanodeMap, firstkey)) {  
  7.         final DatanodeDescriptor d = entry.getValue();  
  8.         firstkey = entry.getKey();  
  9.    
  10.         //如果数据节点正处于decommison操作的话,则做检查  
  11.         if (d.isDecommissionInProgress()) {  
  12.           try {  
  13.             //调用fsnamesystem的checkDecommissionStateInternal方法,此方法内部又会调用isReplicationInProgress进行副本的  
  14.             //情况判断  
  15.             fsnamesystem.checkDecommissionStateInternal(d);  
  16.           } catch(Exception e) {  
  17.             LOG.warn(“entry=” + entry, e);  
  18.           }  
  19.           if (++count == numNodesPerCheck) {  
  20.             return;  
  21.           }  
  22.         }  
  23.       }  
  24.     }  
  25.   }  

他是每个数据节点遍历着判断.OK,希望通过我的分析,大家对Decommission有更多的了解,有所收获.



全部代码的分析请点击链接https://github.com/linyiqun/hadoop-hdfs,后续将会继续更新HDFS其他方面的代码分析。

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9182.html

(0)
上一篇 2021年7月19日 09:17
下一篇 2021年7月19日 09:17

相关推荐

发表回复

登录后才能评论