YARN源码分析(三)—–ResourceManager HA之应用状态存储与恢复详解大数据

前言

任何系统即使做的再大,都会有可能出现各种各样的突发状况。尽管你可以说我在软件层面上已经做到所有情况的意外处理了,但是万一硬件出问题了或者说物理层面上出了问题,恐怕就不是多写几行代码能够立刻解决的吧,说了这么多,无非就是想强调HA,系统高可用性的重要性。在YARN中,NameNode的HA方式估计很多人都已经了解了,那本篇文章就来为大家梳理梳理RM资源管理器HA方面的知识,并不是指简单的RM的HA配置,确切的说是RM的应用状态存储于恢复。

RM应用状态存储使用

RM应用状态存储是什么意思呢,我们知道,RM全称ResourceManager,好比一个大管家,他不仅要与各个节点上的ApplicationMaster进行通信,还要与NodeManager进行心跳包的传输,自然在RM上会注册进来很多的应用,每个应用由1个ApplicationMaster负责掌管整个应用周期。既然RM角色这么重要,就有必要保存一下RM的信息状态,以免RM进程异常退出导致的应用状态信息丢失,RM重启无法重跑之前的应用的现象。

保存什么应用信息

既然目标已经明确了,那么在YARN中,保存的应用信息到底是哪些数据信息呢,应用状态信息只是1个笼统的概念。下面用一张图来表示。

YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复详解大数据

可以看到,这是一张分层多叉树的形状,这个图类似于MapReduce作用运行的分层执行状态图,做个简单介绍,最上面就是1个RMState的状态,这个状态中包含若干个ApplicationState的应用状态信息,每个应用状态信息中包含了很多歌应用尝试信息状态。

应用状态信息如何保存

RM应用状态信息保存的方式又哪些呢:

1.MemoryRMStateStore–信息状态保存在内存中的实现类。

2.FileSystemRMStateStore–信息状态保存在HDFS文件系统中,这个是做了持久化了。

3.NullRMStateStore–do nothing,什么都不做,就是不保存应用状态信息。

4.ZKRMStateStore–信息状态保存在Zookeeper中。

由于我分析的源码中还没有ZKRMStateStore这个类,所以只针对前3种做一个简单的介绍。上面列举的几个类都是具体实现类,那么就一定存在更加上层级的类来定义更基本的变量和方法,答案是RMStateStore类,所以继承关系就是下面这张图所表示

YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复详解大数据

