Java ThreadPoolExecutor线程池原理详解编程语言

前言

《Java 线程池理解》简要梳理java线程池ThreadPoolExecutor的使用和主要流程,这篇在此基础上继续学习ThreadPoolExecutor的原理,顺带巩固AQS和CAS的知识。

ThreadPoolExecutor构造函数

public ThreadPoolExecutor(int corePoolSize,  
    int maximumPoolSize,  
    long keepAliveTime, 
    TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, 
    ThreadFactory threadFactory,  
    RejectedExecutionHandler handler); 

ThreadPoolExecutor主要参数

  1. corePoolSize 核心线程数量
  2. maximumPoolSize 最大线程数量
  3. unit 超时时间单位 ( TimeUnit.SECONDS 等)
  4. keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了
  5. workQueue 任务阻塞队列,用来缓存待执行的任务
  6. ThreadFactory 负责创建Thread线程
  7. RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略

ThreadPoolExecutor执行流程

执行流程

AQS的使用

在分析ThreadPoolExecutor之前,先学习AbstractQueueSynchronizer,简称AQS,他是实现ReentrantLock、CountDownLatch的重要部分,ThreadPoolExecutor 也使用了AQS实现非重入锁机制。可以参考《ReentrantLock原理从开始到放弃》

简要说一下AQS的实现原理以及优势

AQS 基本数据结构是一个FIFO的双向队列,每个结点Node存储线程和其他信息;

队列的头部结点表示:该结点对应的线程已经处于执行状态,占用了资源;剩下的队列里的线程则被挂起等待唤醒。
AQS
AQS实现了一个FIFO的队列,让线程根据规则排列阻塞和争夺资源。

AQS提供了独占方式和共享方式,对应独占锁和共享锁。

AQS提供模板方法需要具体子类实现功能

  • isHeldExclusively():该线程是否正在独占资源
  • tryAcquire(int):独占方式下尝试去获取资源
  • tryRelease(int):独占方式下尝试释放资源
  • tryAcquireShared(int):共享方式下,尝试获取资源。返回值表示剩余资源,负数表示失败
  • tryReleaseShared(int):共享方式

以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法。

支持独占锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively

支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。

AQS获取、修改state状态

  • getState() 获取状态(资源)
  • setState() 设置状态(资源)
  • compareAndSetState() CAS设置状态(资源)

state可以在不同的场景下表达不同的意义,例如在ReentrantLock表示重入次数,CountDownLatch表示当前计数。

以ReentrantLock为例子:

ReentrantLock AQS
这里只抽取了ReentrantLock 的公平锁部分核心代码看下:

 static final class FairSync extends Sync {
    
        private static final long serialVersionUID = -3000897897090466540L; 
        final void lock() {
    // 线程调用lock 
            acquire(1); // 调用AQS的acquire()内部会顺序获取 
        } 
        // 具体的尝试获取锁方法,state 
        protected final boolean tryAcquire(int acquires) {
    
            final Thread current = Thread.currentThread(); 
            int c = getState(); // 获取当前状态 
            // 当且state为0时候才可以表示获取锁 
            if (c == 0) {
    
            // hasQueuedPredecessors 表示是否有其他线程排队, 
            // 如果没人排队就不必入队,直接CAS尝试去获取锁 
                if (!hasQueuedPredecessors() && 
                    compareAndSetState(0, acquires)) {
    
                    // 获取锁成功,设置当前线程独占 
                    setExclusiveOwnerThread(current); 
                    return true; 
                } 
            } 
            // 当前线程可重入 
            else if (current == getExclusiveOwnerThread()) {
    
                int nextc = c + acquires; 
                if (nextc < 0) 
                    throw new Error("Maximum lock count exceeded"); 
                setState(nextc); 
                return true; 
            } 
            return false; 
        } 
    } 

线程池状态

线程池每一时刻都会处于以下一种状态。

RUNNING: 可接收新的任务,当前有任务正在排队、执行
SHUTDOWN: 不接收新的任务,当前有任务正在排队、执行
STOP: 不接收新的任务,不允许任务执行、排队,中断正在执行的任务
TIDYING: 所有任务都被终止,无执行任务线程,会执行terminated()
TERMINATED: terminated() 方法执行完成

// 线程池状态 + 数量 
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
 
private static final int COUNT_BITS = Integer.SIZE - 3; 
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; 
 
// runState is stored in the high-order bits 
private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS; 
private static final int STOP       =  1 << COUNT_BITS; 
private static final int TIDYING    =  2 << COUNT_BITS; 
private static final int TERMINATED =  3 << COUNT_BITS; 

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

ctl有32位bit,其中前3位表示线程状态,后面39位表示线程数量,所以线程数最大为2的39次方-1。

提供几个方法:

// 获取线程池状态 
private static int runStateOf(int c)     {
    return c & ~CAPACITY; } 
