Java ThreadPoolExecutor线程池原理详解编程语言

前言

《Java 线程池理解》简要梳理java线程池ThreadPoolExecutor的使用和主要流程,这篇在此基础上继续学习ThreadPoolExecutor的原理,顺带巩固AQS和CAS的知识。

ThreadPoolExecutor构造函数

public ThreadPoolExecutor(int corePoolSize,  
    int maximumPoolSize,  
    long keepAliveTime, 
    TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, 
    ThreadFactory threadFactory,  
    RejectedExecutionHandler handler); 

ThreadPoolExecutor主要参数

  1. corePoolSize 核心线程数量
  2. maximumPoolSize 最大线程数量
  3. unit 超时时间单位 ( TimeUnit.SECONDS 等)
  4. keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了
  5. workQueue 任务阻塞队列,用来缓存待执行的任务
  6. ThreadFactory 负责创建Thread线程
  7. RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略

ThreadPoolExecutor执行流程

执行流程

AQS的使用

在分析ThreadPoolExecutor之前,先学习AbstractQueueSynchronizer,简称AQS,他是实现ReentrantLock、CountDownLatch的重要部分,ThreadPoolExecutor 也使用了AQS实现非重入锁机制。可以参考《ReentrantLock原理从开始到放弃》

简要说一下AQS的实现原理以及优势

AQS 基本数据结构是一个FIFO的双向队列,每个结点Node存储线程和其他信息;

队列的头部结点表示:该结点对应的线程已经处于执行状态,占用了资源;剩下的队列里的线程则被挂起等待唤醒。
AQS
AQS实现了一个FIFO的队列,让线程根据规则排列阻塞和争夺资源。

AQS提供了独占方式和共享方式,对应独占锁和共享锁。

AQS提供模板方法需要具体子类实现功能

  • isHeldExclusively():该线程是否正在独占资源
  • tryAcquire(int):独占方式下尝试去获取资源
  • tryRelease(int):独占方式下尝试释放资源
  • tryAcquireShared(int):共享方式下,尝试获取资源。返回值表示剩余资源,负数表示失败
  • tryReleaseShared(int):共享方式

以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法。

支持独占锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively

支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared、isHeldExclusively。

AQS获取、修改state状态

  • getState() 获取状态(资源)
  • setState() 设置状态(资源)
  • compareAndSetState() CAS设置状态(资源)

state可以在不同的场景下表达不同的意义,例如在ReentrantLock表示重入次数,CountDownLatch表示当前计数。

以ReentrantLock为例子:

ReentrantLock AQS
这里只抽取了ReentrantLock 的公平锁部分核心代码看下:

 static final class FairSync extends Sync {
    
        private static final long serialVersionUID = -3000897897090466540L; 
        final void lock() {
    // 线程调用lock 
            acquire(1); // 调用AQS的acquire()内部会顺序获取 
        } 
        // 具体的尝试获取锁方法,state 
        protected final boolean tryAcquire(int acquires) {
    
            final Thread current = Thread.currentThread(); 
            int c = getState(); // 获取当前状态 
            // 当且state为0时候才可以表示获取锁 
            if (c == 0) {
    
            // hasQueuedPredecessors 表示是否有其他线程排队, 
            // 如果没人排队就不必入队,直接CAS尝试去获取锁 
                if (!hasQueuedPredecessors() && 
                    compareAndSetState(0, acquires)) {
    
                    // 获取锁成功,设置当前线程独占 
                    setExclusiveOwnerThread(current); 
                    return true; 
                } 
            } 
            // 当前线程可重入 
            else if (current == getExclusiveOwnerThread()) {
    
                int nextc = c + acquires; 
                if (nextc < 0) 
                    throw new Error("Maximum lock count exceeded"); 
                setState(nextc); 
                return true; 
            } 
            return false; 
        } 
    } 

线程池状态

线程池每一时刻都会处于以下一种状态。

RUNNING: 可接收新的任务,当前有任务正在排队、执行
SHUTDOWN: 不接收新的任务,当前有任务正在排队、执行
STOP: 不接收新的任务,不允许任务执行、排队,中断正在执行的任务
TIDYING: 所有任务都被终止,无执行任务线程,会执行terminated()
TERMINATED: terminated() 方法执行完成

// 线程池状态 + 数量 
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
 
private static final int COUNT_BITS = Integer.SIZE - 3; 
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; 
 
// runState is stored in the high-order bits 
private static final int RUNNING    = -1 << COUNT_BITS; 
private static final int SHUTDOWN   =  0 << COUNT_BITS; 
private static final int STOP       =  1 << COUNT_BITS; 
private static final int TIDYING    =  2 << COUNT_BITS; 
private static final int TERMINATED =  3 << COUNT_BITS; 

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

