// 结果: 111 00000000000000000000000000000

private static final int RUNNING    = -1 << COUNT_BITS;

// 结果: 000 00000000000000000000000000000

private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 结果: 001 00000000000000000000000000000

private static final int STOP       =  1 << COUNT_BITS;

// 结果: 010 00000000000000000000000000000

private static final int TIDYING    =  2 << COUNT_BITS;

// 结果: 011 00000000000000000000000000000

private static final int TERMINATED =  3 << COUNT_BITS;

// 获取线程池的状态

private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 获取线程数量

private static int workerCountOf(int c)  { return c & CAPACITY; }

}


可以看到,ctl 是一个 原子类的 Integer 变量,有 32 位,低 29 位表示线程数量, 29 位最大可以表示 (2^29)-1 (大概 5 亿多),足够记录线程大小了,如果未来还是不够,可以把 ctl 声明为 AtomicLong,高 3 位用来表示线程池的状态,3 位可以表示 8 个线程池的状态,由于线程池总共只有五个状态,所以 3 位也是足够了,线程池的五个状态如下

*   RUNNING: 接收新的任务,并能继续处理 workQueue 中的任务

*   SHUTDOWN: 不再接收新的任务,不过能继续处理 workQueue 中的任务

*   STOP: 不再接收新的任务,也不再处理 workQueue 中的任务,并且会中断正在处理任务的线程

*   TIDYING: 所有的任务都完结了,并且线程数量(workCount)为 0 时即为此状态,进入此状态后会调用 terminated() 这个钩子方法进入 TERMINATED 状态

*   TERMINATED: 调用 terminated() 方法后即为此状态

线程池的状态流转及触发条件如下  

![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163099185952.jpg)

有了这些基础,我们来分析下 execute 的源码

public void execute(Runnable command) {

if (command == null)

    throw new NullPointerException();

int c = ctl.get();

// 如果当前线程数少于核心线程数(corePoolSize),无论核心线程是否忙碌,都创建线程,直到达到 corePoolSize 为止

if (workerCountOf(c) < corePoolSize) {

    // 创建线程并将此任务交给 worker 处理(此时此任务即 worker 中的 firstTask)

    if (addWorker(command, true))

        return;

    c = ctl.get();

}

// 如果线程池处于 RUNNING 状态,并且线程数大于 corePoolSize 或者 

// 线程数少于 corePoolSize 但创建线程失败了,则将任务丢进 workQueue 中

if (isRunning(c) && workQueue.offer(command)) {

    int recheck = ctl.get();

    // 这里需要再次检查线程池是否处于 RUNNING 状态,因为在任务入队后可能线程池状态会发生变化,(比如调用了 shutdown 方法等),如果线程状态发生变化了,则移除此任务,执行拒绝策略

    if (! isRunning(recheck) && remove(command))

        reject(command);

    // 如果线程池在 RUNNING 状态下,线程数为 0,则新建线程加速处理 workQueue 中的任务

    else if (workerCountOf(recheck) == 0)

        addWorker(null, false);

}

// 这段逻辑说明线程数大于 corePoolSize 且任务入队失败了,此时会以最大线程数(maximumPoolSize)为界来创建线程,如果失败,说明线程数超过了 maximumPoolSize,则执行拒绝策略

else if (!addWorker(command, false))

    reject(command);

}


从这段代码中可以看到,创建线程是调用 addWorker 实现的,在分析 addWorker 之前,有必要简单提一下 Worker,线程池把每一个执行任务的线程都封装为 Worker 的形式,取名为 Worker 很形象,线程池的本质是生产者-消费者模型,生产者不断地往 workQueue 中丢 task, workQueue 就像流水线一样不断地输送着任务,而 worker(工人) 不断地取任务来执行  

![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163099484774.jpg)  

那么问题来了,为啥要把线程封装到 worker 中呢,线程池拿到 task 后直接丢给线程处理或者让线程自己去 workQueue 中处理不就完了?  

将线程封装为 worker 主要是为了更好地管理线程的中断

