Java并发编程之线程池的使用(二)


在第一篇文章中,我们发现ThreadPoolExecutor 提供四个构造器供我们创建一个线程池。关于是否使用ThreadPoolExecutor 来创建线程池,下面有两种说法:

1.虽然ThreadPoolExecutor 有四个不同的构造器,但是考虑到其复杂性,Java并发API提供了Executors类来构建执行器和相关对象。尽管我们可以直接通过其构造函数来生成ThreadPoolExecutor ,使用Executors类是更加推荐的方式

2.阿里的 Java开发手册中这样说道:【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,

这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

那么我的理解就是,如果 追求ThreadPoolExecutor 的简单实现那么就是用Executors去创建线程池,如果考虑到资源的问题,那就使用ThreadPoolExecutor的方式去创建。具体的我也只能在后面的实际工作中去琢磨了,现在多多学习吧,趁着毕业设计回校这段时间多多补充一些Java底层的知识。

下面来研究一下Executors这个执行器类:

可以发现这些方法返回实现ExecutorService接口的ThreadPoolExecutor类的对象

其创建线程池的方法:

1.newFixedThreadPool
2.newSingleThreadExecutor
3.newCachedThreadPool

第一个方法创建了一个固定大小的线程池,如果提交的任务多余空闲线程数目,那么多余的任务则放入到队列之中。

第二个方法是以单一的线程的线程池:由一个线程执行提交的任务,一个接着一个。

第三个方法构建的线程池,对于每一个任务,如果有空闲线程则直接使用,没有就创建一个新的线程。

使用第一方法来看一下效果:

public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i=0;i<12;i++){
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("开始处理线程!!!");
                        Thread.sleep(1000);
                        System.out.println("Thread:"+this.toString());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

        }
        executorService.shutdown();
    }

可见当线程满10个后,第11和地12个任务就放在队列之中,当其他任务执行完后,开始执行队列中的线程。

把newFixedThreadPool改成newCachedThreadPool后可见来多少任务就创建多少线程去执行任务

再改成newSingleThreadExecutor后可见线程使依次执行任务,执行完一个任务之后再去执行另一个任务

这里补充另一个创建线程池的理由:减少并发线程的数目。创建大量的线程会大大降低性能甚至使虚拟机崩溃,应该使用一个线程数“固定的”线程池来限制并发线程的数目。

使用Executors的方法很简单的去创建了线程池,但是确实好像简单了很多,不要去配置那么多的参数。但是也确实好像会造成资源浪费,并且该类方法返回的实现了ExecutorService接口的ThreadPoolExecutor,并没有传入RejectedExecutionHandler (拒绝策略)这个参数 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。看网上的文章觉得:这点可能会非常重要吧。。

那就研究一下ThreadPoolExecutor的拒绝策略吧:

四种拒绝策略:CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy

第一种:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
            4, 200, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(2),new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 11; i++) {
        int num = i;
        executor.execute(new Runnable() {
        @Override
        public void run() {
        System.out.println("正在執行task" + Thread.currentThread().getName());

        System.out.println("task:" +  Thread.currentThread().getName()+ "執行結束");
        }
        });
        }
        executor.shutdown();

可见:CallerRunsPolicy方法是在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。但是会阻塞主线程

第二种:AbortPolicy 源码如下,可以发现其直接抛出异常,不在继续执行后续的任务

public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

执行效果

第三种:DiscardPolicy 源码中方法体是空的,也就是不做处理,不创建新的线程,继续处理前面的任务

/**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
测试代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
            4, 200, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(2),new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 11; i++) {
            System.out.println("Thread"+i+"create");
        executor.execute(new Runnable() {
        @Override
        public void run() {
        System.out.println("正在執行task" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task:" +  Thread.currentThread().getName()+ "執行結束");
        }
        });
        }
        executor.shutdown();

测试结果:

可见循环进入后,并没有去创建线程,而是直接执行以创建的任务

第四种:DiscardOldestPolicy 从名字上看应该是放弃最早的线程去执行最新

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

测试代码:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
            4, 200, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(2),new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 11; i++) {
        int index = i ;
        System.out.println("Thread"+i+"create");
        System.out.println(((ThreadPoolExecutor)executor).getQueue().size());
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("正在執行task:"+index + ":"+ Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("task:"+index + ":"+  Thread.currentThread().getName()+ "執行結束");
            }
        });
    }
        executor.shutdown();

执行效果:

可见看到在达到最大线程的时候,4和5线程放入到队列当中,但是最后执行的时候是任务9和10。也就是说discardoldestpolicy

策略会将最前面的没有执行的线程替换掉,已经执行的就让它执行完

再说一下Executors中几个方法的缺点吧

说明:Executors 各个方法的弊端: 1)newFixedThreadPool 和 newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM(java.lang.OutOfMemoryError:堆内存溢出)。 2)newCachedThreadPool

主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM

看着源码,看着书,以及网上查阅别人的博客,就自己感觉线程池创建似乎使用Executors会很方便,不需要去了解ThreadPoolExecutor太多东西,就可以创建线程池,很方便。但是对于有实际硬性要求的项目,貌似自己去实现ThreadPoolExecutor,根据不同的需求去设置不同参数更加合适一些。所以我觉得还是多多了解一下ThreadPoolExecutor这个类比较好一些,因为Executors也是返回该类的实例。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/291621.html

(0)
上一篇 2022年10月22日
下一篇 2022年10月22日

相关推荐

发表回复

登录后才能评论