ctl有32位bit,其中前3位表示线程状态,后面39位表示线程数量,所以线程数最大为2的39次方-1。

提供几个方法:

// 获取线程池状态 
private static int runStateOf(int c)     {
    return c & ~CAPACITY; } 
// 获取线程数 
private static int workerCountOf(int c)  {
    return c & CAPACITY; } 
// 利用位运算集合线程池状态和数量,计算ctl 
private static int ctlOf(int rs, int wc) {
    return rs | wc; } 

worker

利用worker实现非可重入锁

private final class Worker extends AbstractQueuedSynchronizer implements Runnable  
/** Thread this worker is running in.  Null if factory fails. */ 
final Thread thread; 
/** Initial task to run.  Possibly null. */ 
Runnable firstTask; 
/** Per-thread task counter */ 
volatile long completedTasks; 
// 执行的第一个任务,后续任务从blockQueue获取 
Worker(Runnable firstTask) {
 
// 设置为-1,不允许被线程获取锁,后续到runWorker()内部会解除这个状态 
setState(-1); // inhibit interrupts until runWorker 
this.firstTask = firstTask; 
// 由定义的ThreadFactory创建线程 
this.thread = getThreadFactory().newThread(this); 
} 
/** Delegates main run loop to outer runWorker. */ 
public void run() {
 
// 后续详细分析 
runWorker(this); 
} 
// 0表示无锁,非0表示锁占用状态 
protected boolean isHeldExclusively() {
 
return getState() != 0; 
} 
// 尝试获取资源 
protected boolean tryAcquire(int unused) {
 
if (compareAndSetState(0, 1)) {
 
setExclusiveOwnerThread(Thread.currentThread()); 
return true; 
} 
return false; 
} 
// 尝试释放资源 
protected boolean tryRelease(int unused) {
 
setExclusiveOwnerThread(null); 
setState(0); 
return true; 
} 
// 直接acquire,实现非重入锁,对比下ReentrantLock 
public void lock()        {
 acquire(1); } 
public boolean tryLock()  {
 return tryAcquire(1); } 
public void unlock()      {
 release(1); } 
public boolean isLocked() {
 return isHeldExclusively(); } 
} 

对比下ReentrantLock和Worker的tryAcquire(),Worker中并没有允许同个线程多次获取锁,也没有做过多优化。

runWorker()

再看下runWorker()是我们创建的线程死循环执行取任务、执行任务的过程,简单的看下具体流程。

final void runWorker(Worker w) {
 
// 运行在创建的子线程中 
Thread wt = Thread.currentThread(); 
Runnable task = w.firstTask; 
w.firstTask = null; 
// 解锁状态,对应上面 setState(-1);  
w.unlock(); // allow interrupts 
// 表示任务执行是否被中断了 
boolean completedAbruptly = true; 
try {
 
// getTask()会从blockQueue取任务,控制线程数量等操作 
while (task != null || (task = getTask()) != null) {
 
// 锁住worker 
w.lock(); 
// 这部分是跟线程池状态有关的判断 
if ((runStateAtLeast(ctl.get(), STOP) || 
(Thread.interrupted() && 
runStateAtLeast(ctl.get(), STOP))) && 
!wt.isInterrupted()) 
wt.interrupt(); 
try {
 
// 执行任务前,留给子类的空实现 
beforeExecute(wt, task); 
Throwable thrown = null; 
try {
 
// 执行任务 
task.run(); 
} catch (RuntimeException x) {
 
thrown = x; throw x; 
} catch (Error x) {
 
thrown = x; throw x; 
} catch (Throwable x) {
 
thrown = x; throw new Error(x); 
} finally {
 
// 执行任务后,留给子类的空实现 
afterExecute(task, thrown); 
} 
} finally {
 
task = null; 
// 完成任务数+1 
w.completedTasks++; 
w.unlock(); 
} 
} 
// 未被中断 
completedAbruptly = false; 
} finally {
 
// 线程结束操作 
processWorkerExit(w, completedAbruptly); 
} 
} 

getTask() 获取任务(返回null代表结束当前线程)

getTask()负责取出要执行的任务,如果返回null就会走到线程结束的操作

