Java 线程池理解详解编程语言

重新理解 java 的线程池和怎么自定义一个合适的线程池

1. 怎么自定义线程池呢?

java的线程池核心就是 ThreadPoolExecutor,后面的四种线程池也是配置不同的ThreadPoolExecutor,如:

public static ExecutorService newFixedThreadPool(int nThreads) {
    
    return new ThreadPoolExecutor(nThreads, nThreads,  0L, TimeUnit.MILLISECONDS, 
    	   new LinkedBlockingQueue<Runnable>()); 
} 

接下来看下,ThreadPoolExecutor 的构造函数:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
			  TimeUnit unit, 
			  BlockingQueue<Runnable> workQueue, 
			  ThreadFactory threadFactory,  
			  RejectedExecutionHandler handler); 

主要的参数有:
(1) corePoolSize 核心线程数量

(2) maximumPoolSize 最大线程数量

(3) unit 超时时间单位 ( TimeUnit.SECONDS 等)

(4) keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了

(5) workQueue 任务阻塞队列,用来缓存待执行的任务

(6) ThreadFactory 负责创建Thread线程

(7) RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略

2. blockingQueue:

阻塞式队列,内部是一个先进先出的队列,不过是线程安全的
(1)获取元素:如果队列为空时,会阻塞等待队列中有元素再返回
(2)添加元素:如果队列已满,会阻塞等到队列有空位时再放入

种类

  1. ArrayBlockingQueue 阻塞有界队列,底层采用数组实现,读写操作都要获得锁

  2. LinkedBlockingQueue 阻塞无界的队列,底层使用链表,所以队列大小默认值是Integer.MAX_VALUE,也可以指定容量大小。同ArrayBlockingQueue 不一致的是LinkedBlockingQueue 对添加和移除方法使用单独的锁控制,ArrayBlockingQueue 使用同一个锁控制
    参考:IT虾米网

  3. DelayQueue 阻塞无界的队列,当其指定的延迟时间到了才可以队列获取到元素。往队列中插入数据的操作(生产者)不会被阻塞,而获取数据的(消费者)会被阻塞

  4. PriorityBlockingQueue 基于堆的无界并发安全的优先级队列

  5. SynchronousQueue 内部没有数据缓存空间,队列头元素是排队要插入数据的线程,配对的生产者和消费者线程之间直接传递数据,并不会将数据缓冲数据到队列中

3. ThreadPoolExecutor 任务执行流程

在这里插入图片描述

4. 拒绝策略 RejectedExecutionHandler:

当线程池线程数量和缓存队列都满了,就会执行拒绝策略来处理这种情况:

(1)AbortPolicy 直接中断抛出运行时RejectedExecutionException异常

(2)CallerRunsPolicy 在调用的线程执行运行该任务

(3)DiscardOldestPolicy 在任务队列中删除第一个元素

(4)DiscardPolicy 直接抛弃任务

5. 四种常用线程池

可见本质上都是通过ThreadPoolExecutor设置形成不同特点的线程池

(1)newCachedThreadPool
无限的线程数量 和 利用SynchronousQueue不需要缓存任务直接执行,适时回收

public static ExecutorService newCachedThreadPool() {
    
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                  60L, TimeUnit.SECONDS, 
                                  new SynchronousQueue<Runnable>()); 
} 

(2)newSingleThreadExecutor
单一线程,无限长度的缓存队列

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 
                                0L, TimeUnit.MILLISECONDS, 
                                new LinkedBlockingQueue<Runnable>(), 
                                threadFactory)); 
} 

(3)newFixedThreadPool
定长的线程数,无限长度的缓存队列

public static ExecutorService newFixedThreadPool(int nThreads) {
    
    return new ThreadPoolExecutor(nThreads, nThreads, 
                                  0L, TimeUnit.MILLISECONDS, 
                                  new LinkedBlockingQueue<Runnable>()); 
} 

(4)ScheduledExecutorService
利用 DelayedWorkQueue 延迟执行定时任务

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    
    return new ScheduledThreadPoolExecutor(corePoolSize); 
} 
 
public ScheduledThreadPoolExecutor(int corePoolSize) {
    
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
          new DelayedWorkQueue()); 
} 

为什么的Executors会导致OOM呢 ?

