声明:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González 译者:许巧辉
定制ThreadPoolExecutor类
执行者框架(Executor framework)是一种机制,允许你将线程的创建与执行分离。它是基于Executor和ExecutorService接口及其实现这两个接口的ThreadPoolExecutor类。它有一个内部的线程池和提供允许你提交两种任务给线程池执行的方法。这些任务是:
- Runnable接口,实现没有返回结果的任务
- Callable接口,实现返回结果的任务
在这两种情况下,你只有提交任务给执行者。这个执行者使用线程池中的线程或创建一个新的线程来执行这些任务。执行者同样决定任务被执行的时刻。
在这个指南中,你将学习如何覆盖ThreadPoolExecutor类的一些方法,计算你在执行者中执行的任务的执行时间,并且将关于执行者完成它的执行的统计信息写入到控制台。
准备工作
这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。
如何做…
按以下步骤来实现的这个例子:
1.创建MyExecutor类,并指定它继承ThreadPoolExecutor类。
public class MyExecutor extends ThreadPoolExecutor {
2.声明一个私有的、ConcurrentHashMap类型的属性,并参数化为String和Date类,名为startTimes。
private ConcurrentHashMap<String, Date> startTimes;
3.实现这个类的构造器,使用super关键字调用父类的构造器,并初始化startTime属性。
public MyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); startTimes=new ConcurrentHashMap<>(); }
4.覆盖shutdown()方法。将关于已执行的任务,正在运行的任务和待处理的任务信息写入到控制台。然后,使用super关键字调用父类的shutdown()方法。
@Override public void shutdown() { System.out.printf("MyExecutor: Going to shutdown./n"); System.out.printf("MyExecutor: Executed tasks: %d/n",getCompletedTaskCount()); System.out.printf("MyExecutor: Running tasks: %d/n",getActiveCount()); System.out.printf("MyExecutor: Pending tasks: %d/n",getQueue().size()); super.shutdown(); }
5.覆盖shutdownNow()方法。将关于已执行的任务,正在运行的任务和待处理的任务信息写入到控制台。然后,使用super关键字调用父类的shutdownNow()方法。
@Override public List<Runnable> shutdownNow() { System.out.printf("MyExecutor: Going to immediately shutdown./n"); System.out.printf("MyExecutor: Executed tasks: %d/n",getCompletedTaskCount()); System.out.printf("MyExecutor: Running tasks: %d/n",getActiveCount()); System.out.printf("MyExecutor: Pending tasks: %d/n",getQueue().size()); return super.shutdownNow(); }
6.覆盖beforeExecute()方法。写入一条信息(将要执行任务的线程名和任务的哈希编码)到控制台。在HashMap中,使用这个任务的哈希编码作为key,存储开始日期。
@Override protected void beforeExecute(Thread t, Runnable r) { System.out.printf("MyExecutor: A task is beginning: %s : %s/n",t.getName(),r.hashCode()); startTimes.put(String.valueOf(r.hashCode()), new Date()); }
7.覆盖afterExecute()方法。将任务的结果和计算任务的运行时间(将当前时间减去存储在HashMap中的任务的开始时间)的信息写入到控制台。
@Override protected void afterExecute(Runnable r, Throwable t) { Future<?> result=(Future<?>)r; try { System.out.printf("*********************************/n"); System.out.printf("MyExecutor: A task is finishing./n"); System.out.printf("MyExecutor: Result: %s/n",result.get()); Date startDate=startTimes.remove(String.valueOf(r. hashCode())); Date finishDate=new Date(); long diff=finishDate.getTime()-startDate.getTime(); System.out.printf("MyExecutor: Duration: %d/n",diff); System.out.printf("*********************************/n"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
8.创建一个SleepTwoSecondsTask类,它实现参数化为String类的Callable接口。实现call()方法。令当前线程睡眠2秒,返回转换为String类型的当前时间。
public class SleepTwoSecondsTask implements Callable<String> { public String call() throws Exception { TimeUnit.SECONDS.sleep(2); return new Date().toString(); } }
9.实现这个例子的主类,通过创建Main类,并实现main()方法。
public class Main { public static void main(String[] args) {
10.创建一个MyExecutor对象,名为myExecutor。
MyExecutor myExecutor=new MyExecutor(2, 4, 1000, TimeUnit. MILLISECONDS, new LinkedBlockingDeque<Runnable>());
11.创建一个参数化为String类的Future对象的数列,用于存储你将提交给执行者的任务的结果对象。
List<Future<String>> results=new ArrayList<>();¡;
12.提交10个Task对象。
for (int i=0; i<10; i++) { SleepTwoSecondsTask task=new SleepTwoSecondsTask(); Future<String> result=myExecutor.submit(task); results.add(result); }
13.使用get()方法,获取前5个任务的执行结果。将这些信息写入到控制台。
for (int i=0; i<5; i++){ try { String result=results.get(i).get(); System.out.printf("Main: Result for Task %d : %s/n",i,result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
14.使用shutdown()方法结束这个执行者的执行。
myExecutor.shutdown();
15.使用get()方法,获取后5个任务的执行结果。将这些信息写入到控制台。
for (int i=5; i<10; i++){ try { String result=results.get(i).get(); System.out.printf("Main: Result for Task %d : %s/n",i,result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
16.使用awaitTermination()方法等待这个执行者的完成。
try { myExecutor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
17.写入一条信息表明这个程序执行的结束。
System.out.printf("Main: End of the program./n");
它是如何工作的…
在这个指南中,我们已经通过继承ThreadPoolExecutor类和覆盖它的4个方法来实现我们自己定制的执行者。我们用beforeExecute()和afterExecute()方法来计算任务的执行时间。beforeExecute()方法是在任务执行之前被执行的。在这种情况下,我们使用HashMap来存储任务的开始(执行)时间。afterExecute()方法是在任务执行之后被执行的。你可以从HashMap中获取已完成任务的startTime(开始执行时间),然后,计算实际时间和那个时间(startTime)的差异来获取任务的执行时间。你也覆盖了shutdown()和shutdownNow()方法,将关于在执行者中已执行的任务的统计信息写入到控制台:
- 对于已执行的任务,使用getCompletedTaskCount()方法(获取)。
- 对于正在运行的任务,使用getActiveCount()方法(获取)。
对于待处理任务,使用执行者存储待处理任务的阻塞队列的size()方法(获取)。SleepTwoSecondsTask类,实现Callable接口,令它的执行线程睡眠2秒。Main类,使用它向你的执行者提交10个任务和演示其他类的特性。
执行这个程序,你将看到这个程序如何显示正在运行的每个任务的时间跨度,和根据调用shutdown()方法统计执行者。
参见
- 在第4章,线程执行者中的创建一个线程执行者指南
- 在第7章,定制并发类中的在一个Executor对象中使用我们的ThreadFactory指南
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/140863.html