文章目录
前言
《Java 线程池理解》简要梳理java线程池ThreadPoolExecutor的使用和主要流程,这篇在此基础上继续学习ThreadPoolExecutor的原理,顺带巩固AQS和CAS的知识。
ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
ThreadPoolExecutor主要参数
- corePoolSize 核心线程数量
- maximumPoolSize 最大线程数量
- unit 超时时间单位 ( TimeUnit.SECONDS 等)
- keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了
- workQueue 任务阻塞队列,用来缓存待执行的任务
- ThreadFactory 负责创建Thread线程
- RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略
ThreadPoolExecutor执行流程
AQS的使用
在分析ThreadPoolExecutor之前,先学习AbstractQueueSynchronizer,简称AQS,他是实现ReentrantLock、CountDownLatch的重要部分,ThreadPoolExecutor 也使用了AQS实现非重入锁机制。可以参考《ReentrantLock原理从开始到放弃》
简要说一下AQS的实现原理以及优势
AQS 基本数据结构是一个FIFO的双向队列,每个结点Node存储线程和其他信息;
队列的头部结点表示:该结点对应的线程已经处于执行状态,占用了资源;剩下的队列里的线程则被挂起等待唤醒。
AQS实现了一个FIFO的队列,让线程根据规则排列阻塞和争夺资源。
AQS提供了独占方式和共享方式,对应独占锁和共享锁。
AQS提供模板方法需要具体子类实现功能
- isHeldExclusively():该线程是否正在独占资源
- tryAcquire(int):独占方式下尝试去获取资源
- tryRelease(int):独占方式下尝试释放资源
- tryAcquireShared(int):共享方式下,尝试获取资源。返回值表示剩余资源,负数表示失败
- tryReleaseShared(int):共享方式
以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法。
支持独占锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively
支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。
AQS获取、修改state状态
- getState() 获取状态(资源)
- setState() 设置状态(资源)
- compareAndSetState() CAS设置状态(资源)
state可以在不同的场景下表达不同的意义,例如在ReentrantLock表示重入次数,CountDownLatch表示当前计数。
以ReentrantLock为例子:
这里只抽取了ReentrantLock 的公平锁部分核心代码看下:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
// 线程调用lock
acquire(1); // 调用AQS的acquire()内部会顺序获取
}
// 具体的尝试获取锁方法,state
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前状态
// 当且state为0时候才可以表示获取锁
if (c == 0) {
// hasQueuedPredecessors 表示是否有其他线程排队,
// 如果没人排队就不必入队,直接CAS尝试去获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 获取锁成功,设置当前线程独占
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
线程池状态
线程池每一时刻都会处于以下一种状态。
RUNNING: 可接收新的任务,当前有任务正在排队、执行
SHUTDOWN: 不接收新的任务,当前有任务正在排队、执行
STOP: 不接收新的任务,不允许任务执行、排队,中断正在执行的任务
TIDYING: 所有任务都被终止,无执行任务线程,会执行terminated()
TERMINATED: terminated() 方法执行完成
// 线程池状态 + 数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
ctl有32位bit,其中前3位表示线程状态,后面39位表示线程数量,所以线程数最大为2的39次方-1。
提供几个方法:
// 获取线程池状态
private static int runStateOf(int c) {
return c & ~CAPACITY; }
// 获取线程数
private static int workerCountOf(int c) {
return c & CAPACITY; }
// 利用位运算集合线程池状态和数量,计算ctl
private static int ctlOf(int rs, int wc) {
return rs | wc; }
worker
利用worker实现非可重入锁
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// 执行的第一个任务,后续任务从blockQueue获取
Worker(Runnable firstTask) {
// 设置为-1,不允许被线程获取锁,后续到runWorker()内部会解除这个状态
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 由定义的ThreadFactory创建线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
// 后续详细分析
runWorker(this);
}
// 0表示无锁,非0表示锁占用状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试获取资源
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放资源
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 直接acquire,实现非重入锁,对比下ReentrantLock
public void lock() {
acquire(1); }
public boolean tryLock() {
return tryAcquire(1); }
public void unlock() {
release(1); }
public boolean isLocked() {
return isHeldExclusively(); }
}
对比下ReentrantLock和Worker的tryAcquire(),Worker中并没有允许同个线程多次获取锁,也没有做过多优化。
runWorker()
再看下runWorker()是我们创建的线程死循环执行取任务、执行任务的过程,简单的看下具体流程。
final void runWorker(Worker w) {
// 运行在创建的子线程中
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 解锁状态,对应上面 setState(-1);
w.unlock(); // allow interrupts
// 表示任务执行是否被中断了
boolean completedAbruptly = true;
try {
// getTask()会从blockQueue取任务,控制线程数量等操作
while (task != null || (task = getTask()) != null) {
// 锁住worker
w.lock();
// 这部分是跟线程池状态有关的判断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前,留给子类的空实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后,留给子类的空实现
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成任务数+1
w.completedTasks++;
w.unlock();
}
}
// 未被中断
completedAbruptly = false;
} finally {
// 线程结束操作
processWorkerExit(w, completedAbruptly);
}
}
getTask() 获取任务(返回null代表结束当前线程)
getTask()负责取出要执行的任务,如果返回null就会走到线程结束的操作
private Runnable getTask() {
boolean timedOut = false; //超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检测线程池状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 当前线程数
int wc = workerCountOf(c);
// 若阻塞超时的话,是否允许回收掉当前线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当前线程数超过最大线程数,或者,超时了且允许被回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 从阻塞队列取任务,或者队列为空被阻塞
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; //获取到任务
timedOut = true;// 超时
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask()内部是一个for语句循环,
- 阻塞队列不为空,能取到任务直接返回task;
- 阻塞队列为空,阻塞一段时间超时,如果允许核心线程超时被回收,或者当前线程数 > 核心线程数 或者收,则返回null,让当前线程被结束掉;
- 走到这里的留下的核心线程,只能在死循环 + 阻塞队列阻塞等待任务的到来。
processWorkerExit() 结束线程
processWorkerExit()执行线程结束操作,处理了控制线程数、回收线程资源的操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 线程被中断了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 将work数减一
decrementWorkerCount();
// 因为要操作全局变量,利用ReentrantLock锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 总的任务完成数
completedTaskCount += w.completedTasks;
// 移除当前worker
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
// 如果在没有停止状态
if (runStateLessThan(c, STOP)) {
// 不是因为异常中断
if (!completedAbruptly) {
// allowCoreThreadTimeOut表示是否允许核心线程超时回收,如果允许就可以不添加worker
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// worker数量超过core thread数量
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 若因为异常中断,或者当前线程数<coreThread数量,添加一个worker线程
addWorker(null, false);
}
}
addWorker() 内部主要做两个操作:
- 校验当前线程池状态
- 添加一个worker线程
private boolean addWorker(Runnable firstTask, boolean core) {
w = new Worker(firstTask);
final Thread t = w.thread;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
workers.add(w);
mainLock.unlock();
t.start();
}
到达这里,我们知道Worker是利用AQS实现一个非可重入锁,但是为什么需要一个非可重入锁呢?
每一个Worker在各自的子线程中运行,取任务前lock,在任务结束之后unlock,一旦获取了独占锁,表示当前线程正在执行任务中。
可是也并没有其他线程争夺Worker,争夺的应该是task,而task利用阻塞队列实现多线程竞争了。
不是很理解为什么需要非可重入锁?
后面发现,使用独占锁来表示线程是否正在执行任务,Worker的线程获取了独占锁就说明它在执行任务,不能被中断。
execute流程
execute流程分成三步:
- 如果当前worker线程少于核心线程数,则addWorker()添加一个线程执行。
- 如果worker线程数等于核心线程数,则将任务加入到阻塞队列workQueue
- 阻塞队列满了且未等到最大线程数之前,增加一个worker线程
- 否则,执行拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
总结
-
线程池会分核心线程和非核心线程吗?
不会,ThreadPoolThread是控制线程数来保证核心线程数量
-
线程池在哪里控制数量
利用阻塞队列阻塞,若超时了并且允许回收,则当前线程会被结束,若不允许结束,则当前线程会死循环并且一段时间阻塞的等待任务。
-
worker为什么需要不可重入锁
Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断
参考https://www.cnblogs.com/liuzhihu/p/8177371.html
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/19391.html