感谢人民邮电大学授权并发网发布此书样章,新书购买传送门=》当当网
本章将介绍下列内容:
- 创建线程执行器
- 创建固定大小的线程执行器
- 在执行器中执行任务并返回结果
- 运行多个任务并处理第一个结果
- 运行多个任务并处理所有结果
- 在执行器中延时执行任务
- 在执行器中周期性执行任务
- 在执行器中取消任务
- 在执行器中控制任务的完成
- 在执行器中分离任务的启动与结果的处理
- 处理在执行器中被拒绝的任务
4.1 简介
通常,使用Java来开发一个简单的并发应用程序时,会创建一些 Runnable 对象,然后创建对应的 Thread 对象来执行它们。但是,如果需要开发一个程序来运行大量的并发任务,这个方法将突显以下劣势:
- 必须实现所有与 Thread 对象管理相关的代码,比如线程的创建、结束以及结果获取;
- 需要为每一个任务创建一个 Thread 对象。如果需要执行大量的任务,这将大大地影响应用程序的处理能力;
- 计算机的资源需要高效地进行控制和管理,如果创建过多的线程,将会导致系统负荷过重。
自从Java 5开始,Java并发API提供了一套意在解决这些问题的机制。这套机制称之为执行器框架(Executor Framework),围绕着 Executor 接口和它的子接口 ExecutorService,以及实现这两个接口的 ThreadPoolExecutor 类展开。
这套机制分离了任务的创建和执行。通过使用执行器,仅需要实现 Runnable 接口的对象,然后将这些对象发送给执行器即可。执行器通过创建所需的线程,来负责这些 Runnable 对象的创建、实例化以及运行。但是执行器功能不限于此,它使用了线程池来提高应用程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务,避免了不断地创建和销毁线程而导致系统性能下降。
执行器框架另一个重要的优势是 Callable 接口。它类似于 Runnable 接口,但是却提供了两方面的增强。
- 这个接口的主方法名称为 call() ,可以返回结果。
- 当发送一个 Callable 对象给执行器时,将获得一个实现了 Future 接口的对象。可以使用这个对象来控制 Callable 对象的状态和结果。
本章接下来将使用上述由Java并发API提供的类及其变体来展示如何使用执行器框架。
4.2 创建线程执行器
使用执行器框架(Executor Framework)的第一步是创建 ThreadPoolExecutor 对象。可以 ThreadPoolExecutor类提供的四个构造器或者使用Executors工厂类来创建 ThreadPoolExecutor 对象。一旦有了执行器,就可以将Runnable或Callable对象发送给它去执行了。
在本节,我们将学习如何使用两种操作来实现一个范例,这个范列将模拟一个Web服务器来应对来自不同客户端的请求。
准备工作
请先行阅读1.2节来学习用Java创建线程的基本机制。然后比较这两种机制,并根据不同的问题来选择最佳的一种。
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.实现将被Web服务器执行的任务。创建一个名为 Task 的类,并实现 Runnable 接口。
public class Task implements Runnable {
2.声明一个名为 initDate 的私有 Date 属性,用来存储任务的创建时间,然后创建一个名为 name 的私有 String 属性,用来存储任务的名称。
private Date initDate; private String name;
3.实现类的构造器,用来初始化这两个属性。
public Task(String name){ initDate=new Date(); this.name=name; }
4.实现 run() 方法。
@Override public void run() {
5.在控制台上输出 initDate 属性和实际时间,即任务的开始时间。
System.out.printf("%s: Task %s: Created on: %s/n",Thread. currentThread().getName(),name,initDate); System.out.printf("%s: Task %s: Started on: %s/n",Thread. currentThread().getName(),name,new Date());
6.将任务休眠一段随机时间。
try { Long duration=(long)(Math.random()*10); System.out.printf("%s: Task %s: Doing a task during %d seconds/n",Thread.currentThread().getName(),name,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); }
7.在控制台输入任务的完成时间。
System.out.printf("%s: Task %s: Finished on: %s/n",Thread. currentThread().getName(),name,new Date());
8.创建一个名为 Server的类,它将执行通过执行器接收到的每一个任务。
public class Server {
9.声明一个名为executor的ThreadPoolExecutor属性。
private ThreadPoolExecutor executor;
10.实现类的构造器,通过 Executors 类来初始化 ThreadPoolExecutor 对象。
public Server(){ executor=(ThreadPoolExecutor)Executors.newCachedThreadPool(); }
11.实现 executeTask() 方法。它接收一个 Task 对象作为参数,并将 Task 对象发送给执行器。在控制台输出一条信息表示新的任务已经到达。
public void executeTask(Task task){ System.out.printf("Server: A new task has arrived/n");
12.调用执行器的 execute() 方法将任务发送给Task。
executor.execute(task);
13.在控制台输出一些执行器相关的数据来观察执行器的状态。
System.out.printf("Server: Pool Size: %d/n",executor. getPoolSize()); System.out.printf("Server: Active Count: %d/n",executor. getActiveCount()); System.out.printf("Server: Completed Tasks: %d/n",executor. getCompletedTaskCount());
14.实现 endServer() 方法。在这个方法里,调用执行器的 shutdown() 方法来结束它的执行。
public void endServer() { executor.shutdown(); }
15.实现范例的主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) { Server server=new Server(); for (int i=0; i<100; i++){ Task task=new Task("Task "+i); server.executeTask(task); } server.endServer(); } }
工作原理
这个范例的核心在于 Server 类,这个类创建和使用 ThreadPoolExecutor 执行器来执行任务。
第一个关键点是在 Server 类的构造器中创建 ThreadPoolExecutor 对象。ThreadPoolExecutor 类有4个不同的构造器,但是,由于这些构造器在使用上的复杂性,Java并发API提供 Executors 工厂类来构造执行器和其他相关的对象。虽然可以直接通过 ThreadPoolExecutor 其中之一的构造器来创建 ThreadPoolExecutor 对象,但是推荐使用 Executors 工厂类来创建它。
在这个示例中,通过使用 Executors 工厂类的 newCachedThreadPool() 方法创建了一个缓存线程池。这个方法返回一个 ExecutorService 对象,因此它将被强制转换为 ThreadPoolExecutor 类型,并拥有所有的方法。如果需要执行新任务,缓存线程池就会创建新线程;如果线程所运行的任务执行完成后并且这个线程可用,那么缓存线程池将会重用这些线程。线程重用的优点是减少了创建新线程所花费的时间。然而,新任务固定会依赖线程来执行,因此缓存线程池也有缺点,如果发送过多的任务给执行器,系统的负荷将会过载。
备注:仅当线程的数量是合理的或者线程只会运行很短的时间时,适合采用 Executors 工厂类的 newCachedThreadPool() 方法来创建执行器。
一旦创建了执行器,就可以使用执行器的 execute() 方法来发送 Runnable 或 Callable 类型的任务。这个范例发送实现了 Runnable 接口的 Task 类型的对象给执行器。
范例中也打印了一些执行器相关的日志信息,专门使用了如下方法。
- getPoolSize() :返回执行器线程池中实际的线程数。
- getActiveCount() :返回执行器中正在执行任务的线程数。
- getCompletedTaskCount() :返回执行器已经完成的任务数。
执行器以及 ThreadPoolExecutor 类一个重要的特性是,通常需要显示地去结束它。如果不这样做,那么执行器将继续执行,程序也不会结束。如果执行器没有任务可执行了,它将继续等待新任务的到来,而不会结束执行。Java应用程序不会结束直到所有非守护线程结束它们的运行,因此,如果有终止执行器,应用程序将永远不会结束。
为了完成执行器的执行,可以使用 ThreadPoolExecutor 类的 shutdown() 方法。当执行器执行完成所有待运行的任务后,它将结束执行。调用 shutdown() 方法之后,如果尝试再发送另一个任务给执行器,任务将被拒绝,并且执行器也将抛出 RejectedExecutionException 异常。
下面的截图展示了范例执行的部分结果。
当最后一个任务到达服务器时,执行器拥有由100项任务和90个活动线程组成的池。
更多信息
ThreadPoolExecutor 类提供了许多方法来获取自身状态的信息。在范例中,已经使用了 getPoolSize() 方法来获取线程池的大小,用 getActiveCount() 方法来获取线程池中活动线程的数量,用 getCompletedTaskCount() 方法来获取执行器完成的任务数量。也可以使用 getLargestPoolSize() 方法来返回曾经同时位于线程池中的最大线程数。
ThreadPoolExecutor 类也提供了结束执行器的相关方法。
- shutdownNow() :这个方法会立即关闭执行器。执行器将不再执行那些正在等待执行的任务。这个方法将返回等待执行的任务列表。调用时,正在运行的任务将继续运行,但是这个方法并不等待这些任务完成。
- isTerminated():如果调用了shutdown()或shutdownNow()方法,并且执行器完成了关闭的过程,那么这个方法将返回 true 。
- isShutdown():如果调用了shutdown()方法,那么这个方法将返回true。
- awaitTermination(long timeout, TimeUnit unit):这个方法将阻塞所调用的线程,直到执行器完成任务或者达到所指定的 timeout值。
TimeUnit是一个枚举类,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。
备注:如果想等待任务的结束,而不管任务的持续时间,可以使用一个大的超时时间,比如DAYS。
参见
- 参见4.12节。
- 参见8.4节。
4.3 创建固定大小的线程执行器
当使用Executors类的newCachedThreadPool()方法创建基本的 ThreadPoolExecutor 时,执行器运行过程中将碰到线程数量的问题。如果线程池里没有空闲的线程可用,那么执行器将为接收到的每一个任务创建一个新线程,当发送大量的任务给执行器并且任务需要持续较长的时间时,系统将会超负荷,应用程序也将随之性能不佳。
为了避免这个问题,Executors 工厂类提供了一个方法来创建一个固定大小的线程执行器。这个执行器有一个线程数的最大值,如果发送超过这个最大值的任务给执行器,执行器将不再创建额外的线程,剩下的任务将被阻塞直到执行器有空闲的线程可用。这个特性可以保证执行器不会给应用程序带来性能不佳的问题。
在本节,我们将通过修改本章4.2节的范例来学习如何创建固定大小的线程执行器。
准备工作
请先行阅读本章的4.2节,并实现其中所阐述的范例,因为本节将对其继续修改。
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.实现本章4.2节所描述的范例。打开 Server 类并修改它的构造器,使用 newFixedThreadPool() 方法来创建执行器,并传递数字 5 作为它的参数。
public Server(){ executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5); }
2.修改 executeTask() 方法,增加一行打印日志信息。调用 getTaskCount() 方法来获取已发送到执行器上的任务数。
System.out.printf("Server: Task Count: %d/n",executor. getTaskCount());
工作原理
在这个示例中,使用 Executors 工厂类的 newFixedThreadPool() 方法来创建执行器。这个方法创建了具有线程最大数量值的执行器。如果发送超过线程数的任务给执行器,剩余的任务将被阻塞直到线程池里有空闲的线程来处理它们。newFixedThreadPool() 方法接收执行器将拥有的线程数量的最大值作为参数。这个例子创建了一个线程数量的最大值为 5 的执行器。
下面的截图展示了范例执行的部分结果。
为了在程序中输出相关信息,已经使用的 ThreadPoolExecutor 类的一些方法如下。
- getPoolSize():返回执行器中线程的实际数量。
- getActiveCount():返回执行器正在执行任务的线程数量。
将看到,控制台输出的信息是 5,表示执行器拥有 5 个线程,并且执行器不会超过这个最大的线程连接数。
当发送最后一个任务给执行器时,由于执行器只有 5 个活动的线程,所以剩余的 95 个任务只能等待空闲线程。getTaskCount() 方法可以用来显示有多少个任务已经发送给执行器。
更多信息
Executors 工厂类也提供 newSingleThreadExecutor() 方法。这是一个创建固定大小线程执行器的极端场景,它将创建一个只有单个线程的执行器。因此,这个执行器只能在同一时间执行一个任务。
参见
- 参见4.12节。
- 参见8.4节。
4.4 在执行器中执行任务并返回结果
执行器框架(Executor Framework)的优势之一是,可以运行并发任务并返回结果。Java并发API通过以下两个接口来实现这个功能。
Callable:这个接口声明了 call() 方法。可以在这个方法里实现任务的具体逻辑操作。Callable 接口是一个泛型接口,这就意味着必须声明 call() 方法返回的数据类型。
Future:这个接口声明了一些方法来获取由 Callable 对象产生的结果,并管理它们的状态。
在本节,我们将学习如何实现任务的返回结果,并在执行器中运行任务。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建名为FactorialCalculator的类,并实现Callable接口,接口的泛型参数为Integer 类型。
public class FactorialCalculator implements Callable<Integer> {
2.声明一个名为 number 的私有 Integer 属性,存储任务即将用来计算的数字。
private Integer number;
3.实现类的构造器,用来初始化类的属性。
public FactorialCalculator(Integer number){ this.number=number; }
4.实现call()方法。这个方法返回FactorialCalculator类的 number 属性的阶乘(Factorial)。
@Override public Integer call() throws Exception {
5.创建并初始化在call()方法内使用的内部变量。
int result = 1;
6.如果number值是0或1,则返回1;否则计算number的阶乘。为了演示效果,在两个乘法之间,将任务休眠20毫秒。
if ((num==0)||(num==1)) { result=1; } else { for (int i=2; i<=number; i++) { result*=i; TimeUnit.MILLISECONDS.sleep(20); } }
7.在控制台输出操作的结果。
System.out.printf("%s: %d/n",Thread.currentThread(). getName(),result);
8.返回操作的结果。
return result;
9.实现范例的主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) {
10.通过Executors工厂类的newFixedThreadPool()方法创建ThreadPoolExecutor执行器来运行任务。传递参数2给newFixedThreadPool()方法表示执行器将最多创建两个线程。
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors. newFixedThreadPool(2);
11.创建一个 Future<Integer> 类型的列表对象 resultList。
List<Future<Integer>> resultList=new ArrayList<>();
12.通过 Random 类创建一个名 random 的随机数字生成器。
Random random=new Random();
13.生成 10 个介于 0~10 之间的随机整数。
for (int i=0; i<10; i++){ Integer number= random.nextInt(10);
14.创建 FactorialCaculator 对象,并将随机数 number 传递给它作为参数。
FactorialCalculator calculator=new FactorialCalculator(number);
15.调用执行器的 submit() 方法发送 FactorialCalculator 任务给执行器。这个方法返回一个 Future<Integer> 对象来管理任务和得到的最终结果。
Future<Integer> result=executor.submit(calculator);
16.将 Future 对象添加到前面创建的 resultList 列表中。
resultList.add(result); }
17.创建一个 do 循环来监控执行器的状态。
do {
18.通过执行器的 getCompletedTaskNumber() 方法,在控制台输出信息表示任务完成的数量。
System.out.printf("Main: Number of Completed Tasks: %d/n",executor.getCompletedTaskCount());
19.遍历 resultList 列表中的 10 个 Future 对象,通过调用 isDone() 方法来输出表示任务是否完成的信息。
for (int i=0; i<resultList.size(); i++) { Future<Integer> result=resultList.get(i); System.out.printf("Main: Task %d: %s/n",i,result. isDone()); }
20.将线程休眠 50 毫秒。
try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); }
21.若执行器中完成的任务数量小于 10 ,则一直重复执行这个循环。
} while (executor.getCompletedTaskCount()<resultList.size());
22.在控制台上输出每一个任务得到的结果。对于每一个 Future 对象来讲,通过调用 get() 方法将得到由任务返回的 Integer 对象。
System.out.printf("Main: Results/n"); for (int i=0; i<resultList.size(); i++) { Future<Integer> result=resultList.get(i); Integer number=null; try { number=result.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
23.在控制台上打印出数字number。
System.out.printf("Main: Task %d: %d/n",i,number); }
24.调用执行器的 shutdown() 方法结束执行。
executor.shutdown();
工作原理
在本节,我们学习了如何使用 Callable 接口来启动并发任务并返回结果。我们编写了 FactorialCalculator 类,它实现了带有泛型参数 Integer 类型的 Callable 接口。因此,这个 Integer 类型将作为在调用 call() 方法时返回的类型。
范例的另一个关键点在 Main 主类中。我们通过 submit() 方法发送一个 Callable 对象给执行器去执行,这个 submit() 方法接收 Callable 对象作为参数,并返回 Future 对象。Future 对象可以用于以下两个主要目的。
- 控制任务的状态:可以取消任务和检查任务是否已经完成。为了达到这个目的,可使用 isDone() 方法来检查任务是否已经完成。
- 通过 call() 方法获取返回的结果。为了达到这个目的,可使用 get() 方法。这个方法一直等待直到 Callable 对象的 call() 方法执行完成并返回结果。如果 get() 方法在等待结果时线程中断了,则将抛出一个 InterruptedException异常。如果 call() 方法抛出异常那么 get() 方法将随之抛出 ExecutionException 异常。
更多信息
在调用Future对象的get()方法时,如果Future对象所控制的任务并未完成,那么这个方法将一直阻塞到任务完成。Future 接口也提供了get()方法的其他调用方式。
- get(long timeout,TimeUnit unit):如果调用这个方法时,任务的结果并未准备好,则方法等待所指定的timeout时间。如果等待超过了指定的时间而任务的结果还没有准备好,那么这个方法将返回null。
TimeUnit是一个枚举类,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。
参见
- 参见4.2节。
- 参见4.5节。
- 参见4.6节。
4.5 运行多个任务并处理第一个结果
并发编程比较常见的一个问题是,当采用多个并发任务来解决一个问题时,往往只关心这些任务中的第一个结果。比如,对一个数组进行排序有很多种算法,可以并发启动所有算法,但是对于一个给定的数组,第一个得到排序结果的算法就是最快的排序算法。
在本节,我们将学习如何使用 ThreadPoolExecutor 类来实现这个场景。范例允许用户可以通过两种验证机制进行验证,但是,只要有一种机制验证成功,那么这个用户就被验证通过了。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建一个名为 UserValidator 的类,它将实现用户验证的过程。
public class UserValidator {
2.声明一个名为 name 的私有 String 属性,用来存储用户验证系统的名称。
private String name;
3.实现类的构造器,用来初始化类的属性。
public UserValidator(String name) { this.name=name; }
4.实现 validate() 方法。它接收两个 String 参数,分别取名为用户名name 和密码 password,这两个参数也将被用来进行用户验证。
public boolean validate(String name, String password) {
5.创建一个名为 random 的 Random 类型的随机对象。
Random random=new Random();
6.等待一段随机时间来模拟用户验证的过程。
try { long duration=(long)(Math.random()*10); System.out.printf("Validator %s: Validating a user during %d seconds/n",this.name,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { return false; }
7.返回随机的 boolean 值。当用户通过验证时,这个方法返回 true 值,如果用户没有通过验证则返回 false 值。
return random.nextBoolean(); }
8.实现 getName() 方法。这个方法返回 name 属性值。
public String getName(){ return name; }
9.创建一个名为 TaskValidator 的类,它将通过 UserValidation 对象作为并发任务来执行用户验证的过程。这个类实现了带有 String 泛型参数的 Callable 接口。
public class TaskValidator implements Callable<String> {
10.声明一个名为 validator 的私有 UserValidator 属性。
private UserValidator validator;
11.声明两个私有的 String 属性,分别为用户名 user 和密码 password 。
private String user; private String password;
12.实现类的构造器,用来初始化类的属性。
public TaskValidator(UserValidator validator, String user, String password){ this.validator=validator; this.user=user; this.password=password; }
13.实现call()方法,并返回String对象。
@Override public String call() throws Exception {
14.如果用户没有通过 UserValidator 对象的验证,就在控制台输出没有找到这个用户,表明该用户未通过验证,并抛出 Exception 类型的异常。
if (!validator.validate(user, password)) { System.out.printf("%s: The user has not been found/ n",validator.getName()); throw new Exception("Error validating user"); }
15.否则,就在控制台输出用户已经找到,表明该用户已经通过验证,然后返回 UserValidator 对象的名称。
System.out.printf("%s: The user has been found/n",validator. getName()); return validator.getName();
16.实现范例的主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) {
17.创建两个 String 对象,分别取名为 username 和 password,并初始化这两个属性值为test。
String username="test"; String password="test";
18.创建两个 UserValidator 对象,分别取名为 ldapValidator 和 dbValidator。
UserValidator ldapValidator=new UserValidator("LDAP"); UserValidator dbValidator=new UserValidator("DataBase");
19.创建两个TaskValidator对象,分别取名为ldapTask和dbTask,并分别用ldapValidator 和dbValidator来初始化他们。
TaskValidator ldapTask=new TaskValidator(ldapValidator, username, password); TaskValidator dbTask=new TaskValidator(dbValidator, username,password);
20.创建一个名为 taksList 的 TaskValidator 类型列表,并将 ldapTask 和 dbTask 添加到列表中。
List<TaskValidator> taskList=new ArrayList<>(); taskList.add(ldapTask); taskList.add(dbTask);
21.通过Executors工厂类的newCachedThreadPool()方法创建一个新的 ThreadPoolExecutor 执行器对象,并创建一个名为 result 的 String 对象。
ExecutorService executor=(ExecutorService)Executors. newCachedThreadPool(); String result;
22.调用执行器的 invokeAny() 方法。这个方法接收 taskList 作为参数,并返回String 对象。然后,在控制台上输出这个方法返回的 String 对象。
try { result = executor.invokeAny(taskList); System.out.printf("Main: Result: %s/n",result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
23.通过shutdown()方法来终止执行器,并在控制台输出信息表示程序已经执行结束。
executor.shutdown(); System.out.printf("Main: End of the Execution/n");
工作原理
这个范例的关键点在 Main 主类中。ThreadPoolExecutor 类的 invokeAny() 方法接收到一个任务列表,然后运行任务,并返回第一个完成任务并且没有抛出异常的任务的执行结果。这个方法返回的类型与任务里的 call() 方法返回的类型相同,在这个范例中,它将返回 String 类型值。
下面的截图展示了当范例运行后,有一个任务成功地验证了用户后的运行结果。
范例中有两个UserValidator对象,它们返回随机的boolean值。每一个UserValidator对象被TaskValidator对象使用,TaskValidator对象实现了Callable接口。如果 UserValidator类的validate()方法返回false值,那么TaskValidator类将抛出Exception异常。否则,返回true值。
因此,我们有两个任务可以返回true值或抛出Exception异常。从而,可以有如下4种可能性。
- 如果两个任务都返回true值,那么invokeAny()方法的结果就是首先完成任务的名称。
- 如果第一个任务返回true值,第二个任务抛出Exception异常,那么invokeAny() 方法的结果就是第一个任务的名称。
- 如果第一个任务抛出Exception异常,第二个任务返回true值,那么invokeAny() 方法的结果就是第二个任务的名称。
- 如果两个任务都抛出Exception异常,那么invokeAny()方法将抛出 ExecutionException异常。
将这个范例多运行几次,那么将得到如上所述的四种可能的结果。以下截图则显示当两个任务同时抛出异常时,应用程序得到的结果。
更多信息
ThreadPoolExecutor 类还提供了 invokeAny() 方法的其他版本:
invokeAny(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):这个方法执行所有的任务,如果在给定的超时期满之前某个任务已经成功完成(也就是未抛出异常),则返回其结果。
TimeUnit是一个枚举类,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。
参见
- 参见4.6节。
4.6 运行多个任务并处理所有结果
执行器框架(Executor Framework)允许执行并发任务而不需要去考虑线程创建和执行。它还提供了可以用来控制在执行器中执行任务的状态和获取任务运行结果的 Future 类。
如果想要等待任务结束,可以使用如下两种方法。
- 如果任务执行结束,那么Future接口的isDone()方法将返回true。
- 在调用shutdown()方法后,ThreadPoolExecutor类的awaitTermination()方法会将线程休眠,直到所有的任务执行结束。
这两个方法有一些缺点:第一个方法,仅可以控制任务的完成与否;第二个方法,必须关闭执行器来等待一个线程,否则调用这个方法线程将立即返回。
ThreadPoolExecutor 类还提供一个方法,它允许发送一个任务列表给执行器,并等待列表中所有任务执行完成。在本节,我们将编写范例,执行三个任务,当它们全部执行结束后打印出结果信息,用来学习如何使用这个特性。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建一个名为 Result 的类,用来存储范例中并发任务产生的结果。
public class Result {
2.声明两个私有属性。一个名为 name 的 String 属性,一个名为 value 的 int 属性。
private String name; private int value;
3.实现对应的 get() 和 set() 方法来设置和返回 name 和 value 属性。
public String getName() { return name; } public void setName(String name) { this.name = name; } public int getValue() { return value; } public void setValue(int value) { this.value = value; }
4.创建一个名为Task的类,并实现Callable接口,接口的泛型参数为Result类型。
public class Task implements Callable<Result> {
5.声明一个名为 name 的私有 String 属性。
private String name;
6.实现类的构造器,用来初始化类的属性。
public Task(String name) { this.name=name; }
7.实现call()方法。在这个范例中,这个方法将返回一个Result类型的对象。
@Override public Result call() throws Exception {
8.在控制台输出表示任务开始的信息。
System.out.printf("%s: Staring/n",this.name);
9.等待一段随机时间。
try { long duration=(long)(Math.random()*10); System.out.printf("%s: Waiting %d seconds for results./ n",this.name,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); }
10.生成一个int值,准备作为返回Result对象中的int属性,这个int值为5个随机数的总和。
int value=0; for (int i=0; i<5; i++){ value+=(int)(Math.random()*100); }
11.创建一个Result对象,并用任务的名称和上一步计算的int值来对其进行初始化。
Result result=new Result(); result.setName(this.name); result.setValue(value);
12.在控制台输出信息表示任务执行结束。
System.out.println(this.name+": Ends");
13.返回Result对象。
return result; }
14.实现范例的主类,创建Main主类,并实现main()方法。
public class Main { public static void main(String[] args) {
15.通过Executors工厂类的newCachedThreadPool()方法创建一个ThreadPoolExecutor 执行器对象。
ExecutorService executor=(ExecutorService)Executors. newCachedThreadPool();
16.创建一个Task类型的任务列表taskList。创建3个Task任务并将它们添加到任务列表taskList中。
List<Task> taskList=new ArrayList<>(); for (int i=0; i<3; i++){ Task task=new Task(i); taskList.add(task); }
17.创建一个 Future 类型的结果列表 resultList。这些对象泛型参数为 Result 类型。
List<Future<Result>>resultList=null;
18.调用 ThreadPoolExecutor 类的 invokeAll() 方法。这个方法将返回上一步所创建的 Future 类型的列表。
try { resultList=executor.invokeAll(taskList); } catch (InterruptedException e) { e.printStackTrace(); }
19.调用shutdown()方法结束执行器。
executor.shutdown();
20.在控制台输出任务处理的结果,即Future类型列表中的Result结果。
System.out.println("Main: Printing the results"); for (int i=0; i<resultList.size(); i++){ Future<Result> future=resultList.get(i); try { Result result=future.get(); System.out.println(result.getName()+": "+result. getValue()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
工作原理
在本节,我们学习了如何发送任务列表给执行器,并且通过invokeAll()方法等待所有任务的完成。这个方法接收一个Callable对象列表,并返回一个Future对象列表。在这个列表中,每一个任务对应一个Future对象。Future对象列表中的第一个对象控制Callable列表中第一个任务,以此类推。
需要注意的一点是,在存储结果的列表声明中,用在Future接口中的泛型参数的数据类型必须与Callable接口的泛型数据类型相兼容。在这个例子中,我们使用的是相同的数据类型:Result类。
另一个关于invokeAll()方法重要的地方是,使用Future对象仅用来获取任务的结果。当所有的任务执行结束时这个方法也执行结束了,如果在返回的Future对象上调用isDone()方法,那么所有的调用将返回true值。
更多信息
ExecutorService 接口还提供了 invokeAll() 方法的另一个版本:
- invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):当所有任务执行完成,或者超时的时候(无论哪个首先发生),这个方法将返回保持任务状态和结果的Future列表。
TimeUnit是一个枚举类,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。
参见
- 参见4.4节。
- 参见4.5节。
4.7 在执行器中延时执行任务
执行器框架(Executor Framework)提供了 ThreadPoolExecutor 类并采用线程池来执行 Callable 和 Runnable 类型的任务,采用线程池可以避免所有线程的创建操作而提高应用程序的性能。当发送一个任务给执行器时,根据执行器的相应配置,任务将尽可能快地被执行。但是,如果并不想让任务马上被执行,而是想让任务在过一段时间后才被执行,或者任务能够被周期性地执行。为了达到这个目的,执行器框架提供了 ScheduledThreadPoolExecutor 类。
在本节,我们将学习如何创建 ScheduledThreadPoolExecutor 执行器,以及如何使用它在经过一个给定的时间后开始执行任务。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建一个名为Task的类,并实现Callable接口,接口的泛型参数为String类型。
public class Task implements Callable<String> {
2.声明一个名为name的私有String属性,用来存储任务的名称。
private String name;
3.实现类的构造器,并初始化 name 属性。
public Task(String name) { this.name=name; }
4.实现call()方法。在控制台输出实际的时间,并返回一个文本信息,比如“Hello,world”。
public String call() throws Exception { System.out.printf("%s: Starting at : %s/n",name,new Date()); return "Hello, world"; }
5.实现范例的主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) {
6.通过Executors工厂类的newScheduledThreadPool()方法创建一个 ScheduledThreadPoolExecutor 执行器,并传递 1 作为参数。
ScheduledThreadPoolExecutor executor=(ScheduledThreadPoolExecu tor)Executors.newScheduledThreadPool(1);
7.初始化一些任务(在我们的示例中是 5 个),然后通过ScheduledThreadPoolExecutor 实例的 schedule() 方法来启动这些任务。
System.out.printf("Main: Starting at: %s/n",new Date()); for (int i=0; i<5; i++) { Task task=new Task("Task "+i); executor.schedule(task,i+1 , TimeUnit.SECONDS); }
8.调用执行器的 shutdown() 方法来结束执行器。
executor.shutdown();
9.调用执行器的 awaitTermination() 方法等待所有任务结束。
try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
10.在控制台输出信息表示程序执行结束的时间。
System.out.printf("Main: Ends at: %s/n",new Date());
工作原理
这个范例的关键点在于 Main 主类和 ScheduledThreadPoolExecutor 执行器的管理。虽然可以通过 ThreadPoolExecutor 类来创建定时执行器,但是在Java并发API中则推荐利用 Executors 工厂类来创建。在这个范例中,必须使用 newScheduledThreadPool() 方法,并且传递数字 1 作为方法的参数,这个参数就是线程池里拥有的线程数。
为了在定时执行器中等待一段给定的时间后执行一个任务,需要使用 schedule() 方法。这个方法接收如下的参数:
- 即将执行的任务;
- 任务执行前所要等待的时间;
- 等待时间的单位,由 TimeUnit 类的一个常量来指定。
在这个示例中,每个任务将等待 N 秒(TimeUnit.SECONDS),这个 N 值则等于任务在数组中的位置加 1。
备注:如果想在一个给定的时间点来定时执行任务,那就需要计算这个给定时间点和当前时间的差异值,然后用这个差异值作为任务的延迟值。
通过下面的截图,可以看到范例运行的部分结果。
从结果可知,每隔 1 秒钟就有一个任务开始执行;这是因为所有的任务被同时发送到执行器,但每个任务都比前一个任务延迟了 1 秒钟。
更多信息
也可以使用Runnable接口来实现任务,因为ScheduledThreadPoolExecutor类的 schedule()方法可以同时接受这两种类型的任务。
虽然ScheduledThreadPoolExecutor 类是 ThreadPoolExecutor 类的子类,因而继承了 ThreadPoolExecutor 类所有的特性。但是,Java推荐仅在开发定时任务程序时采用 ScheduledThreadPoolExecutor 类。
最后,在调用shutdown()方法而仍有待处理的任务需要执行时,可以配置 ScheduledThreadPoolExecutor的行为。默认的行为是不论执行器是否结束,待处理的任务仍将被执行。但是,通过调用ScheduledThreadPoolExecutor类的 setExecuteExisting
DelayedTasksAfterShutdownPolicy()方法则可以改变这个行为。传递false参数给这个方法,执行shutdown()方法后,待处理的任务将不会被执行。
参见
- 参见4.4节。
4.8 在执行器中周期性执行任务
执行器框架(Executor Framework)提供了 ThreadPoolExecutor 类,通过线程池来执行并发任务从而避免了所有线程的创建操作。当发送一个任务给执行器后,根据执行器的配置,它将尽快地执行这个任务。当任务执行结束后,这个任务就会从执行器中删除;如果想再次执行这个任务,则需要再次发送这个任务到执行器。
但是,执行器框架提供了 ScheduledThreadPoolExecutor 类来执行周期性的任务。在本节,我们将学习如何使用这个类的功能来计划执行周期性的任务。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建一个名为 Task 的类,并实现 Runnable 接口。
public class Task implements Runnable {
2.声明一个名为 name 的私有 String 属性,用来存储任务的名称。
private String name;
3.实现类的构造器,用来初始化类的属性。
public Task(String name) { this.name=name; }
4.实现 run() 方法。在控制台输出实际的时间,用来检验任务将在指定的一段时间内执行。
@Override public String call() throws Exception { System.out.printf("%s: Starting at : %s/n",name,new Date()); return "Hello, world"; }
5.实现范例的主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) {
6.通过调用Executors工厂类的newScheduledThreadPool()方法创建ScheduledThreadPoolExecutor 执行器对象,传递 1 作为这个方法的参数。
ScheduledExecutorService executor=Executors. newScheduledThreadPool(1);
7.在控制台输出实际时间。
System.out.printf("Main: Starting at: %s/n",new Date());
8.创建一个新的Task对象。
Task task=new Task("Task");
9.调用scheduledAtFixRate()方法将这个任务发送给执行器。传递给这个方法的参数分别为上一步创建的task对象、数字1、数字2,以及TimeUnit.SECONDS常量。这个方法返回一个用来控制任务状态的ScheduledFuture对象。
ScheduledFuture<?> result=executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
10.创建一个10步的循环,在控制台输出任务下一次将要执行的剩余时间。在循环体内,用ScheduledFuture类的getDelay()方法来获取任务下一次将要执行的毫秒数,然后将线程休眠500毫秒。
for (int i=0; i<10; i++){ System.out.printf("Main: Delay: %d/n",result. getDelay(TimeUnit.MILLISECONDS)); Sleep the thread during 500 milliseconds. try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } }
11.调用 shutdown() 方法结束执行器。
executor.shutdown();
12.将线程休眠 5 秒,等待周期性的任务全部执行完成。
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
13.在控制台输出信息表示程序结束。
System.out.printf("Main: Finished at: %s/n",new Date());
工作原理
想要通过执行器框架来执行一个周期性任务时,需要一个ScheduledExecutorService 对象。同创建执行器一样,在Java中推荐使用Executors工厂类来创建 ScheduledExecutorService对象。Executors类就是执行器对象的工厂。在这个例子中,可以使用newScheduledThreadPool()方法来创建一个ScheduledExecutorService对象。这个方法接收一个表示线程池中的线程数来作为参数。在这个范例中,因为仅有一个任务,所以只需要传递数字 1 作为参数即可。
一旦有了可以执行周期性任务的执行器,就可以发送任务给这个执行器。在范例中,我们使用scheduledAtFixedRate()方法发送任务。这个方法接收4个参数,分别为将被周期性执行的任务,任务第一次执行后的延时时间,两次执行的时间周期,以及第2个和第3个参数的时间单位。这个单位是TimeUnit枚举的常量。TimeUnit是一个枚举类,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。
另一个需要注意的是,两次执行之间的周期是指任务在两次执行开始时的时间间隔。如果有一个周期性的任务需要执行 5 秒钟,但是却让它每 3 秒钟执行一次,那么,在任务执行的过程中将会有两个任务实例同时存在。
scheduleAtFixedRate()方法返回一个ScheduledFuture对象,ScheduledFuture接口则扩展了Future接口,于是它带有了定时任务的相关操作方法。ScheduledFuture是一个泛型参数化的接口。在这个示例中,任务是Runnable对象,并没有泛型参数化,必须通过 ? 符号作为参数来泛型化它们。
我们已经使用过 ScheduledFuture 接口中的一个方法。getDelay()方法返回任务到下一次执行时所要等待的剩余时间。这个方法接收一个 TimeUnit 常量作为时间单位。
下面的截图显示了范例的部分运行结果。
通过控制上面的信息,可以看到任务是每 2 秒执行一次;剩余的延迟时间会每隔 500 毫秒在控制台上输出,这个 500 毫秒则是主线程将被休眠的时间。当关闭执行器时,定时任务将结束执行,然后在控制台上也看不到更多的信息了。
更多信息
ScheduledThreadPoolExecutor 类还提供了其他方法来安排周期性任务的运行,比如,scheduleWithFixedRate()方法。这个方法与 scheduledAtFixedRate() 方法具有相同的参数,但是略有一些不同需要引起注意。在 scheduledAtFixedRate() 方法中,第 3 个参数表示任务两次执行开始时间的间隔,而在 schedulledWithFixedDelay () 方法中,第 3 个参数则是表示任务上一次执行结束的时间与任务下一次开始执行的时间的间隔。
也可以配置ScheduledThreadPoolExecutor实现shutdown()方法的行为,默认行为是当调用shutdown()方法后,定时任务就结束了。可以通过 ScheduledThreadPoolExecutor类的setContinueExistingPeriodicTasksAfterShutdownPolicy() 方法来改变这个行为,传递参数true给这个方法,这样调用shutdown()方法后,周期性任务仍将继续执行。
参见
- 参见4.2节。
- 参见4.7节。
4.9 在执行器中取消任务
使用执行器时,不需要管理线程,只需要实现 Runnable 或 Callable 任务并发送任务给执行器即可。执行器负责创建线程,管理线程池中的线程,当线程不再需要时就销毁它们。有时候,我们可能需要取消已经发送给执行器的任务。在这种情况下,可以使用 Future 接口的 cancel() 方法来执行取消操作。在本节,我们将学习如何使用这个方法来取消已经发送给执行器的任务。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建一个名为Task的类,并实现Callable接口,接口的泛型参数为String类型。接着实现call()方法,构造一个无限循环,先在控制台输出信息,然后休眠100毫秒。
public class Task implements Callable<String> { @Override public String call() throws Exception { while (true){ System.out.printf("Task: Test/n"); Thread.sleep(100); } }
2.实现范例主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) {
3.通过Executors工厂类的newCachedThreadPool()方法创建一个ThreadPoolExecutor执行器对象。
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors. newCachedThreadPool();
4.创建一个新的Task对象。
Task task=new Task();
5.调用submit()方法将任务发送给执行器。
System.out.printf("Main: Executing the Task/n"); Future<String> result=executor.submit(task);
6.让主线程休眠2秒。
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
7.执行器的submit()方法返回名为result的Future对象,调用Future对象的cancel() 方法来取消任务的执行。传递参数true给这个cancel()方法。
System.out.printf("Main: Canceling the Task/n"); result.cancel(true);
8.在控制台输出调用isCancelled() 方法和isDone()方法的结果,来验证任务已经被取消和已完成。
System.out.printf("Main: Cancelled: %s/n",result.isCancelled()); System.out.printf("Main: Done: %s/n",result.isDone());
9.调用shutdown()方法结束执行器,然后在控制台输出信息表示程序执行结束。
executor.shutdown(); System.out.printf("Main: The executor has finished/n");
工作原理
如果想取消一个已经发送给执行器的任务,可以使用 Future 接口的 cancel() 方法。根据调用 cancel() 方法时所传递的参数以及任务的状态,这个方法的行为有些不同。
- 如果任务已经完成,或者之前已经被取消,或者由于某种原因而不能被取消,那么方法将返回 false 并且任务也不能取消。
- 如果任务在执行器中等待分配 Thread 对象来执行它,那么任务被取消,并且不会开始执行。如果任务已经在运行,那么它依赖于调用 cancel() 方法时所传递的参数。如果传递的参数为 true 并且任务正在运行,那么任务将被取消。如果传递的参数为 false 并且任务正在运行,那么任务不会被取消。
下面的截图展示了范例执行的结果。
更多信息
如果Future对象所控制任务已经被取消,那么使用Future对象的get()方法时将抛出 CancellationException异常。
参见
- 参见4.4节。
4.10 在执行器中控制任务的完成
FutureTask 类提供了一个名为 done() 的方法,允许在执行器中的任务执行结束之后,还可以执行一些代码。这个方法可以被用来执行一些后期处理操作,比如:产生报表,通过邮件发送结果或释放一些系统资源。当任务执行完成是受 FutureTask 类控制时,这个方法在内部被 FutureTask 类调用。在任务结果设置后以及任务的状态已改变为 isDone之后,无论任务是否被取消或者正常结束,done()方法才被调用。
默认情况下,done()方法的实现为空,即没有任何具体的代码实现。我们可以覆盖 FutureTask 类并实现done()方法来改变这种行为。在本节,我们将学习如何覆盖这个方法,并在任务结束后执行这些代码。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建名为ExecutableTask的类,并实现Callable接口,接口的泛型参数为String类型。
public class ExecutableTask implements Callable<String> {
2.声明一个名为name的私有String属性,用来存储任务的名称,用 getName() 方法来返回这个属性值。
private String name; public String getName(){ return name; }
3.实现类的构造器,并初始化任务的名称。
public ExecutableTask(String name){ this.name=name; }
4.实现 call() 方法。将任务休眠一段随机时间,并返回带有任务名称的消息。
@Override public String call() throws Exception { try { long duration=(long)(Math.random()*10); System.out.printf("%s: Waiting %d seconds for results./ n",this.name,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { } return "Hello, world. I'm "+name; }
5.实现一个名为ResultTask的类,并继承FutureTask类。FutureTask类的泛型参数为String类型。
public class ResultTask extends FutureTask<String> {
6.声明一个名为name的私有String属性,用来存储任务的名称。
private String name;
7.实现类构造器。它接收一个Callable对象作为参数,调用父类构造器,并用接收到的任务属性来初始化name属性。
public ResultTask(Callable<String> callable) { super(callable); this.name=((ExecutableTask)callable).getName(); }
8.覆盖done()方法。检查isCancelled()方法的返回值,然后根据这个返回值在控制台输出不同的信息。
Override protected void done() { if (isCancelled()) { System.out.printf("%s: Has been canceled/n",name); } else { System.out.printf("%s: Has finished/n",name); } }
9.实现范例的主类,创建Main主类,然后实现main()方法。
public class Main { public static void main(String[] args) {
10.调用Executors工厂类的newCachedThreadPool()方法创建一个 ExecutorService 执行器对象。
ExecutorService executor=(ExecutorService)Executors. newCachedThreadPool();
11.创建一个数组用来存储5个ResultTask对象。
ResultTask resultTasks[]=new ResultTask[5];
12.初始化ResultTask对象。在数组的每一个位置上,必须创建 ExecutorTask 对象,然后创建ResultTask对象来使用ExecutorTask对象,最后调用submit()方法将 ResultTask任务发送给执行器。
for (int i=0; i<5; i++) { ExecutableTask executableTask=new ExecutableTask("Task "+i); resultTasks[i]=new ResultTask(executableTask); executor.submit(resultTasks[i]); }
13.将主线程休眠5秒钟。
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e1) { e1.printStackTrace(); }
14.取消已经发送给执行器的所有任务。
for (int i=0; i<resultTasks.length; i++) { resultTasks[i].cancel(true); }
15.通过调用ResultTask对象的get()方法,在控制台上输出还没有被取消的任务结果。
for (int i=0; i<resultTasks.length; i++) { try { if (!resultTasks[i].isCancelled()){ System.out.printf("%s/n",resultTasks[i].get()); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
16.调用shutdown()方法结束执行器。
executor.shutdown(); } }
工作原理
当任务执行结束时, FutureTask类就会调用done()方法。在这个范例中,我们实现了一个Callable类、一个ExecutableTask类以及一个FutureTask类的子类ResultTask,这个子类用来控制ExecutableTask对象的执行。
在创建好返回值以及改变任务状态为isDone之后,FutureTask类就会在内部调用 done()方法。虽然我们无法改变任务的结果值,也无法改变任务的状态,但是可以通过任务来关闭系统资源、输出日志信息、发送通知等。
参见
- 参见4.4节。
4.11 在执行器中分离任务的启动与结果的处理
通常情况下,使用执行器来执行并发任务时,将Runnable或Callable任务发送给执行器,并获得Future对象来控制任务。此外,还会碰到如下情形,需要在一个对象里发送任务给执行器,然后在另一个对象里处理结果。对于这种情况,Java并发API提供了 CompletionService 类。
CompletionService类有一个方法用来发送任务给执行器,还有一个方法为下一个已经执行结束的任务获取Future对象。从内部实现机制来看,CompletionService类使用 Executor对象来执行任务。这个行为的优势是可以共享CompletionService对象,并发送任务到执行器,然后其他的对象可以处理任务的结果。第二个方法有一个不足之处,它只能为已经执行结束的任务获取Future对象,因此,这些Future对象只能被用来获取任务的结果。
在本节,我们将学习如何使用CompletionService类,在执行器中来分离任务的启动与结果的处理。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建名为 ReportGenerator 的类,并实现 Callable 接口,接口的泛型参数为 String 类型。
public class ReportGenerator implements Callable<String> {
2.声明两个私有的String属性,分别命名为sender和title,将用来表示报告的数据。
private String sender; private String title;
3.实现类的构造器,用来初始化这两个属性。
public ReportGenerator(String sender, String title){ this.sender=sender; this.title=title; }
4.实现call()方法。让线程休眠一段随机时间。
@Override public String call() throws Exception { try { long duration=(long)(Math.random()*10); System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds/n",this.sender,this.title,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); }
5.生成包含了sender和title属性的字符串并返回该字符串。
String ret=sender+": "+title; return ret; }
6.创建一个名为ReportRequest的类,实现Runnable接口。这个类将模拟请求获取报告。
public class ReportRequest implements Runnable {
7.声明一个名为name的私有String属性,用来存储ReportRequest的名称。
private String name;
8.声明一个名为 service 的私有 CompletionService 属性,这个 CompletionService 接口是泛型接口。在这个示例中,我们采用 String 类作为泛型参数。
private CompletionService<String> service;
9.实现类的构造器,并初始化这两个属性。
public ReportRequest(String name, CompletionService<String> service){ this.name=name; this.service=service; }
10.实现run()方法。创建ReportGenerator对象,然后调用CompletionService对象的submit()方法将ReportGenerator对象发送给CompletionService对象。
@Override public void run() { ReportGenerator reportGenerator=new ReportGenerator(name, "Report"); service.submit(reportGenerator); }
11.创建名为ReportProcessor的类,并实现Runnable接口。这个类将获取到 ReportGenerator任务的结果。
public class ReportProcessor implements Runnable {
12.声明一个名为 service 的私有CompletionService属性。由于CompletionService接口是一个泛型接口,在这个示例中,我们采用String类作为泛型参数。
private CompletionService<String> service;
13.声明一个名为end的私有boolean属性。
private boolean end;
14.实现类的构造器,并初始化这两个属性。
public ReportProcessor (CompletionService<String> service){ this.service=service; end=false; }
15.实现run()方法。如果end属性值为false,则调用CompletionService接口的poll() 方法,来获取下一个已经完成任务的Future对象;当然,这个任务是采用 CompletionService来完成的。
@Override public void run() { while (!end){ try { Future<String> result=service.poll(20, TimeUnit.SECONDS);
16.通过调用Future对象的get()方法来获取任务的结果,并在控制台输出这些结果。
if (result!=null) { String report=result.get(); System.out.printf("ReportReceiver: Report Received: %s/n",report); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } System.out.printf("ReportSender: End/n"); }
17.实现setEnd()设置方法,修改end的属性值。
public void setEnd(boolean end) { this.end = end; }
18.实现范例的主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) {
19.调用Executors工厂类的newCachedThreadPool()方法创建ThreadPoolExecutor执行器对象。
ExecutorService executor=(ExecutorService)Executors. newCachedThreadPool();
20.创建CompletionService对象,并将上一步创建的executor对象作为构造器的参数。
CompletionService<String> service=new ExecutorCompletionServic e<>(executor);
21.创建两个ReportRequest对象,然后创建两个线程Thread对象分别来执行它们。
ReportRequest faceRequest=new ReportRequest("Face", service); ReportRequest onlineRequest=new ReportRequest("Online", service); Thread faceThread=new Thread(faceRequest); Thread onlineThread=new Thread(onlineRequest);
22.创建1个ReportProcessor对象,然后创建1个线程Thread对象来执行它。
ReportProcessor processor=new ReportProcessor(service); Thread senderThread=new Thread(processor);
23.启动这3个线程。
System.out.printf("Main: Starting the Threads/n"); faceThread.start(); onlineThread.start(); senderThread.start();
24.等待ReportRequest线程的结束。
try { System.out.printf("Main: Waiting for the report generators./n"); faceThread.join(); onlineThread.join(); } catch (InterruptedException e) { e.printStackTrace(); }
25.调用shutdown() 方法结束执行器,然后调用awaitTermination()方法等待所有的任务执行结束。
System.out.printf("Main: Shutting down the executor./n"); executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
26.调用 setEnd() 方法,设置 end 属性为 true,来结束 ReportSender 的执行。
processor.setEnd(true); System.out.println("Main: Ends");
工作原理
在范例的主类中,我们调用Executors工厂类的newCachedThreadPool()方法创建了 ThreadPoolExecutor执行器对象。然后,使用这个对象初始化了CompletionService对象,因为完成服务(Completion Service)使用执行器来执行任务。然后,调用ReportRequest 类中的submit()方法,利用“完成服务”来执行任务。
当“完成服务”任务结束,这些任务中一个任务就执行结束了,“完成服务”中存储着Future对象,用来控制它在队列中的执行。
调用poll()方法访问这个队列,查看是否有任务已经完成,如果有,则返回队列中的第一个元素(即一个任务执行完成后的Future对象)。当poll()方法返回Future对象后,它将从队列中删除这个Future对象。在这个示例中,我们在调用poll()方法时传递了两个参数,表示当队列里完成任务的结果为空时,想要等待任务执行结束的时间。
一旦创建了CompletionService对象,还要创建两个ReportRequest对象,用来执行在CompletionService中的两个ReportGenerator任务。ReportProcessor任务则将处理两个被发送到执行器里的ReportRequest 任务所产生的结果。
更多信息
CompletionService类可以执行Callable或Runnable类型的任务。在这个范例中,使用的是Callable类型的任务,但是,也可以发送Runnable对象给它。由于Runnable对象不能产生结果,因此CompletionService的基本原则不适用于此。
CompletionService类提供了其他两种方法来获取任务已经完成的Future对象。这些方法如下。
- poll():无参数的poll()方法用于检查队列中是否有Future对象。如果队列为空,则立即返回null。否则,它将返回队列中的第一个元素,并移除这个元素。
- take():这个方法也没有参数,它检查队列中是否有Future对象。如果队列为空,它将阻塞线程直到队列中有可用的元素。如果队列中有元素,它将返回队列中的第一个元素,并移除这个元素。
参见
- 参见4.4节。
4.12 处理在执行器中被拒绝的任务
当我们想结束执行器的执行时,调用 shutdown() 方法来表示执行器应当结束。但是,执行器只有等待正在运行的任务或者等待执行的任务结束后,才能真正结束。
如果在shutdown()方法与执行器结束之间发送一个任务给执行器,这个任务会被拒绝,因为这个时间段执行器已不再接受任务了。ThreadPoolExecutor类提供了一套机制,当任务被拒绝时调用这套机制来处理它们。
在本节,我们将学习如何处理执行器中被拒绝的任务,这些任务实现了RejectedExecutionHandler 接口。
准备工作
本节的范例是在Eclipse IDE里完成的。无论你使用Eclipse还是其他的IDE(比如NetBeans),都可以打开这个IDE并且创建一个新的Java工程。
范例实现
按照接下来的步骤实现本节的范例。
1.创建一个名为 RejectedTaskController 的类,并实现 RejectedExecutionHandler 接口,然后实现接口的 rejectedExecution() 方法,在控制台输出已被拒绝的任务的名称和执行器的状态。
public class RejectedTaskController implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.printf("RejectedTaskController: The task %s has been rejected/n",r.toString()); System.out.printf("RejectedTaskController: %s/n",executor. toString()); System.out.printf("RejectedTaskController: Terminating: %s/n",executor.isTerminating()); System.out.printf("RejectedTaksController: Terminated: %s/n",executor.isTerminated()); }
2.创建一个名为Task的类,并实现Runnable接口。
public class Task implements Runnable{
3.声明一个名为name的私有String属性,用来存储任务的名称。
private String name;
4.实现类的构造器,用来初始化类的属性。
public Task(String name){ this.name=name; }
5.实现run()方法。在控制台输出信息表示方法开始执行。
@Override public void run() { System.out.println("Task "+name+": Starting");
6.让线程休眠一段随机时间。
try { long duration=(long)(Math.random()*10); System.out.printf("Task %s: ReportGenerator: Generating a report during %d seconds/n",name,duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); }
7.在控制台输出信息表示方法执行结束。
System.out.printf("Task %s: Ending/n",name); }
8.覆盖toString()方法,返回任务的名称。
public String toString() { return name; }
9.实现范例的主类,创建 Main 主类,并实现 main() 方法。
public class Main { public static void main(String[] args) {
10.创建 RejectedTaskController 对象来管理被拒绝的任务。
RejectecTaskController controller=new RejectecTaskController();
11.调用 Executors 工厂类的 newCachedThreadPool() 方法创建 ThreadPoolExecutor 执行器对象。
ThreadPoolExecutor executor=(ThreadPoolExecutor) Executors. newCachedThreadPool();
12.设置用于被拒绝的任务的处理程序。
executor.setRejectedExecutionHandler(controller);
13.创建 3 个任务并发送给执行器。
System.out.printf("Main: Starting./n"); for (int i=0; i<3; i++) { Task task=new Task("Task"+i); executor.submit(task); }
14.调用 shutdown() 方法关闭执行器。
System.out.printf("Main: Shutting down the Executor./n"); executor.shutdown();
15.创建另一个任务并发送给执行器。
System.out.printf("Main: Sending another Task./n"); Task task=new Task("RejectedTask"); executor.submit(task);
16.在控制台输出信息表示程序结束。
System.out.println("Main: End"); System.out.printf("Main: End./n");
工作原理
通过下面的截图,可以看到范例运行的结果。
我们可以看到被拒绝的任务,当执行已经关闭,RejectecedTaskController 在控制台输出任务和执行器的信息。
为了处理在执行器中被拒绝的任务,需要创建一个实现RejectedExecutionHandler接口的处理类。这个接口有一个rejectedExecution()方法,其中有两个参数:
- 一个Runnable对象,用来存储被拒绝的任务;
- 一个Executor对象,用来存储任务被拒绝的执行器。
被执行器拒绝的每一个任务都将调用这个方法。需要先调用Executor类的 setRejectedExecutionHandler()方法来设置用于被拒绝的任务的处理程序。
更多信息
当执行器接收一个任务并开始执行时,它先检查shutdown()方法是否已经被调用了。如果是,那么执行器就拒绝这个任务。首先,执行器会寻找通过setRejectedExecutionHandler()方法设置的用于被拒绝的任务的处理程序,如果找到一个处理程序,执行器就调用其rejectedExecution()方法;否则就抛出 RejectedExecutionExeption异常。这是一个运行时异常,因此并不需要catch语句来对其进行处理。
参见
- 参见4.2节。
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/140753.html