// 结果: 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 结果: 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 结果: 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 结果: 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 结果: 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
}
可以看到,ctl 是一个 原子类的 Integer 变量,有 32 位,低 29 位表示线程数量, 29 位最大可以表示 (2^29)-1 (大概 5 亿多),足够记录线程大小了,如果未来还是不够,可以把 ctl 声明为 AtomicLong,高 3 位用来表示线程池的状态,3 位可以表示 8 个线程池的状态,由于线程池总共只有五个状态,所以 3 位也是足够了,线程池的五个状态如下
* RUNNING: 接收新的任务,并能继续处理 workQueue 中的任务
* SHUTDOWN: 不再接收新的任务,不过能继续处理 workQueue 中的任务
* STOP: 不再接收新的任务,也不再处理 workQueue 中的任务,并且会中断正在处理任务的线程
* TIDYING: 所有的任务都完结了,并且线程数量(workCount)为 0 时即为此状态,进入此状态后会调用 terminated() 这个钩子方法进入 TERMINATED 状态
* TERMINATED: 调用 terminated() 方法后即为此状态
线程池的状态流转及触发条件如下
![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163099185952.jpg)
有了这些基础,我们来分析下 execute 的源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前线程数少于核心线程数(corePoolSize),无论核心线程是否忙碌,都创建线程,直到达到 corePoolSize 为止
if (workerCountOf(c) < corePoolSize) {
// 创建线程并将此任务交给 worker 处理(此时此任务即 worker 中的 firstTask)
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于 RUNNING 状态,并且线程数大于 corePoolSize 或者
// 线程数少于 corePoolSize 但创建线程失败了,则将任务丢进 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 这里需要再次检查线程池是否处于 RUNNING 状态,因为在任务入队后可能线程池状态会发生变化,(比如调用了 shutdown 方法等),如果线程状态发生变化了,则移除此任务,执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池在 RUNNING 状态下,线程数为 0,则新建线程加速处理 workQueue 中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 这段逻辑说明线程数大于 corePoolSize 且任务入队失败了,此时会以最大线程数(maximumPoolSize)为界来创建线程,如果失败,说明线程数超过了 maximumPoolSize,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
从这段代码中可以看到,创建线程是调用 addWorker 实现的,在分析 addWorker 之前,有必要简单提一下 Worker,线程池把每一个执行任务的线程都封装为 Worker 的形式,取名为 Worker 很形象,线程池的本质是生产者-消费者模型,生产者不断地往 workQueue 中丢 task, workQueue 就像流水线一样不断地输送着任务,而 worker(工人) 不断地取任务来执行
![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163099484774.jpg)
那么问题来了,为啥要把线程封装到 worker 中呢,线程池拿到 task 后直接丢给线程处理或者让线程自己去 workQueue 中处理不就完了?
将线程封装为 worker 主要是为了更好地管理线程的中断
来看下 Worker 的定义
// 此处可以看出 worker 既是一个 Runnable 任务,也实现了 AQS(实际上是用 AQS 实现了一个独占锁,这样由于 worker 运行时会上锁,执行 shutdown,setCorePoolSize,setMaximumPoolSize等方法时会试着中断线程(interruptIdleWorkers) ,在这个方法中断方法中会先尝试获取 worker 的锁,如果不成功,说明 worker 在运行中,此时会先让 worker 执行完任务再关闭 worker 的线程,实现优雅关闭线程的目的)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// 实际执行任务的线程
final Thread thread;
// 上文提到,如果当前线程数少于核心线程数,创建线程并将提交的任务交给 worker 处理处理,此时 firstTask 即为此提交的任务,如果 worker 从 workQueue 中获取任务,则 firstTask 为空
Runnable firstTask;
// 统计完成的任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
// 初始化为 -1,这样在线程运行前(调用runWorker)禁止中断,在 interruptIfStarted() 方法中会判断 getState()>=0
setState(-1);
this.firstTask = firstTask;
// 根据线程池的 threadFactory 创建一个线程,将 worker 本身传给线程(因为 worker 实现了 Runnable 接口)
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// thread 启动后会调用此方法
runWorker(this);
}
// 1 代表被锁住了,0 代表未锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获取锁
protected boolean tryAcquire(int unused) {
// 从这里可以看出它是一个独占锁,因为当获取锁后,cas 设置 state 不可能成功,这里我们也能明白上文中将 state 设置为 -1 的作用,这种情况下永远不可能获取得锁,而 worker 要被中断首先必须获取锁
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中断线程,这个方法会被 shutdowNow 调用,从中可以看出 shutdownNow 要中断线程不需要获取锁,也就是说如果线程正在运行,照样会给你中断掉,所以一般来说我们不用 shutdowNow 来中断线程,太粗暴了,中断时线程很可能在执行任务,影响任务执行
void interruptIfStarted() {
Thread t;
// 中断也是有条件的,必须是 state >= 0 且 t != null 且线程未被中断
// 如果 state == -1 ,不执行中断,再次明白了为啥上文中 setState(-1) 的意义
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
通过上文对 Worker 类的分析,相信大家不难理解 将线程封装为 worker 主要是为了更好地管理线程的中断 这句话。
理解了 Worker 的意义,我们再来看 addWorker 的方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取线程池的状态
int rs = runStateOf(c);
// 如果线程池的状态 >= SHUTDOWN,即为 SHUTDOWN,STOP,TIDYING,TERMINATED 这四个状态,只有一种情况有可能创建线程,即线程状态为 SHUTDOWN, 且队列非空时,firstTask == null 代表创建一个不接收新任务的线程(此线程会从 workQueue 中获取任务再执行),这种情况下创建线程是为了加速处理完 workQueue 中的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果超过了线程池的最大 CAPACITY(5 亿多,基本不可能)
// 或者 超过了 corePoolSize(core 为 true) 或者 maximumPoolSize(core 为 false) 时
// 则返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 否则 CAS 增加线程的数量,如果成功跳出双重循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程运行状态发生变化,跳到外层循环继续执行
if (runStateOf(c) != rs)
continue retry;
// 说明是因为 CAS 增加线程数量失败所致,继续执行 retry 的内层循环
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 能执行到这里,说明满足增加 worker 的条件了,所以创建 worker,准备添加进线程池中执行任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁,是因为下文要把 w 添加进 workers 中, workers 是 HashSet,不是线程安全的,所以需要加锁予以保证
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次 check 线程池的状态以防执行到此步时发生中断等
int rs = runStateOf(ctl.get());
// 如果线程池状态小于 SHUTDOWN(即为 RUNNING),
// 或者状态为 SHUTDOWN 但 firstTask == null(代表不接收任务,只是创建线程处理 workQueue 中的任务),则满足添加 worker 的条件
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程已启动,显然有问题(因为创建 worker 后,还没启动线程呢),抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 记录最大的线程池大小以作监控之用
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 说明往 workers 中添加 worker 成功,此时启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 添加线程失败,执行 addWorkerFailed 方法,主要做了将 worker 从 workers 中移除,减少线程数,并尝试着关闭线程池这样的操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
从这段代码我们可以看到多线程下情况的不可预料性,我们发现在满足条件情况下,又对线程状态重新进行了 check,以防期间出现中断等线程池状态发生变更的操作,这也给我们以启发:多线程环境下的各种临界条件一定要考虑到位。
执行 addWorker 创建 worker 成功后,线程开始执行了(t.start()),由于在创建 Worker 时,将 Worker 自己传给了此线程,所以启动线程后,会调用 Worker 的 run 方法
public void run() {
runWorker(this);
}
可以看到最终会调用 runWorker 方法,接下来我们来分析下 runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// unlock 会调用 tryRelease 方法将 state 设置成 0,代表允许中断,允许中断的条件上文我们在 interruptIfStarted() 中有提过,即 state >= 0
w.unlock();
boolean completedAbruptly = true;
try {
// 如果在提交任务时创建了线程,并把任务丢给此线程,则会先执行此 task
// 否则从任务队列中获取 task 来执行(即 getTask() 方法)
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态为 >= STOP(即 STOP,TIDYING,TERMINATED )时,则线程应该中断
// 如果线程池状态 < STOP, 线程不应该中断,如果中断了(Thread.interrupted() 返回 true,并清除标志位),再次判断线程池状态(防止在清除标志位时执行了 shutdownNow() 这样的方法),如果此时线程池为 STOP,执行线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前,子类可实现此钩子方法作为统计之用
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后,子类可实现此钩子方法作为统计之用
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 如果执行到这只有两种可能,一种是执行过程中异常中断了,一种是队列里没有任务了,从这里可以看出线程没有核心线程与非核心线程之分,哪个任务异常了或者正常退出了都会执行此方法,此方法会根据情况将线程数-1
processWorkerExit(w, completedAbruptly);
}
}
来看看 processWorkerExit 方法是咋样的
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果异常退出,cas 执行线程池减 1 操作
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 加锁确保线程安全地移除 worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// woker 既然异常退出,可能线程池状态变了(如执行 shutdown 等),尝试着关闭线程池
tryTerminate();
int c = ctl.get();
// 如果线程池处于 STOP 状态,则如果 woker 是异常退出的,重新新增一个 woker,如果是正常退出的,在 wokerQueue 为非空的条件下,确保至少有一个线程在运行以执行 wokerQueue 中的任务
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
接下来我们分析 woker 从 workQueue 中取任务的方法 getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池状态至少为 STOP 或者
// 线程池状态 == SHUTDOWN 并且任务队列是空的
// 则减少线程数量,返回 null,这种情况下上文分析的 runWorker 会执行 processWorkerExit 从而让获取此 Task 的 woker 退出
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 如果 allowCoreThreadTimeOut 为 true,代表任何线程在 keepAliveTime 时间内处于 idle 状态都会被回收,如果线程数大于 corePoolSize,本身在 keepAliveTime 时间内处于 idle 状态就会被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// worker 应该被回收的几个条件,这个比较简单,就此略过
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 阻塞获取 task,如果在 keepAliveTime 时间内未获取任务,说明超时了,此时 timedOut 为 true
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
经过以上源码剖析,相信我们对线程池的工作原理了解得八九不离十了,再来简单过一下其他一些比较有用的方法,开头我们提到线程池的监控问题,我们看一下可以监控哪些指标
* int getCorePoolSize():获取核心线程数。
* int getLargestPoolSize():历史峰值线程数。
* int getMaximumPoolSize():最大线程数(线程池线程容量)。
* int getActiveCount():当前活跃线程数
* int getPoolSize():当前线程池中的线程总数
* BlockingQueue getQueue() 当前线程池的任务队列,据此可以获取积压任务的总数,getQueue.size()
监控思路也很简单,开启一个定时线程 ScheduledThreadPoolExecutor,定期对这些线程池指标进行采集,一般会采用一些开源工具如 Grafana + Prometheus + MicroMeter 来实现。
> 如何实现核心线程池的预热
使用 prestartAllCoreThreads() 方法,这个方法会一次性创建 corePoolSize 个线程,无需等到提
> 如何实现动态调整线程池参数
* setCorePoolSize(int corePoolSize) 调整核心线程池大小
* setMaximumPoolSize(int maximumPoolSize)
* setKeepAliveTime() 设置线程的存活时间
[](https://gitee.com/vip204888/java-p7)解答开篇的问题
--------------------------------------------------------------------------
1、Tomcat 的线程池和 JDK 的线程池实现有啥区别, Dubbo 中有类似 Tomcat 的线程池实现吗? Dubbo 中一个叫 EagerThreadPool 的东西,可以看看它的使用说明
![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163100421327.jpg)
从注释里可以看出,如果核心线程都处于 busy 状态,如果有新的请求进来,EagerThreadPool 会选择先创建线程,而不是将其放入任务队列中,这样可以更快地响应这些请求。
Tomcat 实现也是与此类似的,只不过稍微有所不同,当 Tomcat 启动时,会先创建 minSpareThreads 个线程,如果经过一段时间收到请求时这些线程都处于忙碌状态,每次都会以 minSpareThreads 的步长创建线程,本质上也是为了更快地响应处理请求。具体的源码可以看它的 ThreadPool 实现,这里就不展开了。
2、我司网关 dubbo 调用线程池曾经出现过这样的一个问题:压测时接口可以正常返回,但接口 RT 很高,假设设置的核心线程大小为 500,最大线程为 800,缓冲队列为 5000,你能从这个设置中发现出一些问题并对这些参数进行调优吗?
这个参数明显能看出问题来,首先任务队列设置过大,任务达到核心线程后,如果再有请求进来会先进入任务队列,队列满了之后才创建线程,创建线程也是需要不少开销的,所以我们后来把核心线程设置成了与最大线程一样,并且调用 prestartAllCoreThreads() 来预热核心线程,就不用等请求来时再创建线程了。
[](https://gitee.com/vip204888/java-p7)总结
---------------------------------------------------------------------
本文详细剖析了线程池的工作原理,相信大家对其工作机制应该有了较深入的了解,也对开头的几个问题有了较清楚的认识,本质上设置线程池的目的是为了利用有效的资源最大化性能,最小化风险,同时线程池的使用本质上是为了更好地为用户服务,据此也不难明白 Tomcat, Dubbo 要另起炉灶来设置自己的线程池了。
[](https://gitee.com/vip204888/java-p7)读者福利
-----------------------------------------------------------------------
感谢你看到了这里!
我这边整理很多2020最新Java面试题(含答案)和Java学习笔记,如下图
![在这里插入图片描述](https://s2.51cto.com/images/20210828/1630163101195664.jpg)
> 上述的面试题答案小编都整理成文档笔记。 同时也还整理了一些面试资料&最新2020收集的一些大厂的面试真题(都整理成文档,小部分截图)免费分享给大家,有需要的可以 [点击进入暗号:CSDN!](https://gitee.com/vip204888/java-p7)免费分享~
# 最后
更多Java进阶学习资料、2021大厂面试真题、视频资料可以**[点击这里获取到免费下载方式!](https://gitee.com/vip204888/java-p7)**
学习视频:
![](https://s2.51cto.com/images/20210828/1630163101689874.jpg)
大厂面试真题:
![](https://s2.51cto.com/images/20210828/1630163101803320.jpg)
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/122789.html