一、 六个重要参数的解释
1、 corePoolSzie
核心线程数,线程池的基本大小,在线程处于空闲状态也不会被清除的数量,除非设置了 allowCoreThreadTimeOut
,即使设置了这个属性,在并发情况下如果任务队列不为空,最后一个工作线程就算超时也会被保留下来去处理任务队列中的任务。在线程池初始化完成时,并不会立刻创建这么多线程,除非调用了 prestartCoreThread/prestartAllCoreThreads
来启动线程。
setCorePoolSize 方法
设置核心线程数。这会覆盖构造函数中设置的任何值。如果新值小于当前值,多余的现有线程将在下次空闲时终止。如果更大,并且任务队列不为空,则循环创建新的空闲线程去执行任务队列中的任务。
2、maximumPoolSize
最大线程数,当任务数量大于核心线程数 + 任务阻塞队列大小,并且最大线程数大于核心线程数时,就会创建新的线程去处理任务。线程池的线程数不会超过这个数值。
setMaximumPoolSize 方法
设置最大线程数。设置成功之后如果当前最大线程数大于之前的最大线程数,则会中断所有在空闲状态的工作线程。
3、keepAliveTime
空闲线程活跃时间,当工作线程数超过核心线程数时,工作线程空闲时间超过线程活跃时长时,线程会被终止。如果调用了 allowCoreThreadTimeOut
方法,所有线程都会收到 keepAliveTime
的影响。
setKeepAliveTime 方法
设置空闲线程活跃时长。如果设置的值小于之前的值,会立刻中断现在所有空闲的线程。
4、workQueue
任务队列,当工作线程数到达核心线程数限制时,接收的任务会先进入任务队列,当线程重新空闲时会去任务队列重新获取任务去执行。当任务队列满了之后,回去创建新的工作线程去执行任务,如果工作线程数到达最大线程数限制,会执行任务的拒绝策略。
常见任务队列:
SynchronousQueue
不接收任务,直接创建新线程去处理任务。在处理线程之间有依赖的情况,可以避免线程被锁定(例如:如果线程池核心线程数为 1,有一个 A 线程执行到一半需要 B 线程执行完才能继续,而 B 线程在任务队列中不能被执行,造成死锁)。通常需要设置 maximumPoolSize
为Integer.MAX_VALUE
来配合,但是当任务接收速度大于处理速度时,会造成工作线程一直增长,直到 OOM
。
无界 LinkedBlockingQueue
当每个任务独立于其他任务时很适合使用,可以处理突发的任务数暴增情况,但是当处理速度持续小于任务接收速度时,也会造成 OOM
。
有界 ArrayBlockingQueue
通常使用的都是有界队列,指定合理的队列长度来防止 OOM
。
5、threadFactory
线程工厂,用于生产工作线程。可以用于指定线程的名字,线程组,优先级和守护进程状态等等。如果新建线程返回为 null
,程序会正常进行,但是可能不会执行任何任务。
setThreadFactory 方法
动态设置线程工厂。
6、handler
拒绝策略,当任务队列已满,工作线程数到达最大线程数上限,就会执行拒绝策略。
四个预置拒绝策略:
ThreadPoolExecutor.AbortPolicy
快速失败,超过线程池任务承载数还提交任务时直接报错。
ThreadPoolExecutor.CallerRunsPolicy
利用提交线程的主线程去执行任务来减缓任务提交的速率。
ThreadPoolExecutor.DiscardPolicy
直接丢弃
ThreadPoolExecutor.DiscardOldestPolicy
丢弃最早的一个任务
7、此外
Hook methods
线程池提供了 beforeExecute(Thread, Runnable)
和 afterExecute(Runnable, Throwable)
两个方法在任务执行前后去处理任务
Queue maintenance
getQueue()
获取任务队列,用于对任务队列情况的监控。remove(Runnable)
用于移除任务。purge()
用于移除任务队列中的所有任务。
二、线程池的生命周期
graph LR
RUNNING(RUNNING) — shutdown –> SHUTDOWN(SHUTDOWN)
RUNNING — shutdownNow –> STOP(STOP)
SHUTDOWN — 任务队列和工作线程为空 –> TIDYING(TIDYING)
STOP — 工作线程数为零 –> TIDYING
TIDYING — termainated –> TERMINATED(TERMINATED)
三、线程池常见方法解析
1、execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 三步:
// 1.工作线程(worker)数小于核心线程就直接创建
// 2.大于就加入任务队列排队
// 3.排队失败(满了)就再尝试创建 worker, 如果创建失败(超过最大线程数)就执行拒绝策略
int c = ctl.get();
// 判断当前工作线程是否少于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建 worker, true 代表为是否创建核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 工作线程数已达到核心线程数上限,
// 所以任务需要进入任务队列去排队(就算有空闲核心线程也需要去排队)
// 只有线程池在 RUNNING 状态才能入队
if (isRunning(c) && workQueue.offer(command)) {
// 重新检查
// 1. 检查线程池是否还处于 RUNNING 状态
// 2. 检查是否还有工作线程(因为核心线程也可以有 aliveTimeOut, 如果
// 工作线程为 0, 那么最坏的情况是这个任务永远都不会被执行,
// 所以需要添加一个空闲工作线程去获取任务)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 非 RUNNING 状态拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0)
// 添加空闲工作线程
addWorker(null, false);
}
// 任务队列已满 或 线程池为非 RUNNING 状态
else if (!addWorker(command, false))
// worker 添加失败拒绝任务
reject(command);
}
2、addWorker
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
// 失败的情况:
// 1. 线程池状态至少为 STOP
// 2. 线程池状态至少为 SHUTDOWN,但是任务队列为空
// 3. 线程池处于 SHUTDOWN,但是传入了新的任务
// 4. 工作线程数已超过最大线程数
// 5. 线程工厂生产出的线程处于非活跃状态
// 成功前提(满足下面所有情况):
// 1. 工作线程数未超过最大线程数
// 2. 线程池处于 RUNNING 状态 或 线程池处于 SHUTDOWN 状态
// 但是没有传入新任务(即只是启动一个空闲工作线程去处理任务队列的任务)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 线程池控制参数
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 判断是否允许新建工作线程
// 1. rs 大于 SHUTDOWN 说明线程池不再接受新任务
// 2. rs == SHUTDOWN 时线程池不再接收新任务但是会处理队列中的任务,
// 所以当 firstTask 为空, 队列不为空时, 可以新建工作线程去处理队列中的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 工作线程数
int wc = workerCountOf(c);
// 超过最大线程数, 工作线程添加失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加工作线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// rs 是否发生变化, 如果发生变化则返回最外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作线程是否开始运行
boolean workerStarted = false;
// 工作线程是否添加
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 判断在 RUNNING 和 SHUTDOWN && task == null
// 的情况下的线程状态是否合法
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 记录历史最大工作线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 工作线程添加成功
if (workerAdded) {
// 工作线程开始工作, 会调用 Worker 中的 run 方法
t.start();
// 工作线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3、runWorker
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取要执行的任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环获取任务去执行
// 在 getTask() 会处理空闲超时的相关逻辑
while (task != null || (task = getTask()) != null) {
// 防止工作线程被中断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池在 STOP 状态,
// 即 runStateAtLeast(ctl.get(), STOP) 为 true, 中断工作线程
// 如果线程池不是 STOP, 则要保证工作线程不在中断状态
// 即 第一次 runStateAtLeast(ctl.get(), STOP) 为 false
// Thread.interrupted() 也为 false, 第二次判断
// runStateAtLeast(ctl.get(), STOP) 是为了防止在第一次判
// 断之后其他线程调用了 shutdown 方法
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 两层 try-catch 防止 beforeExecute
// 和 afterExecute 报出异常
try {
// 执行前的前置处理方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后的后置处理方法
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成任务数 + 1
w.completedTasks++;
w.unlock();
}
}
// 工作线程是否被中断
completedAbruptly = false;
} finally {
// 执行工作线程的退出
processWorkerExit(w, completedAbruptly);
}
}
4、getTask
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
// 方法返回 null 导致工作线程退出的四种情况:
// 1. 工作线程数超过了最大线程数(由于使用了 setMaximumPoolSize 方法)
// 2. 线程池为 STOP 状态
// 3. 线程池为 SHUTDOWN 状态并且任务队列为空
// 4. 该工作线程是超时可销毁的,并且超时获取任务,
// 不过超时后也可能又添加了新的任务,所以需要判断在任务队列不
// 为空时会至少保留一个线程去处理任务,具体细节看源码
private Runnable getTask() {
// 下面是个循环,这个标示用于标示
// 上一次循环调用 poll() 时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 线程池控制参数
int c = ctl.get();
// 线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1. 在 SHUTDOWN 状态下线程池不再接受任务但可以
// 处理工作队列中的任务,
// 所以当任务队列为空时工作线程可以退出,这里先减一,
// 返回 null 后该工作线程会退出。
// 2. 在 STOP 状态下工作线程不再处理任务队列的任务,
// 直接退出。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// 表示该工作线程超时后是否会退出
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 在获取 wc 后,其他线程去重新设置 maximumPoolSize,
// 导致 wc 超过 maximumPoolSize
// 1. 当工作线程数大于最大线程数 或 该工作线程
// 是超时可销毁的并上次循环时调用 poll() 已超时。
// 并且
// 2. 满足任务队列为空 或 至少有一个工作线程(这是为了保证在任
// 务队列不为空的情况下至少有一个线程可以处理任务)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 自减成功后返回 null,线程退出
if (compareAndDecrementWorkerCount(c))
return null;
// 否则继续循环(这里不用 decrementWorkerCount()
// 方法循环自减的原因是防止最后两个并发进入这个 if
// 条件,并且任务队列不为空,那么如果是调用
// 的 decrementWorkerCount() 两个线程都会被销毁,
// 就没有工作线程去完成任务队列中剩下的任务了,所以需
// 要循环重新获取参数 c 去检查)
continue;
}
try {
// 如当前工作线程是超时可销毁的则用 poll() 方法
// 否则使用 take() 方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 获取到任务,返回任务
if (r != null)
return r;
// 为获取到任务,超时
timedOut = true;
} catch (InterruptedException retry) {
// 异常重试
timedOut = false;
}
}
}
5、processWorkerExit
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers. * * @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
// 如果是非异常导致线程退出,则在 getTask() 方法就已经执行了工作线
// 程数减一的操作,如果是异常导致退出,则需要工作线程数减一(由 getTask() 方法
// 异常也需要减一,getTask() 如果异常不会在其方法内部减一)
// 异常导致的线程退出会再添加一个新的空闲工作线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 由于异常导致线程退出,在这里对工作线程数减一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 所有对工作线程的操作都需要上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计工作量
completedTaskCount += w.completedTasks;
// 移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
// 在任务队列为空,线程池状态为 STOP 或 SHUTDOWN 状态下,
// 如果工作线程数减为 0,则进行终止线程池的操作
tryTerminate();
int c = ctl.get();
// 如果线程池状态为 RUNNING 或 SHUTDOWN 状态
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 获取最小线程数
// 1. allowCoreThreadTimeOut 为 true,任务队列不为空,
//最小线程数为 1
// 2. allowCoreThreadTimeOut 为 true,任务队列为空,
// 最小线程数为 0
// 3. allowCoreThreadTimeOut 为 false,最小线程数
// 为 corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 当前工作线程数大于最小线程数直接返回
// 否则添加一个空闲工作线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果是异常导致工作线程退出,则重新添加一个空闲工作线程
addWorker(null, false);
}
}
6、tryTerminate
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
// 如果线程池状态为 SHUTDOWN 并且工作线程数和任务队列为空 或
// 线程池状态为 STOP 并且工作线程数为零,则线程池状态会变为
// TERMINATED
// 必须在所有可能关闭线程池的操作后调用该方法
// 如果工作线程数不为零,则需要中断空闲的工作线程来传播关闭信息
// (工作线程退出时也会调用该方法)
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 满足其中一个条件则返回
// 1. 当前为 RUNNING 状态
// 2. 当前至少为 TIDYING 状态
// 3. 当前为 SHUTDOWN 且任务队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作线程数不为 0,则执行中断所有工作线程的操作并返回
// 等待下次 tryTerminate 的调用(因为每次工作线程退出都会
// 再调用一次 tryTerminate)
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 上锁修改线程池状态为 TIDYING,并执行 terminated 扩展方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 将线程池状态改为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒所有调用 awaitTermination 方法的线程
termination.signalAll();
}
// 线程池关闭
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
原创文章,作者:kirin,如若转载,请注明出处:https://blog.ytso.com/272126.html