来看下 Worker 的定义

// 此处可以看出 worker 既是一个 Runnable 任务,也实现了 AQS(实际上是用 AQS 实现了一个独占锁,这样由于 worker 运行时会上锁,执行 shutdown,setCorePoolSize,setMaximumPoolSize等方法时会试着中断线程(interruptIdleWorkers) ,在这个方法中断方法中会先尝试获取 worker 的锁,如果不成功,说明 worker 在运行中,此时会先让 worker 执行完任务再关闭 worker 的线程,实现优雅关闭线程的目的)

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

    private static final long serialVersionUID = 6138294804551838833L;

    // 实际执行任务的线程

    final Thread thread;

    // 上文提到,如果当前线程数少于核心线程数,创建线程并将提交的任务交给 worker 处理处理,此时 firstTask 即为此提交的任务,如果 worker 从 workQueue 中获取任务,则 firstTask 为空

    Runnable firstTask;

    // 统计完成的任务数

    volatile long completedTasks;

    Worker(Runnable firstTask) {

        // 初始化为 -1,这样在线程运行前(调用runWorker)禁止中断,在 interruptIfStarted() 方法中会判断 getState()>=0

        setState(-1); 

        this.firstTask = firstTask;

        // 根据线程池的 threadFactory 创建一个线程,将 worker 本身传给线程(因为 worker 实现了 Runnable 接口)

        this.thread = getThreadFactory().newThread(this);

    }

    public void run() {

        // thread 启动后会调用此方法

        runWorker(this);

    }

    // 1 代表被锁住了,0 代表未锁

    protected boolean isHeldExclusively() {

        return getState() != 0;

    }

    // 尝试获取锁

    protected boolean tryAcquire(int unused) {

        // 从这里可以看出它是一个独占锁,因为当获取锁后,cas 设置 state 不可能成功,这里我们也能明白上文中将 state 设置为 -1 的作用,这种情况下永远不可能获取得锁,而 worker 要被中断首先必须获取锁

        if (compareAndSetState(0, 1)) {

            setExclusiveOwnerThread(Thread.currentThread());

            return true;

        }

        return false;

    }

    // 尝试释放锁

    protected boolean tryRelease(int unused) {

        setExclusiveOwnerThread(null);

        setState(0);

        return true;

    }    

    public void lock()        { acquire(1); }

    public boolean tryLock()  { return tryAcquire(1); }

    public void unlock()      { release(1); }

    public boolean isLocked() { return isHeldExclusively(); }

    // 中断线程,这个方法会被 shutdowNow 调用,从中可以看出 shutdownNow 要中断线程不需要获取锁,也就是说如果线程正在运行,照样会给你中断掉,所以一般来说我们不用 shutdowNow 来中断线程,太粗暴了,中断时线程很可能在执行任务,影响任务执行

    void interruptIfStarted() {

        Thread t;

        // 中断也是有条件的,必须是 state >= 0 且 t != null 且线程未被中断

        // 如果 state == -1 ,不执行中断,再次明白了为啥上文中 setState(-1) 的意义

        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

            try {

                t.interrupt();

            } catch (SecurityException ignore) {

            }

        }

    }

} 

通过上文对 Worker 类的分析,相信大家不难理解 将线程封装为 worker 主要是为了更好地管理线程的中断 这句话。