下面蓝色箭头所表示的意思实现类的依托对象。具体什么意思,看接下来的源码分析。首先RMStateStore类对象

  1. /** 
  2.  * Base class to implement storage of ResourceManager state. 
  3.  * Takes care of asynchronous notifications and interfacing with YARN objects. 
  4.  * Real store implementations need to derive from it and implement blocking 
  5.  * store and load methods to actually store and load the state. 
  6.  * 保存RM资源状态信息的基类,也是一个服务对象类 
  7.  */  
  8. public abstract class RMStateStore extends AbstractService {  
  9.    
  10.   ….  
  11.   
  12.   
  13.   /** 
  14.    * State of an application attempt 
  15.    * 一次应用尝试状态信息类 
  16.    */  
  17.   public static class ApplicationAttemptState {  
  18.     //应用尝试ID  
  19.     final ApplicationAttemptId attemptId;  
  20.     //主容器  
  21.     final Container masterContainer;  
  22.     //凭证信息  
  23.     final Credentials appAttemptCredentials;  
  24.   
  25.     ….  
  26.   }  
  27.     
  28.   /** 
  29.    * State of an application application 
  30.    * 应用状态信息类 
  31.    */  
  32.   public static class ApplicationState {  
  33.     //应用提交上下文对象  
  34.     final ApplicationSubmissionContext context;  
  35.     //应用提交时间  
  36.     final long submitTime;  
  37.     //提交者  
  38.     final String user;  
  39.     //应用尝试信息对  
  40.     Map<ApplicationAttemptId, ApplicationAttemptState> attempts =  
  41.                   new HashMap<ApplicationAttemptId, ApplicationAttemptState>();  
  42.       
  43.     ….  
  44.   }  
  45.   
  46.   public static class RMDTSecretManagerState {  
  47.     // DTIdentifier -> renewDate  
  48.     //RM身份标识符ID对时间的映射  
  49.     Map<RMDelegationTokenIdentifier, Long> delegationTokenState =  
  50.         new HashMap<RMDelegationTokenIdentifier, Long>();  
  51.   
  52.     Set<DelegationKey> masterKeyState =  
  53.         new HashSet<DelegationKey>();  
  54.   
  55.     int dtSequenceNumber = 0;  
  56.   
  57.     ….  
  58.   }  
  59.   
  60.   /** 
  61.    * State of the ResourceManager 
  62.    * RM状态信息类 
  63.    */  
  64.   public static class RMState {  
  65.     //RM中的应用状态对图  
  66.     Map<ApplicationId, ApplicationState> appState =  
  67.         new HashMap<ApplicationId, ApplicationState>();  
  68.   
  69.     RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();  
  70.   
  71.     ….  
  72.   }  

重点关注,在此类中定义的几个应用状态类,与上面第一张进行对比。下面看下在这个父类中定义的几个应用保存相关的方法:

  1. /** 
  2.    * Non-Blocking API 
  3.    * ResourceManager services use this to store the application’s state 
  4.    * This does not block the dispatcher threads 
  5.    * RMAppStoredEvent will be sent on completion to notify the RMApp 
  6.    * 保存应用状态方法,触发一次保存event事件,此方法为非阻塞方法 
  7.    */  
  8.   @SuppressWarnings(“unchecked”)  
  9.   public synchronized void storeApplication(RMApp app) {  
  10.     ApplicationSubmissionContext context = app  
  11.                                             .getApplicationSubmissionContext();  
  12.     assert context instanceof ApplicationSubmissionContextPBImpl;  
  13.     ApplicationState appState = new ApplicationState(  
  14.         app.getSubmitTime(), context, app.getUser());  
  15.     //触发一次应用信息保存事件,由中央调度器进行事件分发处理  
  16.     dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));  
  17.   }  
  18.       
  19.   /** 
  20.    * Blocking API 
  21.    * Derived classes must implement this method to store the state of an  
  22.    * application. 
  23.    * 保存应用状态信息的阻塞方法,由子类具体实现 
  24.    */  
  25.   protected abstract void storeApplicationState(String appId,  
  26.                                       ApplicationStateDataPBImpl appStateData)   
  27.                                       throws Exception;  

保存应用状态方法分为阻塞式的方法和非阻塞式的方法,非阻塞式的方法时通过事件驱动的方式实现,阻塞式的方法由具体子类去实现。移除应用的方法有小小的不同点

  1. /** 
  2.    * Non-blocking API 
  3.    * ResourceManager services call this to remove an application from the state 
  4.    * store 
  5.    * This does not block the dispatcher threads 
  6.    * There is no notification of completion for this operation. 
  7.    * There is no notification of completion for this operation. 
  8.    * RM中移除应用状态信息,主要是移除里面的应用尝试信息列表 
  9.    */  
  10.   public synchronized void removeApplication(RMApp app) {  
  11.     ApplicationState appState = new ApplicationState(  
  12.             app.getSubmitTime(), app.getApplicationSubmissionContext(),  
  13.             app.getUser());  
  14.     //取出此应用中的运行尝试信息状态  
  15.     for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {  
  16.       Credentials credentials = getCredentialsFromAppAttempt(appAttempt);  
  17.       ApplicationAttemptState attemptState =  
  18.           new ApplicationAttemptState(appAttempt.getAppAttemptId(),  
  19.             appAttempt.getMasterContainer(), credentials);  
  20.       appState.attempts.put(attemptState.getAttemptId(), attemptState);  
  21.     }  
  22.     //进行移除操作  
  23.     removeApplication(appState);  
  24.   }  

移除应用需要把目标应用内包含的所有应用尝试信息都取出,然后进行移除操作,removeApplication操作又会又如上面的2个方法分支

  1. /** 
  2.    * Non-Blocking API 
  3.    */  
  4.   public synchronized void removeApplication(ApplicationState appState) {  
  5.     dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));  
  6.   }  
  7.   
  8.   /** 
  9.    * Blocking API 
  10.    * Derived classes must implement this method to remove the state of an  
  11.    * application and its attempts 
  12.    */  
  13.   protected abstract void removeApplicationState(ApplicationState appState)   
  14.                                                              throws Exception;  

