一、简介
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
corePoolSize:核心线程池的大小,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到任务数大于等于corePoolSize。
或者是使用prestartAllCoreThreads()或者prestartCoreThread()方法初始化线程,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。
keepAliveTime:线程池维护线程所允许的空闲时间,默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
workQueue:线程池所使用的阻塞队列
ArrayBlockingQueue://是一个基于数组结构的有界阻塞队列,创建时必须指定大小,此队列按 FIFO(先进先出)原则对元素进行排序 LinkedBlockingQueue://一个基于链表结构的无界阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue,默认大小为Integer_Max_Value,静态工厂方法Executors.newFixedThreadPool()使用了这个队列 SynchronousQueue;//一个不存储元素的阻塞队列。直接新建线程来执行任务,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列 PriorityBlockingQueue://一个具有优先级的无界阻塞队列。
handler: 线程池对拒绝任务的处理策略
ThreadPoolExecutor.AbortPolicy() //丢弃任务并抛出java.util.concurrent.RejectedExecutionException异常 ThreadPoolExecutor.CallerRunsPolicy() //重试添加当前的任务,在 execute 方法的调用线程中运行被拒绝的任务 ThreadPoolExecutor.DiscardOldestPolicy() //抛弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.DiscardPolicy() //抛弃任务且不抛异常
二、线程池的工作方式
当一个任务通过execute(Runnable)方法欲添加到线程池时:
public void execute(Runnable command) { if (command ==null) thrownew NullPointerException(); intc = ctl.get(); //1 当前运行的线程数量小于核心线程数量,直接将任务加入worker启动运行。 if (workerCountOf(c) <corePoolSize) { if (addWorker(command,true)) return; c =ctl.get(); } //2 运行线程数量大于核心线程数量时,上面的if分支针对大于corePoolSize,并且缓存队列加入任务操作成功的情况。 运行中并且将任务加入缓冲队列成功,正常来说这样已经完成了处理逻辑。 但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务; if (isRunning(c) &&workQueue.offer(command)) { intrecheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) addWorker(null,false); } //3 这里针对运行线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。 注意第二个参数是false,可以在下面addWorker方法看到,就是针对线程池最大线程数量maximumPoolSize的判断。 elseif (!addWorker(command,false)) reject(command); }
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
从execute源码中可以知道主要的是addWorker方法,addWorker方法主要做的工作就是新建一个Woker线程,加入到woker集合中,然后启动该线程,那么接下来的重点就是Woker类的run方法了。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //以上是线程数量的校验与更新逻辑 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
woker线程的执行流程就是首先执行初始化时分配给的任务,执行完成以后会尝试从阻塞队列中获取可执行的任务,如果指定时间内仍然没有任务可以执行,则进入销毁逻辑。(只销毁非核心线程)
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //task就是Woker构造函数入参指定的任务,即用户提交的任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
三、Executors包含的常用线程池
1.newFixedThreadPool:固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。
- 可控制线程最大并发数(同时执行的线程数)
- 超出的线程会在队列中等待
但是,在核心线程池空闲时,即核心线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
(2) newCachedThreadPool:无界线程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
- 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
- 在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
(3)singleThreadPoll 单线程线程池
大小为1的固定线程池,这个其实就是newFixedThreadPool(1)
它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。
单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
(4)ScheduledThreadPoll
它的核心线程池固定,非核心线程的数量没有限制,但是闲置时会立即会被回收。
支持定时及周期性任务执行
四、线程池的关闭:
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/17290.html