理解了 Worker 的意义,我们再来看 addWorker 的方法

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

    int c = ctl.get();

    // 获取线程池的状态

    int rs = runStateOf(c);

    // 如果线程池的状态 >= SHUTDOWN,即为 SHUTDOWN,STOP,TIDYING,TERMINATED 这四个状态,只有一种情况有可能创建线程,即线程状态为 SHUTDOWN, 且队列非空时,firstTask == null 代表创建一个不接收新任务的线程(此线程会从 workQueue 中获取任务再执行),这种情况下创建线程是为了加速处理完 workQueue 中的任务

    if (rs >= SHUTDOWN &&

        ! (rs == SHUTDOWN &&

           firstTask == null &&

           ! workQueue.isEmpty()))

        return false;

    for (;;) {

        // 获取线程数

        int wc = workerCountOf(c);

        // 如果超过了线程池的最大 CAPACITY(5 亿多,基本不可能)

        // 或者 超过了 corePoolSize(core 为 true) 或者 maximumPoolSize(core 为 false) 时

        // 则返回 false

        if (wc >= CAPACITY ||

            wc >= (core ? corePoolSize : maximumPoolSize))

            return false;

        // 否则 CAS 增加线程的数量,如果成功跳出双重循环

        if (compareAndIncrementWorkerCount(c))

            break retry;

        c = ctl.get();  // Re-read ctl

        // 如果线程运行状态发生变化,跳到外层循环继续执行

        if (runStateOf(c) != rs)

            continue retry;

        // 说明是因为 CAS 增加线程数量失败所致,继续执行 retry 的内层循环

    }

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

    // 能执行到这里,说明满足增加 worker 的条件了,所以创建 worker,准备添加进线程池中执行任务

    w = new Worker(firstTask);

    final Thread t = w.thread;

    if (t != null) {

        // 加锁,是因为下文要把 w 添加进 workers 中, workers 是 HashSet,不是线程安全的,所以需要加锁予以保证

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            //  再次 check 线程池的状态以防执行到此步时发生中断等

            int rs = runStateOf(ctl.get());

            // 如果线程池状态小于 SHUTDOWN(即为 RUNNING),

            // 或者状态为 SHUTDOWN 但 firstTask == null(代表不接收任务,只是创建线程处理 workQueue 中的任务),则满足添加 worker 的条件

            if (rs < SHUTDOWN ||

                (rs == SHUTDOWN && firstTask == null)) {

                                    // 如果线程已启动,显然有问题(因为创建 worker 后,还没启动线程呢),抛出异常

                if (t.isAlive()) 

                    throw new IllegalThreadStateException();

                workers.add(w);

                int s = workers.size();

                // 记录最大的线程池大小以作监控之用

                if (s > largestPoolSize)

                    largestPoolSize = s;

                workerAdded = true;

            }

        } finally {

            mainLock.unlock();

        }

        // 说明往 workers 中添加 worker 成功,此时启动线程

        if (workerAdded) {

            t.start();

            workerStarted = true;

        }

    }

} finally {

    // 添加线程失败,执行 addWorkerFailed 方法,主要做了将 worker 从 workers 中移除,减少线程数,并尝试着关闭线程池这样的操作

    if (! workerStarted)

        addWorkerFailed(w);

}

return workerStarted;

}


从这段代码我们可以看到多线程下情况的不可预料性,我们发现在满足条件情况下,又对线程状态重新进行了 check,以防期间出现中断等线程池状态发生变更的操作,这也给我们以启发:多线程环境下的各种临界条件一定要考虑到位。

执行 addWorker 创建 worker 成功后,线程开始执行了(t.start()),由于在创建 Worker 时,将 Worker 自己传给了此线程,所以启动线程后,会调用 Worker 的 run 方法

public void run() {

runWorker(this);

}


可以看到最终会调用 runWorker 方法,接下来我们来分析下 runWorker 方法

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

// unlock 会调用 tryRelease 方法将 state 设置成 0,代表允许中断,允许中断的条件上文我们在 interruptIfStarted() 中有提过,即 state >= 0

w.unlock();

boolean completedAbruptly = true;