直接看上面的 newFixedThreadPool 的构造函数,其中缓存队列的实现是 LinkedBlockingQueue,这就是导致OOM的真正原因。

BlockingQueue有两个具体实现:ArrayBlockingQueue和LinkedBlockingQueue。

LinkedBlockingQueue 可以设置最大容量,当不设置时候默认是Integer.MAX_VALUE。

在上面的newFixedThreadPool 指定的就是无限大的LinkedBlockingQueue,将会产生缓冲的任务数量过多而导致OOM。

同样,newCachedThreadPool 可能会创建无限多的线程数量,而导致OOM。

总结

最好避免OOM的方法:根据情景自定义ThreadPoolExecutor,设置合适数量的核心线程数量、合适长度的缓存队列。

AsyncTask

在这里延伸一下到Android的AsyncTask,在AysncTask有两个线程池,看下这两个线程池的特点

// 核心线程数量 
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4)); 
// 最大线程数量,CPU_COUNT 是CPU 数量 
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; 
// 线程存活时间,30s 
private static final int KEEP_ALIVE_SECONDS = 30; 
 
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    
    private final AtomicInteger mCount = new AtomicInteger(1); 
    public Thread newThread(Runnable r) {
    
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); 
     } 
}; 
// 容量为128的阻塞队列 
private static final BlockingQueue<Runnable> sPoolWorkQueue = 
            new LinkedBlockingQueue<Runnable>(128); 
             
public static final Executor THREAD_POOL_EXECUTOR; 
 
static {
    
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 
        CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, 
        sPoolWorkQueue, sThreadFactory); 
    // 允许核心线程超时被回收 
    threadPoolExecutor.allowCoreThreadTimeOut(true); 
    THREAD_POOL_EXECUTOR = threadPoolExecutor; 
} 

THREAD_POOL_EXECUTOR 是一个静态的线程池,最大线程数量为CPU数量 * 2 +1,允许核心线程超时被回收,最大能够存128个任务在队列中。

再来看下第二个线程池:

public static final Executor SERIAL_EXECUTOR = new SerialExecutor(); 
 
private static class SerialExecutor implements Executor {
    
    // 存储任务队列 
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); 
    // 当前执行的Runnable 
    Runnable mActive; 
 
    public synchronized void execute(final Runnable r) {
    
         // 加入队列 
         mTasks.offer(new Runnable() {
    
            public void run() {
    
                try {
    
                    r.run(); 
                } finally {
    
                    // 当前任务执行完毕后,让下一个任务执行 
                    scheduleNext(); 
                } 
            } 
        }); 
        if (mActive == null) {
    
            scheduleNext(); 
        } 
    } 
     
	// 将当前任务加入到线程池 
    protected synchronized void scheduleNext() {
    
        if ((mActive = mTasks.poll()) != null) {
    
            THREAD_POOL_EXECUTOR.execute(mActive); 
        } 
    } 
} 

SerialExecutor 并不是由ThreadPoolExecutor制造出来的,而是直接继承Executor,内部有一个队列存储任务,每次都将一个任务扔进THREAD_POOL_EXECUTOR线程池执行,执行完毕后,再从队列取出任务,串行地执行。

SERIAL_EXECUTOR 是 Asynctask 一个串行的静态线程池,是Asynctask默认使用的,而且SerialExecutor.execute是在Asynctask.execute同一个线程中。

还有一个问题,既然任务是线程池里面运行的,那么postResult、publishProgress怎么发布结果和进度到主线程呢?其实就是Handler:

private static class InternalHandler extends Handler {
    
        public InternalHandler(Looper looper) {
    
            super(looper); 
        } 
 
        public void handleMessage(Message msg) {
    
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; 
            switch (msg.what) {
    
                case MESSAGE_POST_RESULT: 
                    // There is only one result 
                    result.mTask.finish(result.mData[0]); 
                    break; 
                case MESSAGE_POST_PROGRESS: 
                    result.mTask.onProgressUpdate(result.mData); 
                    break; 
            } 
        } 
    } 

InternalHandler 负责切换线程,发布结果和发布进度。

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/19402.html

(0)
上一篇 2021年7月19日 22:09
下一篇 2021年7月19日 22:09

相关推荐

发表回复

登录后才能评论