【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 – 状态机库


一、状态机库概述

一)简介

状态机由一组状态组成:
【初始状态 -> 中间状态 -> 最终状态】。
在一个状态机中,每个状态会接收一组特定的事件,根据事件类型进行处理,并转换到下一个状态。当转换到最终状态时则退出。

二)状态转换方式

状态间转换会有下面这三种类型

三)Yarn 状态机类

在 Yarn 中提供了一个工厂类 StateMachineFactory 来帮助定义状态机。如何使用,我们直接写个 demo。

一)状态机实现

状态机实现,可以直接嵌入到上篇文章中的 AsyncDispatcher使用。
这里仅给出状态机JobStateMachine以及各种事件处理的代码。完整的代码项目执行,请到 github demo 中查看。

import com.shuofxz.event.JobEvent;
import com.shuofxz.event.JobEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;

import java.util.EnumSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/*
* 可参考 Yarn 中实现的状态机对象:
* ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,
* NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,
* MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等
* */
@SuppressWarnings({"rawtypes", "unchecked"})
public class JobStateMachine implements EventHandler<JobEvent> {
    private final String jobID;
    private EventHandler eventHandler;
    private final Lock writeLock;
    private final Lock readLock;

    // 定义状态机
    protected static final StateMachineFactory<JobStateMachine, JobStateInternal,
            JobEventType, JobEvent>
            stateMachineFactory = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)
            .addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition())
            .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition())
            .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition())
            .addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition())
            .installTopology();

    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;

    public JobStateMachine(String jobID, EventHandler eventHandler) {
        this.jobID = jobID;

        // 多线程异步处理,state 有可能被同时读写,使用读写锁来避免竞争
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();

        this.eventHandler = eventHandler;
        stateMachine = stateMachineFactory.make(this);
    }

    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
        return stateMachine;
    }

    public static class InitTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
        @Override
        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);
            // do something...
            // 完成后发送新的 Event —— JOB_START
            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));
        }
    }

    public static class StartTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
        @Override
        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);
            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));
        }
    }

    public static class SetupCompletedTransition implements SingleArcTransition<JobStateMachine, JobEvent> {
        @Override
        public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);
            jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));
        }
    }

    public static class JobTasksCompletedTransition implements MultipleArcTransition<JobStateMachine, JobEvent, JobStateInternal> {
        @Override
        public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
            System.out.println("Receiving event " + jobEvent);

            // 这是多结果状态部分,因此需要人为制定后续状态
            // 这里整个流程结束,设置一下对应的状态
            boolean flag = true;
            if (flag) {
                return JobStateInternal.SUCCEEDED;
            } else {
                return JobStateInternal.KILLED;
            }
        }
    }

    @Override
    public void handle(JobEvent jobEvent) {
        try {
            // 注意这里为了避免静态条件,使用了读写锁
            writeLock.lock();
            JobStateInternal oldState = getInternalState();
            try {
                getStateMachine().doTransition(jobEvent.getType(), jobEvent);
            } catch (InvalidStateTransitionException e) {
                System.out.println("Can't handle this event at current state!");
            }
            if (oldState != getInternalState()) {
                System.out.println("Job Transitioned from " + oldState + " to " + getInternalState());
            }

        } finally {
            writeLock.unlock();
        }
    }

    public JobStateInternal getInternalState() {
        readLock.lock();
        try {
            return getStateMachine().getCurrentState();
        } finally {
            readLock.unlock();
        }
    }

    public enum JobStateInternal {
        NEW,
        SETUP,
        INITED,
        RUNNING,
        SUCCEEDED,
        KILLED
    }
}

二)状态机可视化

hadoop 中提供了状态机可视化的工具类 VisualizeStateMachine.java,可以拷贝到我们的工程中使用。
根据提示,运行需要三个参数

本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;

2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;

3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;

4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;

5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/293000.html

(0)
上一篇 2022年11月12日
下一篇 2022年11月12日

相关推荐

发表回复

登录后才能评论