在这个类中特别介绍一下,下面这个类是干嘛的

  1. public static class RMDTSecretManagerState {  
  2.     // DTIdentifier -> renewDate  
  3.     //RM身份标识符ID对时间的映射  
  4.     Map<RMDelegationTokenIdentifier, Long> delegationTokenState =  
  5.         new HashMap<RMDelegationTokenIdentifier, Long>();  
  6.   
  7.     Set<DelegationKey> masterKeyState =  
  8.         new HashSet<DelegationKey>();  
  9.   
  10.     int dtSequenceNumber = 0;  
  11.   
  12.     ….  
  13.   }  

里面保存了RM身份标识位到时间的映射,RMDelegationonTokenIdentifier标识位可以用来表明此RM是旧的RM还是新启动的RM,对于应用来说。下面说说3个具体实现类。

MemoryRMStateStore

内存保存实现类,RM的应用状态信息在RMStateStore已经被抽象成了RMState类,所以在MemoryRMStateStore类中,肯定会有对应的变量

  1. //内存RM状态信息保存类实现  
  2. public class MemoryRMStateStore extends RMStateStore {  
  3.     
  4.   RMState state = new RMState();  
  5.     
  6.   @VisibleForTesting  
  7.   public RMState getState() {  
  8.     return state;  
  9.   }  
  10.   …  

刚刚开始的时候,state是一个没有任何信息内容的实例对象。然后他定义了保存应用信息对象的方法

  1. @Override  
  2.   public void storeApplicationState(String appId,   
  3.                                      ApplicationStateDataPBImpl appStateData)  
  4.       throws Exception {  
  5.     //生成新的应用状态对象实例  
  6.     ApplicationState appState = new ApplicationState(  
  7.         appStateData.getSubmitTime(),  
  8.         appStateData.getApplicationSubmissionContext(), appStateData.getUser());  
  9.     if (state.appState.containsKey(appState.getAppId())) {  
  10.       Exception e = new IOException(“App: “ + appId + ” is already stored.”);  
  11.       LOG.info(“Error storing info for app: “ + appId, e);  
  12.       throw e;  
  13.     }  
  14.     //加入state对象中  
  15.     state.appState.put(appState.getAppId(), appState);  
  16.   }  

保存应用尝试状态信息方法

  1. @Override  
  2.   public synchronized void storeApplicationAttemptState(String attemptIdStr,   
  3.                             ApplicationAttemptStateDataPBImpl attemptStateData)  
  4.                             throws Exception {  
  5.     ApplicationAttemptId attemptId = ConverterUtils  
  6.                                         .toApplicationAttemptId(attemptIdStr);  
  7.     …  
  8.     ApplicationAttemptState attemptState =  
  9.         new ApplicationAttemptState(attemptId,  
  10.           attemptStateData.getMasterContainer(), credentials);  
  11.   
  12.     ApplicationState appState = state.getApplicationState().get(  
  13.         attemptState.getAttemptId().getApplicationId());  
  14.     if (appState == null) {  
  15.       throw new YarnRuntimeException(“Application doesn’t exist”);  
  16.     }  
  17.   
  18.     if (appState.attempts.containsKey(attemptState.getAttemptId())) {  
  19.       Exception e = new IOException(“Attempt: “ +  
  20.           attemptState.getAttemptId() + ” is already stored.”);  
  21.       LOG.info(“Error storing info for attempt: “ +  
  22.           attemptState.getAttemptId(), e);  
  23.       throw e;  
  24.     }  
  25.     //加入appState的运行尝试信息状态列表中  
  26.     appState.attempts.put(attemptState.getAttemptId(), attemptState);  
  27.   }  

应用状态信息保存完毕之后,如何从内存中进行加载呢,这个也是我们所关心的,;loadState()方法实现了这个需求

  1. //相当于返回一个内存中维护的RM状态拷贝对象  
  2.   @Override  
  3.   public synchronized RMState loadState() throws Exception {  
  4.     // return a copy of the state to allow for modification of the real state  
  5.     //新建一个RMState对象,拷贝内存中维护的RMstate对象  
  6.     RMState returnState = new RMState();  
  7.     //拷贝appState  
  8.     returnState.appState.putAll(state.appState);  
  9.     returnState.rmSecretManagerState.getMasterKeyState()  
  10.       .addAll(state.rmSecretManagerState.getMasterKeyState());  
  11.     returnState.rmSecretManagerState.getTokenState().putAll(  
  12.       state.rmSecretManagerState.getTokenState());  
  13.     returnState.rmSecretManagerState.dtSequenceNumber =  
  14.         state.rmSecretManagerState.dtSequenceNumber;  
  15.     return returnState;  
  16.   }  

相当于MemoryStateStore对象的RMState深拷贝。

FileSystemRMStateStore

文件系统RM应用信息状态保存类,此类做的一个核心操作就是把应用状态信息持久化到HDFS中了。

  1. /** 
  2.  * A simple class for storing RM state in any storage that implements a basic 
  3.  * FileSystem interface. Does not use directories so that simple key-value 
  4.  * stores can be used. The retry policy for the real filesystem client must be 
  5.  * configured separately to enable retry of filesystem operations when needed. 
  6.  * RM状态信息文件系统保存类 
  7.  */  
  8. public class FileSystemRMStateStore extends RMStateStore {  
  9.   
  10.   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);  
  11.   
  12.   private static final String ROOT_DIR_NAME = “FSRMStateRoot”;  
  13.   private static final String RM_DT_SECRET_MANAGER_ROOT = “RMDTSecretManagerRoot”;  
  14.   private static final String RM_APP_ROOT = “RMAppRoot”;  
  15.   private static final String DELEGATION_KEY_PREFIX = “DelegationKey_”;  
  16.   private static final String DELEGATION_TOKEN_PREFIX = “RMDelegationToken_”;  
  17.   private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =  
  18.       “RMDTSequenceNumber_”;  
  19.   //文件系统对象  
  20.   protected FileSystem fs;  
  21.     
  22.   //RM保存的文件路径  
  23.   private Path rootDirPath;  
  24.   private Path rmDTSecretManagerRoot;  
  25.   private Path rmAppRoot;  
  26.   private Path dtSequenceNumberPath = null;  
  27.   
  28.   @VisibleForTesting  
  29.   Path fsWorkingPath;  

声明了多种路径,不同对象实例有不同的路径,然后还有1个总文件系统操作对象。下面看核心的保存应用方法

  1. @Override  
  2.   public synchronized void storeApplicationState(String appId,  
  3.       ApplicationStateDataPBImpl appStateDataPB) throws Exception {  
  4.     Path appDirPath = getAppDir(rmAppRoot, appId);  
  5.     fs.mkdirs(appDirPath);  
  6.     //获取待写入的目录路径  
  7.     Path nodeCreatePath = getNodePath(appDirPath, appId);  
  8.   
  9.     LOG.info(“Storing info for app: “ + appId + ” at: “ + nodeCreatePath);  
  10.     //获取待写入的状态数据  
  11.     byte[] appStateData = appStateDataPB.getProto().toByteArray();  
  12.     try {  
  13.       // currently throw all exceptions. May need to respond differently for HA  
  14.       // based on whether we have lost the right to write to FS  
  15.       //进行状态信息的写入  
  16.       writeFile(nodeCreatePath, appStateData);  
  17.     } catch (Exception e) {  
  18.       LOG.info(“Error storing info for app: “ + appId, e);  
  19.       throw e;  
  20.     }  
  21.   }  

对应的加载RM应用状态方法

  1. @Override  
  2.   public synchronized RMState loadState() throws Exception {  
  3.     //新建RM状态对象  
  4.     RMState rmState = new RMState();  
  5.     //调用方法,从文件中进行恢复  
  6.     // recover DelegationTokenSecretManager  
  7.     loadRMDTSecretManagerState(rmState);  
  8.     // recover RM applications  
  9.     loadRMAppState(rmState);  
  10.     return rmState;  
  11.   }  

加载应用操作

  1. private void loadRMAppState(RMState rmState) throws Exception {  
  2.     try {  
  3.       List<ApplicationAttemptState> attempts =  
  4.           new ArrayList<ApplicationAttemptState>();  
  5.   
  6.       for (FileStatus appDir : fs.listStatus(rmAppRoot)) {  
  7.         for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {  
  8.           assert childNodeStatus.isFile();  
  9.           String childNodeName = childNodeStatus.getPath().getName();  
  10.           //读取文件数据信息  
  11.           byte[] childData =  
  12.               readFile(childNodeStatus.getPath(), childNodeStatus.getLen());  
  13.           //如果是应用状态信息  
  14.           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {  
  15.             // application  
  16.             LOG.info(“Loading application from node: “ + childNodeName);  
  17.             ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);  
  18.             ApplicationStateDataPBImpl appStateData =  
  19.                 new ApplicationStateDataPBImpl(  
  20.                   ApplicationStateDataProto.parseFrom(childData));  
  21.             ApplicationState appState =  
  22.                 new ApplicationState(appStateData.getSubmitTime(),  
  23.                   appStateData.getApplicationSubmissionContext(),  
  24.                   appStateData.getUser());  
  25.             // assert child node name is same as actual applicationId  
  26.             assert appId.equals(appState.context.getApplicationId());  
  27.             rmState.appState.put(appId, appState);  
  28.           } else if (childNodeName  
  29.             .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {  
  30.             // attempt  
  31.             //如果是应用产生信息  
  32.             LOG.info(“Loading application attempt from node: “ + childNodeName);  
  33.             ApplicationAttemptId attemptId =  
  34.                 ConverterUtils.toApplicationAttemptId(childNodeName);  
  35.             ApplicationAttemptStateDataPBImpl attemptStateData =  
  36.                 new ApplicationAttemptStateDataPBImpl(  
  37.                   ApplicationAttemptStateDataProto.parseFrom(childData));  
  38.             Credentials credentials = null;  
  39.             if (attemptStateData.getAppAttemptTokens() != null) {  
  40.               credentials = new Credentials();  
  41.               DataInputByteBuffer dibb = new DataInputByteBuffer();  
  42.               dibb.reset(attemptStateData.getAppAttemptTokens());  
  43.               credentials.readTokenStorageStream(dibb);  
  44.             }  
  45.             ApplicationAttemptState attemptState =  
  46.                 new ApplicationAttemptState(attemptId,  
  47.                   attemptStateData.getMasterContainer(), credentials);  
  48.   
  49.             // assert child node name is same as application attempt id  
  50.             assert attemptId.equals(attemptState.getAttemptId());  
  51.             attempts.add(attemptState);  
  52.           } else {  
  53.             LOG.info(“Unknown child node with name: “ + childNodeName);  
  54.           }  
  55.         }  
  56.       }  


NullRMStateStore

空方法实现类,就是不保存状态信息操作,方法很简单,继承了方法,但不实现代码逻辑

  1. //空RM信息状态保存类,不实现保存方法的任何操作  
  2. @Unstable  
  3. public class NullRMStateStore extends RMStateStore {  
  4.   ….  
  5.     
  6.   //不实现加载状态方法  
  7.   @Override  
  8.   public RMState loadState() throws Exception {  
  9.     throw new UnsupportedOperationException(“Cannot load state from null store”);  
  10.   }  
  11.     
  12.   //具体保存应用方法也不实现  
  13.   @Override  
  14.   protected void storeApplicationState(String appId,  
  15.       ApplicationStateDataPBImpl appStateData) throws Exception {  
  16.     // Do nothing  
  17.   }  
  18.   
  19.   @Override  
  20.   protected void storeApplicationAttemptState(String attemptId,  
  21.       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {  
  22.     // Do nothing  
  23.   }  
  24.   
  25.   @Override  
  26.   protected void removeApplicationState(ApplicationState appState)  
  27.       throws Exception {  
  28.     // Do nothing  
  29.   }  
  30.   
  31.   …..  
  32. }  


那么如何使用上面这些类呢,在yarn的配置属性中,通过参数yarn.resource-manager.store.class进行类对象配置,填入类名称即可。

全部代码的分析请点击链接https://github.com/linyiqun/hadoop-yarn,后续将会继续更新YARN其他方面的代码分析。

参考文献

《Hadoop技术内部–YARN架构设计与实现原理》.董西成

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9187.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论