线程池ThreadPoolExecutor运转机制和使用详解

线程在编程中无处不在,会用不代表你真的懂她!

线程是一个操作系统概念。操作系统负责这个线程的创建、挂起、运行、阻塞和终结操作。而操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。

因此线程池的使用对我们的程序能起到很好的优化作用。本文将重点学习 java 自带 线程池 ThreadPoolExecutor 的运行机制 和 使用方法!

ThreadPoolExecutor 概述

  1. ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务; 
  2. Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。 
  3. 先来了解其线程池管理的机制,有助于正确使用,避免错误使用导致严重故障。同时可以根据自己的需求实现自己的线程池 

ThreadPoolExecutor的构建参数

public ThreadPoolExecutor(int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler)

ThreadPoolExecutor 参数详解

  • corePoolSize
    核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。
    每个任务需要tasktime秒处理,则每个线程每钞可处理1/tasktime个任务。系统每秒有tasks个任务需要处理,则需要的线程数为:tasks/(1/tasktime),即tasks*tasktime个线程数。假设系统每秒任务数为100~1000,每个任务耗时0.1秒,则需要100*0.1至1000*0.1,即10~100个线程。那么corePoolSize应该设置为大于10,具体数字最好根据8020原则,即80%情况下系统每秒任务数,若系统80%的情况下第秒任务数小于200,最多时为1000,则corePoolSize可设置为20。
  • maxPoolSize
    当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
    当系统负载达到最大值时,核心线程数已无法按时处理完所有任务,这时就需要增加线程。每秒200个任务需要20个线程,那么当每秒达到1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60。
  • keepAliveTime
    当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
    线程数量只增加不减少也不行。当负载降低时,可减少线程数量,如果一个线程空闲时间达到keepAliveTiime,该线程就退出。默认情况下线程池最少会保持corePoolSize个线程。
  • allowCoreThreadTimeout
    是否允许核心线程空闲退出,默认值为false。
    默认情况下核心线程不会退出,可通过将该参数设置为true,让核心线程也退出。
  • queueCapacity
    任务队列容量。从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,因此任务队列的长度也需要恰当的设置。
    任务队列的长度要根据核心线程数,以及系统对任务响应时间的要求有关。队列长度可以设置为(corePoolSize/tasktime)*responsetime: (20/0.1)*2=400,即队列长度可设置为400。队列长度设置过大,会导致任务响应时间过长,切忌以下写法:LinkedBlockingQueue queue = new LinkedBlockingQueue();这实际上是将队列长度设置为Integer.MAX_VALUE,将会导致线程数量永远为corePoolSize,再也不会增加,当任务数量陡增时,任务响应时间也将随之陡增。

以上关于线程数量的计算并没有考虑CPU的情况。若结合CPU的情况,比如,当线程数量达到50时,CPU达到100%,则将maxPoolSize设置为60也不合适,此时若系统负载长时间维持在每秒1000个任务,则超出线程池处理能力,应设法降低每个任务的处理时间(tasktime)。

ThreadPoolExecutor 内部结构

ThreadPoolExecutor 内部结构图

从上图中我们可以发现 ThreadPoolExecutor 就是依靠 BlockingQueue 的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过 workQueue.take() 阻塞住。

ThreadPoolExecutor 使用例子

package demo;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorTest{
    private static int queueDeep = 4;
    public void createThreadPool(){
        /* 
         * 创建线程池,最小线程数为2,最大线程数为4,线程池维护线程的空闲时间为3秒, 
         * 使用队列深度为4的有界队列,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除, 
         * 然后重试执行程序(如果再次失败,则重复此过程),里面已经根据队列深度对任务加载进行了控制。 
         */ 
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueDeep),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        // 向线程池中添加 10 个任务
        for (int i = 0; i < 10; i++){
            try{
                Thread.sleep(1);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            while (getQueueSize(tpe.getQueue()) >= queueDeep){
                System.out.println("队列已满,等3秒再添加任务");
                try{
                    Thread.sleep(3000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
            TaskThreadPool ttp = new TaskThreadPool(i);
            System.out.println("put i:" + i);
            tpe.execute(ttp);
        }
        tpe.shutdown();
    }

    private synchronized int getQueueSize(Queue queue){
        return queue.size();
    }

    public static void main(String[] args){
        ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
        test.createThreadPool();
    }

    class TaskThreadPool implements Runnable{
        private int index;

        public TaskThreadPool(int index){
            this.index = index;
        }

        public void run(){
            System.out.println(Thread.currentThread() + " index:" + index);
            try{
                Thread.sleep(3000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

ThreadPoolExecutor 使用总结

  1. 用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM 
  2. 如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务 
  3. 最大线程数一般设为2N+1最好,N是CPU核数 
  4. 核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数 
  5. 如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果

线程池ThreadPoolExecutor运转机制和使用详解

: » 线程池ThreadPoolExecutor运转机制和使用详解

原创文章,作者:Carrie001128,如若转载,请注明出处:https://blog.ytso.com/tech/java/251530.html

(0)
上一篇 2022年5月3日 04:19
下一篇 2022年5月3日 04:24

相关推荐

发表回复

登录后才能评论