try {

    // 如果在提交任务时创建了线程,并把任务丢给此线程,则会先执行此 task

    // 否则从任务队列中获取 task 来执行(即 getTask() 方法)

    while (task != null || (task = getTask()) != null) {

        w.lock();

        // 如果线程池状态为 >= STOP(即 STOP,TIDYING,TERMINATED )时,则线程应该中断

        // 如果线程池状态 < STOP, 线程不应该中断,如果中断了(Thread.interrupted() 返回 true,并清除标志位),再次判断线程池状态(防止在清除标志位时执行了 shutdownNow() 这样的方法),如果此时线程池为 STOP,执行线程中断

        if ((runStateAtLeast(ctl.get(), STOP) ||

             (Thread.interrupted() &&

              runStateAtLeast(ctl.get(), STOP))) &&

            !wt.isInterrupted())

            wt.interrupt();

        try {

            // 执行任务前,子类可实现此钩子方法作为统计之用

            beforeExecute(wt, task);

            Throwable thrown = null;

            try {

                task.run();

            } catch (RuntimeException x) {

                thrown = x; throw x;

            } catch (Error x) {

                thrown = x; throw x;

            } catch (Throwable x) {

                thrown = x; throw new Error(x);

            } finally {

                // 执行任务后,子类可实现此钩子方法作为统计之用

                afterExecute(task, thrown);

            }

        } finally {

            task = null;

            w.completedTasks++;

            w.unlock();

        }

    }

    completedAbruptly = false;

} finally {

    // 如果执行到这只有两种可能,一种是执行过程中异常中断了,一种是队列里没有任务了,从这里可以看出线程没有核心线程与非核心线程之分,哪个任务异常了或者正常退出了都会执行此方法,此方法会根据情况将线程数-1

    processWorkerExit(w, completedAbruptly);

}

}


来看看 processWorkerExit 方法是咋样的

private void processWorkerExit(Worker w, boolean completedAbruptly) {

    // 如果异常退出,cas 执行线程池减 1 操作

if (completedAbruptly) 

    decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

    completedTaskCount += w.completedTasks;

    // 加锁确保线程安全地移除 worker 

    workers.remove(w);

} finally {

    mainLock.unlock();

}

// woker 既然异常退出,可能线程池状态变了(如执行 shutdown 等),尝试着关闭线程池

tryTerminate();

int c = ctl.get();

//  如果线程池处于 STOP 状态,则如果 woker 是异常退出的,重新新增一个 woker,如果是正常退出的,在 wokerQueue 为非空的条件下,确保至少有一个线程在运行以执行 wokerQueue 中的任务

if (runStateLessThan(c, STOP)) {

    if (!completedAbruptly) {

        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

        if (min == 0 && ! workQueue.isEmpty())

            min = 1;

        if (workerCountOf(c) >= min)

            return; // replacement not needed

    }

    addWorker(null, false);

}

}


接下来我们分析 woker 从 workQueue 中取任务的方法 getTask

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

for (;;) {

    int c = ctl.get();

    int rs = runStateOf(c);

    // 如果线程池状态至少为 STOP 或者

    // 线程池状态 == SHUTDOWN 并且任务队列是空的

    // 则减少线程数量,返回 null,这种情况下上文分析的 runWorker 会执行 processWorkerExit 从而让获取此 Task 的 woker 退出

    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

        decrementWorkerCount();

        return null;

    }

    int wc = workerCountOf(c);

    // 如果 allowCoreThreadTimeOut 为 true,代表任何线程在 keepAliveTime 时间内处于 idle 状态都会被回收,如果线程数大于 corePoolSize,本身在 keepAliveTime 时间内处于 idle 状态就会被回收

    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    // worker 应该被回收的几个条件,这个比较简单,就此略过

    if ((wc > maximumPoolSize || (timed && timedOut))

        && (wc > 1 || workQueue.isEmpty())) {

        if (compareAndDecrementWorkerCount(c))

            return null;

        continue;

    }

    try {

       // 阻塞获取 task,如果在 keepAliveTime 时间内未获取任务,说明超时了,此时 timedOut 为 true

        Runnable r = timed ?

            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

            workQueue.take();

        if (r != null)

            return r;

        timedOut = true;

    } catch (InterruptedException retry) {

        timedOut = false;

    }

}

}



经过以上源码剖析,相信我们对线程池的工作原理了解得八九不离十了,再来简单过一下其他一些比较有用的方法,开头我们提到线程池的监控问题,我们看一下可以监控哪些指标

*   int getCorePoolSize():获取核心线程数。

