重新理解 java 的线程池和怎么自定义一个合适的线程池
1. 怎么自定义线程池呢?
java的线程池核心就是 ThreadPoolExecutor,后面的四种线程池也是配置不同的ThreadPoolExecutor,如:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
接下来看下,ThreadPoolExecutor 的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
主要的参数有:
(1) corePoolSize 核心线程数量
(2) maximumPoolSize 最大线程数量
(3) unit 超时时间单位 ( TimeUnit.SECONDS 等)
(4) keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了
(5) workQueue 任务阻塞队列,用来缓存待执行的任务
(6) ThreadFactory 负责创建Thread线程
(7) RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略
2. blockingQueue:
阻塞式队列,内部是一个先进先出的队列,不过是线程安全的
(1)获取元素:如果队列为空时,会阻塞等待队列中有元素再返回
(2)添加元素:如果队列已满,会阻塞等到队列有空位时再放入
种类
-
ArrayBlockingQueue 阻塞有界队列,底层采用数组实现,读写操作都要获得锁
-
LinkedBlockingQueue 阻塞无界的队列,底层使用链表,所以队列大小默认值是Integer.MAX_VALUE,也可以指定容量大小。同ArrayBlockingQueue 不一致的是LinkedBlockingQueue 对添加和移除方法使用单独的锁控制,ArrayBlockingQueue 使用同一个锁控制
参考:IT虾米网 -
DelayQueue 阻塞无界的队列,当其指定的延迟时间到了才可以队列获取到元素。往队列中插入数据的操作(生产者)不会被阻塞,而获取数据的(消费者)会被阻塞
-
PriorityBlockingQueue 基于堆的无界并发安全的优先级队列
-
SynchronousQueue 内部没有数据缓存空间,队列头元素是排队要插入数据的线程,配对的生产者和消费者线程之间直接传递数据,并不会将数据缓冲数据到队列中
3. ThreadPoolExecutor 任务执行流程
4. 拒绝策略 RejectedExecutionHandler:
当线程池线程数量和缓存队列都满了,就会执行拒绝策略来处理这种情况:
(1)AbortPolicy 直接中断抛出运行时RejectedExecutionException异常
(2)CallerRunsPolicy 在调用的线程执行运行该任务
(3)DiscardOldestPolicy 在任务队列中删除第一个元素
(4)DiscardPolicy 直接抛弃任务
5. 四种常用线程池
可见本质上都是通过ThreadPoolExecutor设置形成不同特点的线程池
(1)newCachedThreadPool
无限的线程数量 和 利用SynchronousQueue不需要缓存任务直接执行,适时回收
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
(2)newSingleThreadExecutor
单一线程,无限长度的缓存队列
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
(3)newFixedThreadPool
定长的线程数,无限长度的缓存队列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
(4)ScheduledExecutorService
利用 DelayedWorkQueue 延迟执行定时任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
为什么的Executors会导致OOM呢 ?
直接看上面的 newFixedThreadPool 的构造函数,其中缓存队列的实现是 LinkedBlockingQueue,这就是导致OOM的真正原因。
BlockingQueue有两个具体实现:ArrayBlockingQueue和LinkedBlockingQueue。
LinkedBlockingQueue 可以设置最大容量,当不设置时候默认是Integer.MAX_VALUE。
在上面的newFixedThreadPool 指定的就是无限大的LinkedBlockingQueue,将会产生缓冲的任务数量过多而导致OOM。
同样,newCachedThreadPool 可能会创建无限多的线程数量,而导致OOM。
总结
最好避免OOM的方法:根据情景自定义ThreadPoolExecutor,设置合适数量的核心线程数量、合适长度的缓存队列。
AsyncTask
在这里延伸一下到Android的AsyncTask,在AysncTask有两个线程池,看下这两个线程池的特点
// 核心线程数量
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
// 最大线程数量,CPU_COUNT 是CPU 数量
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
// 线程存活时间,30s
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
// 容量为128的阻塞队列
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
// 允许核心线程超时被回收
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
THREAD_POOL_EXECUTOR 是一个静态的线程池,最大线程数量为CPU数量 * 2 +1,允许核心线程超时被回收,最大能够存128个任务在队列中。
再来看下第二个线程池:
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static class SerialExecutor implements Executor {
// 存储任务队列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
// 当前执行的Runnable
Runnable mActive;
public synchronized void execute(final Runnable r) {
// 加入队列
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
// 当前任务执行完毕后,让下一个任务执行
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
// 将当前任务加入到线程池
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
SerialExecutor 并不是由ThreadPoolExecutor制造出来的,而是直接继承Executor,内部有一个队列存储任务,每次都将一个任务扔进THREAD_POOL_EXECUTOR线程池执行,执行完毕后,再从队列取出任务,串行地执行。
SERIAL_EXECUTOR 是 Asynctask 一个串行的静态线程池,是Asynctask默认使用的,而且SerialExecutor.execute是在Asynctask.execute同一个线程中。
还有一个问题,既然任务是线程池里面运行的,那么postResult、publishProgress怎么发布结果和进度到主线程呢?其实就是Handler:
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
InternalHandler 负责切换线程,发布结果和发布进度。
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/19402.html