// 获取线程数 
private static int workerCountOf(int c)  {
    return c & CAPACITY; } 
// 利用位运算集合线程池状态和数量,计算ctl 
private static int ctlOf(int rs, int wc) {
    return rs | wc; } 

worker

利用worker实现非可重入锁

private final class Worker extends AbstractQueuedSynchronizer implements Runnable  
    /** Thread this worker is running in.  Null if factory fails. */ 
    final Thread thread; 
    /** Initial task to run.  Possibly null. */ 
    Runnable firstTask; 
    /** Per-thread task counter */ 
    volatile long completedTasks; 
 
    // 执行的第一个任务,后续任务从blockQueue获取 
    Worker(Runnable firstTask) {
    
        // 设置为-1,不允许被线程获取锁,后续到runWorker()内部会解除这个状态 
        setState(-1); // inhibit interrupts until runWorker 
        this.firstTask = firstTask; 
    	// 由定义的ThreadFactory创建线程 
        this.thread = getThreadFactory().newThread(this); 
    } 
 
     /** Delegates main run loop to outer runWorker. */ 
    public void run() {
    
        // 后续详细分析 
        runWorker(this); 
    } 
 
    // 0表示无锁,非0表示锁占用状态 
    protected boolean isHeldExclusively() {
    
        return getState() != 0; 
    } 
 
    // 尝试获取资源 
    protected boolean tryAcquire(int unused) {
    
        if (compareAndSetState(0, 1)) {
    
            setExclusiveOwnerThread(Thread.currentThread()); 
            return true; 
        } 
        return false; 
    } 
     
    // 尝试释放资源 
    protected boolean tryRelease(int unused) {
    
        setExclusiveOwnerThread(null); 
        setState(0); 
        return true; 
    } 
 
    // 直接acquire,实现非重入锁,对比下ReentrantLock 
    public void lock()        {
    acquire(1); } 
    public boolean tryLock()  {
    return tryAcquire(1); } 
    public void unlock()      {
    release(1); } 
    public boolean isLocked() {
    return isHeldExclusively(); } 
} 

对比下ReentrantLock和Worker的tryAcquire(),Worker中并没有允许同个线程多次获取锁,也没有做过多优化。

runWorker()

再看下runWorker()是我们创建的线程死循环执行取任务、执行任务的过程,简单的看下具体流程。

final void runWorker(Worker w) {
    
       // 运行在创建的子线程中 
        Thread wt = Thread.currentThread(); 
        Runnable task = w.firstTask; 
        w.firstTask = null; 
        // 解锁状态,对应上面 setState(-1);  
        w.unlock(); // allow interrupts 
        // 表示任务执行是否被中断了 
        boolean completedAbruptly = true; 
        try {
    
            // getTask()会从blockQueue取任务,控制线程数量等操作 
            while (task != null || (task = getTask()) != null) {
    
			    // 锁住worker 
                w.lock(); 
                // 这部分是跟线程池状态有关的判断 
                if ((runStateAtLeast(ctl.get(), STOP) || 
                     (Thread.interrupted() && 
                      runStateAtLeast(ctl.get(), STOP))) && 
                    !wt.isInterrupted()) 
                    wt.interrupt(); 
                try {
    
                    // 执行任务前,留给子类的空实现 
                    beforeExecute(wt, task); 
                    Throwable thrown = null; 
                    try {
    
                        // 执行任务 
                        task.run(); 
                    } catch (RuntimeException x) {
    
                        thrown = x; throw x; 
                    } catch (Error x) {
    
                        thrown = x; throw x; 
                    } catch (Throwable x) {
    
                        thrown = x; throw new Error(x); 
                    } finally {
    
                        // 执行任务后,留给子类的空实现 
                        afterExecute(task, thrown); 
                    } 
                } finally {
    
                    task = null; 
                    // 完成任务数+1 
                    w.completedTasks++; 
                    w.unlock(); 
                } 
            } 
            // 未被中断 
            completedAbruptly = false; 
        } finally {
    
            // 线程结束操作 
            processWorkerExit(w, completedAbruptly); 
        } 
    } 

getTask() 获取任务(返回null代表结束当前线程)

getTask()负责取出要执行的任务,如果返回null就会走到线程结束的操作

private Runnable getTask() {
    
        boolean timedOut = false; //超时 
 
        for (;;) {
    
            int c = ctl.get(); 
            int rs = runStateOf(c); 
            // 检测线程池状态 
            // Check if queue empty only if necessary. 
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    
                decrementWorkerCount(); 
                return null; 
            } 
            // 当前线程数 
            int wc = workerCountOf(c); 
            // 若阻塞超时的话,是否允许回收掉当前线程 
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 
            // 当前线程数超过最大线程数,或者,超时了且允许被回收 
            if ((wc > maximumPoolSize || (timed && timedOut)) 
                && (wc > 1 || workQueue.isEmpty())) {
    
                if (compareAndDecrementWorkerCount(c)) 
                    return null; 
                continue; 
            } 
            // 从阻塞队列取任务,或者队列为空被阻塞 
            try {
    
                Runnable r = timed ? 
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
                    workQueue.take(); 
                if (r != null) 
                    return r; //获取到任务 
                timedOut = true;// 超时 
            } catch (InterruptedException retry) {
    
                timedOut = false; 
            } 
        } 
    } 