*   int getLargestPoolSize():历史峰值线程数。

*   int getMaximumPoolSize():最大线程数(线程池线程容量)。

*   int getActiveCount():当前活跃线程数

*   int getPoolSize():当前线程池中的线程总数

*   BlockingQueue getQueue() 当前线程池的任务队列,据此可以获取积压任务的总数,getQueue.size()

监控思路也很简单,开启一个定时线程 ScheduledThreadPoolExecutor,定期对这些线程池指标进行采集,一般会采用一些开源工具如 Grafana + Prometheus + MicroMeter 来实现。

> 如何实现核心线程池的预热

使用 prestartAllCoreThreads() 方法,这个方法会一次性创建 corePoolSize 个线程,无需等到提

> 如何实现动态调整线程池参数

*   setCorePoolSize(int corePoolSize) 调整核心线程池大小

*   setMaximumPoolSize(int maximumPoolSize)

*   setKeepAliveTime() 设置线程的存活时间

[](https://gitee.com/vip204888/java-p7)解答开篇的问题

--------------------------------------------------------------------------

1、Tomcat 的线程池和 JDK 的线程池实现有啥区别, Dubbo 中有类似 Tomcat 的线程池实现吗? Dubbo 中一个叫 EagerThreadPool 的东西,可以看看它的使用说明  

![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163100421327.jpg)

从注释里可以看出,如果核心线程都处于 busy 状态,如果有新的请求进来,EagerThreadPool 会选择先创建线程,而不是将其放入任务队列中,这样可以更快地响应这些请求。

Tomcat 实现也是与此类似的,只不过稍微有所不同,当 Tomcat 启动时,会先创建 minSpareThreads 个线程,如果经过一段时间收到请求时这些线程都处于忙碌状态,每次都会以 minSpareThreads 的步长创建线程,本质上也是为了更快地响应处理请求。具体的源码可以看它的 ThreadPool 实现,这里就不展开了。

2、我司网关 dubbo 调用线程池曾经出现过这样的一个问题:压测时接口可以正常返回,但接口 RT 很高,假设设置的核心线程大小为 500,最大线程为 800,缓冲队列为 5000,你能从这个设置中发现出一些问题并对这些参数进行调优吗?  

这个参数明显能看出问题来,首先任务队列设置过大,任务达到核心线程后,如果再有请求进来会先进入任务队列,队列满了之后才创建线程,创建线程也是需要不少开销的,所以我们后来把核心线程设置成了与最大线程一样,并且调用 prestartAllCoreThreads() 来预热核心线程,就不用等请求来时再创建线程了。

[](https://gitee.com/vip204888/java-p7)总结

---------------------------------------------------------------------

本文详细剖析了线程池的工作原理,相信大家对其工作机制应该有了较深入的了解,也对开头的几个问题有了较清楚的认识,本质上设置线程池的目的是为了利用有效的资源最大化性能,最小化风险,同时线程池的使用本质上是为了更好地为用户服务,据此也不难明白 Tomcat, Dubbo 要另起炉灶来设置自己的线程池了。

[](https://gitee.com/vip204888/java-p7)读者福利

-----------------------------------------------------------------------

感谢你看到了这里!  

我这边整理很多2020最新Java面试题(含答案)和Java学习笔记,如下图  

![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163101195664.jpg)

> 上述的面试题答案小编都整理成文档笔记。 同时也还整理了一些面试资料&最新2020收集的一些大厂的面试真题(都整理成文档,小部分截图)免费分享给大家,有需要的可以 [点击进入暗号:CSDN!](https://gitee.com/vip204888/java-p7)免费分享~

# 最后

更多Java进阶学习资料、2021大厂面试真题、视频资料可以**[点击这里获取到免费下载方式!](https://gitee.com/vip204888/java-p7)**

学习视频:

![](https://s2.51cto.com/images/20210828/1630163101689874.jpg)

大厂面试真题:

![](https://s2.51cto.com/images/20210828/1630163101803320.jpg)