private Runnable getTask() {
 
boolean timedOut = false; //超时 
for (;;) {
 
int c = ctl.get(); 
int rs = runStateOf(c); 
// 检测线程池状态 
// Check if queue empty only if necessary. 
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
 
decrementWorkerCount(); 
return null; 
} 
// 当前线程数 
int wc = workerCountOf(c); 
// 若阻塞超时的话,是否允许回收掉当前线程 
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 
// 当前线程数超过最大线程数,或者,超时了且允许被回收 
if ((wc > maximumPoolSize || (timed && timedOut)) 
&& (wc > 1 || workQueue.isEmpty())) {
 
if (compareAndDecrementWorkerCount(c)) 
return null; 
continue; 
} 
// 从阻塞队列取任务,或者队列为空被阻塞 
try {
 
Runnable r = timed ? 
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
workQueue.take(); 
if (r != null) 
return r; //获取到任务 
timedOut = true;// 超时 
} catch (InterruptedException retry) {
 
timedOut = false; 
} 
} 
} 

getTask()内部是一个for语句循环,

  • 阻塞队列不为空,能取到任务直接返回task;
  • 阻塞队列为空,阻塞一段时间超时,如果允许核心线程超时被回收,或者当前线程数 > 核心线程数 或者收,则返回null,让当前线程被结束掉;
  • 走到这里的留下的核心线程,只能在死循环 + 阻塞队列阻塞等待任务的到来。

processWorkerExit() 结束线程

processWorkerExit()执行线程结束操作,处理了控制线程数、回收线程资源的操作

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
 
// 线程被中断了 
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 
// 将work数减一 
decrementWorkerCount(); 
// 因为要操作全局变量,利用ReentrantLock锁 
final ReentrantLock mainLock = this.mainLock; 
mainLock.lock(); 
try {
 
// 总的任务完成数 
completedTaskCount += w.completedTasks; 
// 移除当前worker 
workers.remove(w); 
} finally {
 
mainLock.unlock(); 
} 
tryTerminate(); 
int c = ctl.get(); 
// 如果在没有停止状态 
if (runStateLessThan(c, STOP)) {
 
// 不是因为异常中断 
if (!completedAbruptly) {
  
// allowCoreThreadTimeOut表示是否允许核心线程超时回收,如果允许就可以不添加worker 
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 
if (min == 0 && ! workQueue.isEmpty()) 
min = 1; 
// worker数量超过core thread数量 
if (workerCountOf(c) >= min) 
return; // replacement not needed 
} 
// 若因为异常中断,或者当前线程数<coreThread数量,添加一个worker线程 
addWorker(null, false); 
} 
} 

addWorker() 内部主要做两个操作:

  1. 校验当前线程池状态
  2. 添加一个worker线程
private boolean addWorker(Runnable firstTask, boolean core) {
 
w = new Worker(firstTask); 
final Thread t = w.thread; 
final ReentrantLock mainLock = this.mainLock; 
mainLock.lock(); 
workers.add(w); 
mainLock.unlock(); 
t.start(); 
} 

到达这里,我们知道Worker是利用AQS实现一个非可重入锁,但是为什么需要一个非可重入锁呢?

每一个Worker在各自的子线程中运行,取任务前lock,在任务结束之后unlock,一旦获取了独占锁,表示当前线程正在执行任务中。

可是也并没有其他线程争夺Worker,争夺的应该是task,而task利用阻塞队列实现多线程竞争了。

不是很理解为什么需要非可重入锁?

后面发现,使用独占锁来表示线程是否正在执行任务,Worker的线程获取了独占锁就说明它在执行任务,不能被中断。

execute流程

execute流程分成三步:

  1. 如果当前worker线程少于核心线程数,则addWorker()添加一个线程执行。
  2. 如果worker线程数等于核心线程数,则将任务加入到阻塞队列workQueue
  3. 阻塞队列满了且未等到最大线程数之前,增加一个worker线程
  4. 否则,执行拒绝策略
public void execute(Runnable command) {
 
if (command == null) 
throw new NullPointerException(); 
int c = ctl.get(); 
if (workerCountOf(c) < corePoolSize) {
 
if (addWorker(command, true)) 
return; 
c = ctl.get(); 
} 
if (isRunning(c) && workQueue.offer(command)) {
 
int recheck = ctl.get(); 
if (! isRunning(recheck) && remove(command)) 
reject(command); 
else if (workerCountOf(recheck) == 0) 
addWorker(null, false); 
} 
else if (!addWorker(command, false)) 
reject(command); 
} 

总结

  • 线程池会分核心线程和非核心线程吗?

    不会,ThreadPoolThread是控制线程数来保证核心线程数量

  • 线程池在哪里控制数量

    利用阻塞队列阻塞,若超时了并且允许回收,则当前线程会被结束,若不允许结束,则当前线程会死循环并且一段时间阻塞的等待任务。

  • worker为什么需要不可重入锁

    Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断

    参考https://www.cnblogs.com/liuzhihu/p/8177371.html

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/19391.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论