getTask()内部是一个for语句循环,

  • 阻塞队列不为空,能取到任务直接返回task;
  • 阻塞队列为空,阻塞一段时间超时,如果允许核心线程超时被回收,或者当前线程数 > 核心线程数 或者收,则返回null,让当前线程被结束掉;
  • 走到这里的留下的核心线程,只能在死循环 + 阻塞队列阻塞等待任务的到来。

processWorkerExit() 结束线程

processWorkerExit()执行线程结束操作,处理了控制线程数、回收线程资源的操作

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    
        // 线程被中断了 
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 
            // 将work数减一 
            decrementWorkerCount(); 
        // 因为要操作全局变量,利用ReentrantLock锁 
        final ReentrantLock mainLock = this.mainLock; 
        mainLock.lock(); 
        try {
    
            // 总的任务完成数 
            completedTaskCount += w.completedTasks; 
            // 移除当前worker 
            workers.remove(w); 
        } finally {
    
            mainLock.unlock(); 
        } 
 
        tryTerminate(); 
 
        int c = ctl.get(); 
        // 如果在没有停止状态 
        if (runStateLessThan(c, STOP)) {
    
            // 不是因为异常中断 
            if (!completedAbruptly) {
     
            // allowCoreThreadTimeOut表示是否允许核心线程超时回收,如果允许就可以不添加worker 
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 
                if (min == 0 && ! workQueue.isEmpty()) 
                    min = 1; 
                // worker数量超过core thread数量 
                if (workerCountOf(c) >= min) 
                    return; // replacement not needed 
            } 
            // 若因为异常中断,或者当前线程数<coreThread数量,添加一个worker线程 
            addWorker(null, false); 
        } 
    } 

addWorker() 内部主要做两个操作:

  1. 校验当前线程池状态
  2. 添加一个worker线程
private boolean addWorker(Runnable firstTask, boolean core) {
    
    w = new Worker(firstTask); 
    final Thread t = w.thread; 
    final ReentrantLock mainLock = this.mainLock; 
    mainLock.lock(); 
    workers.add(w); 
    mainLock.unlock(); 
    t.start(); 
} 

到达这里,我们知道Worker是利用AQS实现一个非可重入锁,但是为什么需要一个非可重入锁呢?

每一个Worker在各自的子线程中运行,取任务前lock,在任务结束之后unlock,一旦获取了独占锁,表示当前线程正在执行任务中。

可是也并没有其他线程争夺Worker,争夺的应该是task,而task利用阻塞队列实现多线程竞争了。

不是很理解为什么需要非可重入锁?

后面发现,使用独占锁来表示线程是否正在执行任务,Worker的线程获取了独占锁就说明它在执行任务,不能被中断。

execute流程

execute流程分成三步:

  1. 如果当前worker线程少于核心线程数,则addWorker()添加一个线程执行。
  2. 如果worker线程数等于核心线程数,则将任务加入到阻塞队列workQueue
  3. 阻塞队列满了且未等到最大线程数之前,增加一个worker线程
  4. 否则,执行拒绝策略
public void execute(Runnable command) {
    
        if (command == null) 
            throw new NullPointerException(); 
        int c = ctl.get(); 
        if (workerCountOf(c) < corePoolSize) {
    
            if (addWorker(command, true)) 
                return; 
            c = ctl.get(); 
        } 
        if (isRunning(c) && workQueue.offer(command)) {
    
            int recheck = ctl.get(); 
            if (! isRunning(recheck) && remove(command)) 
                reject(command); 
            else if (workerCountOf(recheck) == 0) 
                addWorker(null, false); 
        } 
        else if (!addWorker(command, false)) 
            reject(command); 
    } 

总结

  • 线程池会分核心线程和非核心线程吗?

    不会,ThreadPoolThread是控制线程数来保证核心线程数量

  • 线程池在哪里控制数量

    利用阻塞队列阻塞,若超时了并且允许回收,则当前线程会被结束,若不允许结束,则当前线程会死循环并且一段时间阻塞的等待任务。

  • worker为什么需要不可重入锁

    Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断

    参考https://www.cnblogs.com/liuzhihu/p/8177371.html

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/19391.html

(0)
上一篇 2021年7月19日 22:08
下一篇 2021年7月19日 22:09

相关推荐

发表回复

登录后才能评论