1、NodeManager概述
NodeManager(NM)是YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务。
NodeManager整体架构:
2、NodeManager分析
接下来将按照启动NodeManager时代码执行的顺序为主线进行代码分析。
2.1 main函数
- 打印NodeManager启动和关闭日志信息;
- 创建NodeManager对象;
- 加载配置文件初始化配置项;
- 调用initAndStartNodeManager函数。
主要代码:
public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); //打印Nodemangager启动和关闭时的日志信息 StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); //创建NodeManager对象 NodeManager nodeManager = new NodeManager(); //加载配置文件初始化 Configuration conf = new YarnConfiguration(); //初始化并启动NodeManager nodeManager.initAndStartNodeManager(conf, false); }
2.2 initAndStartNodeManager函数
- 调用init()函数,进行初始化(init方法调用被重写的serviceInit方法进行初始化)
- 启动各项服务(start方法内部调用被重写的servicestart方法进行启动各项服务)
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) { try { // Remove the old hook if we are rebooting. if (hasToReboot && null != nodeManagerShutdownHook) { ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook); } //增加nodeManagerShutdownHook为了在NodeManager关闭或重启时关闭compositeService nodeManagerShutdownHook = new CompositeServiceShutdownHook(this); ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook, SHUTDOWN_HOOK_PRIORITY); //调用init()函数,进行初始化(init方法调用被重写的serviceInit方法进行初始化) this.init(conf); //启动各项服务(start方法内部调用被重写的servicestart方法进行启动各项服务) this.start(); } catch (Throwable t) { LOG.fatal("Error starting NodeManager", t); System.exit(-1); } }
2.3init函数
(1)init方法是从Service接口,在AbstractService抽象类中得到实现。在AbstractService类中的init方法调用protected 类型的serviceInit。在其子类NodeMananger中重写了serviceInit方法。
AbstractService抽象类中init方法实现:
@Override public void init(Configuration conf) { if (conf == null) { throw new ServiceStateException("Cannot initialize service " + getName() + ": null configuration"); } if (isInState(STATE.INITED)) { return; } synchronized (stateChangeLock) { if (enterState(STATE.INITED) != STATE.INITED) { setConfig(conf); try { serviceInit(config); if (isInState(STATE.INITED)) { //if the service ended up here during init, //notify the listeners notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } }
(2)NodeManager类中serviceInit方法中是添加一些服务。具体如下:
- 进行基本的配置操作,例如从配置文件读入参数等
- 创建并添加DeletionService、NodeHealthCheckerService到父类的一个ArrayList<Service>对象serviceList中
- 调用createNodeStatusUpdater()函数创建NodeStatusUpdater对象,再调用NodeManager的父类方法register将这个对象注册添加父类的ArrayList<Service>对象listeners中.
- 调用createContainerManager()、createWebServer()函数和createNodeResourceMonitor函数创建ContainerManagerImpl对象、Service对象和NodeResourceMonitor对象,并将其添加到serviceList
- 将ContainerManagerImpl注册到之前创建好的异步事件调度器AsyncDispatcher中,然后将将AsyncDispatcher调度器添加到服务队列serviceList中
- 最后添加NodeStatusUpdater对象到serviceList中,最后添加这个对象的原因是要在心跳操作启动必须在所有其他服务之后。
- 调用父类的init()函数,进行其他的配置初始化。
主要代码:
@Override protected void serviceInit(Configuration conf) throws Exception { conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); NMContainerTokenSecretManager containerTokenSecretManager = new NMContainerTokenSecretManager(conf); NMTokenSecretManagerInNM nmTokenSecretManager = new NMTokenSecretManagerInNM(); this.aclsManager = new ApplicationACLsManager(conf); //始化ContainerExecutor,ContainerExecutor封装了nodeManager对Container操作的各种方法, //包括启动container, 查询指定id的container是否活着,等操作. 根据配置yarn.nodemanager.container-executor.class //决定ContainerExecutor的实例, 默认为DefaultContainerExecutor. ContainerExecutor exec = ReflectionUtils.newInstance( conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, DefaultContainerExecutor.class, ContainerExecutor.class), conf); try { exec.init(); } catch (IOException e) { throw new YarnRuntimeException("Failed to initialize container executor", e); } DeletionService del = createDeletionService(exec); addService(del); // NodeManager level dispatcher 异步分发器 this.dispatcher = new AsyncDispatcher(); //可以通过此服务查询node是否健康, 当前node的健康状态包括nodeHealthScriptRunner.isHealthy和dirsHandler.areDisksHealthy nodeHealthChecker = new NodeHealthCheckerService(); addService(nodeHealthChecker); dirsHandler = nodeHealthChecker.getDiskHandler(); this.context = createNMContext(containerTokenSecretManager, nmTokenSecretManager); //创建NodeStatusUpdater线程, 负责向RM注册和发送心跳(更新状态). //这里使用ResourceTracker协议向RM通信, 底层为YarnRPC. ResourceTracker接口提供了两个方法; 提供注册和心跳功能 nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); // 监控node的资源(即资源是否可用, 四种状态, stopped, inited, notinited, started) NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); addService(nodeResourceMonitor); //创建ContainerManagerImpl服务, 管理container,使用ContainerManager协议, ContainerManager协议为APP向NodeManager通信的协议 containerManager = createContainerManager(context, exec, del, nodeStatusUpdater, this.aclsManager, dirsHandler); addService(containerManager); ((NMContext) context).setContainerManager(containerManager); // 创建webServer, 启动NodeManager的web服务. 通过yarn.nodemanagerwebapp.address设置地址, 默认端口为8042 WebServer webServer = createWebServer(context, containerManager .getContainersMonitor(), this.aclsManager, dirsHandler); addService(webServer); ((NMContext) context).setWebServer(webServer); dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); addService(dispatcher); //初始化监控 DefaultMetricsSystem.initialize("NodeManager"); // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. addService(nodeStatusUpdater); super.serviceInit(conf); // TODO add local dirs to del }
2.4 start函数
- 进行安全认证操作
- 调用父类的start()函数依次启动在NodeManager类serviceInit中所有添加好的服务。其中AsyncDispatcher负责事件的传送,NodeStatusUpdater负责产生心跳事件,ContainerManagerImpl负责提供Hadoop RPC需要的函数等等
AbstractService中start方法的具体实现:
@Override public void start() { if (isInState(STATE.STARTED)) { return; } //enter the started state synchronized (stateChangeLock) { if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) { try { startTime = System.currentTimeMillis(); //会调用子类NN中重写的同名方法 serviceStart(); if (isInState(STATE.STARTED)) { //if the service started (and isn't now in a later state), notify if (LOG.isDebugEnabled()) { LOG.debug("Service " + getName() + " is started"); } notifyListeners(); } } catch (Exception e) { noteFailure(e); ServiceOperations.stopQuietly(LOG, this); throw ServiceStateException.convert(e); } } } }
NodeManager中重写的serviceStart方法的主要代码:
@Override protected void serviceStart() throws Exception { try { doSecureLogin(); } catch (IOException e) { throw new YarnRuntimeException("Failed NodeManager login", e); } super.serviceStart(); }
3、参考资料:
http://www.technology-mania.com/2014/05/an-insight-into-hadoop-yarn-nodemanager.html
http://www.cnblogs.com/biyeymyhjob/archive/2012/08/18/2645576.html
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9558.html