本文是《JAVA多线程编程实战指南》的样章,感谢作者授权并发网(ifeve.com)发表此文。感谢demochen整理此文。
5.1Two-phase Termination模式简介
停止线程是一个目标简单而实现却不那么简单的任务。首先,Java没有提供直接的API用于停止线程。此外,停止线程还有一些额外的细节需要考虑,如停止的线程处于阻塞(如等待锁)或者等待状态(等待其他线程),尚有未处理完的任务等。
Two-phase Termination模式通过将停止线程这个动作分解为准备阶段和执行阶段这两个阶段,提供了一种通用的用于优雅地停止线程的方法。
准备阶段。该阶段的主要动作是“通知”目标线程(欲停止的线程)准备进行停止。这一步会设置一个标志变量用于指示目标线程可与准备停止了。但是,由于目标线程可能正处于阻塞状态(等待锁的获得)、等待状态(如调用Object.wait)或者I/O(如InputStream.read)等待等状态,即便设置了这个标志,目标线程也无法立即”看到”这个标志而做出相应的动作。因此,这一阶段还需要用过interrupt方法,以期望目标线程能够对能够通过捕获相关的异常侦测到该方法调用,从而中断其阻塞状态、等待状态。对于能够对interrupt方法调用做出响应的方法(参见表5-1),目标线程代码可以通过捕获这些方法抛出的InterruptedException来侦测线程停止信号。但也有一些方法(如InputStream.read)并不对interrupt调用作出响应,此时需要我们手工处理,如同步的Socket I/O操作中通过关闭socket,使处于I/O等待的socket抛出java.net.SocketException。
表5-1.能够对Thread.interrupt作出响应的一些方法
方法(或者类) | 响应interrupt调用抛出的异常 |
---|---|
Object.wait() ǃObject.wait(long timeout) ǃObject.wait (long timeout, int nanos) |
InterruptedException |
Thread.sleep(long millis) ǃThread.sleep(long millis, int nanos) |
InterruptedException |
Thread.join()ǃThread.join(long millis) ǃThread.Join (long millis, int nanos) |
InterruptedException |
java.util.concurrent.BlockingQueue.take() | InterruptedException |
java.util.concurrent.locks.Lock.lockInterruptibly() | InterruptedException |
java.nio.channels.InterruptibleChannel | java.nio.channels.ClosedByInterruptException |
执行阶段。该阶段的主要动作是检查准备阶段所设置的线程停止标志和信号,在此基础上决定线程停止的时机,并进行适当的”清理”操作。
5.2Two-phase Termination 模式的架构
Two-phase Termination模式的主要参与者有以下几种。其类图如图5-1所示
ThreadOwner:目标线程的拥有者.Java语言中,并没有线程拥有者的概念,但是线程的背后是其要处理的任务或者其所提供的服务,因此我们不能在不清楚某个线程具体是做什么的情况下贸然将其停止。一般地,我们可以将目标线程的创建者视为该线程的拥有者,并假定其”知道”目标线程的工作内容,可以安全地停止目标线程。
Terminatable:可停止线程的抽象。其主要方法及职责如下
terminate:请求目标线程停止。
AbstractTerminatableThread:可停止的线程。其主要方法及职责如下
terminate:设置线程停止标志,并发送停止”信号”给目标线程。
doTerminate:留给自雷实现线程停止时所需的一些额外操作,如目标线程代码中包含SockerI/O,子类可以在该方法中关闭Socket以达到快速停止线程,而不会使目标线程等待I/O完成才能侦测到线程停止标记。
doRun:线程处理逻辑方法。留给子类实现线程的处理逻辑。相当于Thread.run(),只不过该方法中无需关心停止线程的逻辑,因为这个逻辑已经被封装在TerminatableThread的run方法中了。
doCleanup:留给子类实现线程停止后可能需要的一些清理动作。
TerminationToken:线程停止标志。toShutdown用于目标线程可以停止了。reservations可用于反映目标线程还有多少数量未完成的任务,以支持等目标线程处理完其他任务后再行停止。
ConcreteTerminatableThread:由应用自己实现的AbstractTerminatableThread参与者的实现类。该类需要实现其父类的doRun抽象方法,在其中实现线程的处理逻辑,并根据应用的实际需要覆盖(Override)其父类的doTerminate方法、doCleanup方法。
准备阶段的序列图如图5-2所示。
第1步:客户端代码调用线程拥有者的shutdown方法。
第2步:shutdown方法调用目标线程的terminate方法。
第3,4步:terminate方法将terminationToken的toShutdown标志设置为true。
第5步:terminate方法调用由AbstractTerminatableThread子类实现的doTerminate的方法,使得子类可以为停止目标线程做一些其他必要的操作。
第6步:若terminationToken的reservations属性值为0,则表示目标线程没有未处理完的任务或者ThreadOwner在停止线程时不关心其是否有未处理的任务。此时,terminate方法会调用目标线程的interrupt方法。
第7步:terminate方法调用结束
第8步:shutdown调用返回,此时目标线程可能还仍然在运行。
执行阶段由目标线程的run方法去检查terminationToken的toShutdown属性、reservations属性的值,并捕获由interrupt方法调用抛出的相关异常以决定是否停止线程。在线程停止前由AbstractTerminatableThread子类实现的doCleanup方法会被调用。
5.3Two-phase Termination模式实战案例解析
某系统的告警功能被封装在一个模块中。告警模块的入口类是AlarmMgr。其他模块(业务模块)需要发送告警信息时只需要调用AlarmMgr的sendAlarm方法即可。该方法将告警信息缓存如队列,由专门的告警发送线程负责调用AlarmAgent的相关方法发送告警。AlarmAgent类负责与告警服务器对接,它通过网络连接将告警信息发送至告警服务器。
告警发送线程是一个用户线程(user Thread),因此在系统的停止过程中,该线程若未停止则会组织JVM正常关闭。所以,在系统停止过程中我们必须主动去停止告警发送线程,而非依赖JVM。为了能够尽快地以优雅的方式将告警线程停止,我们需要处理以下两个问题。
1.当告警缓存队列非空时,需要将队列中已有的告警信息发送至告警服务器。
2.由于缓存告警信息的队列是一个阻塞队列(ArrayBlockingQueue),在该队列为空的情况下,告警发送线程会一直处于等待状态。这会导致其无法响应我们关闭线程的请求。
上述问题可以通过使用Two-phase Termination模式来解决。
AlarmMgr相当于图5-1中的ThreadOwner参与者实例,它是告警发送线程(对应实例变量alarmSendingThread)的拥有者。系统停止过程中调用其shutdown方法(AlarmMgr.getInstance().shutdown())即可请求告警发送线程停止。其代码如清单5-1所示
清单5-1.AlarmMgr类源码
/** * 告警功能入口类 模式角色: Two-phaseTermination.ThreadOwner */ public class AlarmMgr { // 保存AlarMgr类的唯一实例 private static final AlarmMgr INSTANCE = new AlarmMgr(); private volatile boolean shutdownRequested = false; // 告警发送线程 private final AlarmSendingThread alarmSendingThread; // 私有构造器 private AlarmMgr() { alarmSendingThread = new AlarmSendingThread(); } public static AlarmMgr getInstance() { return INSTANCE; } /** * 发送告警 * * @Description: TODO(这里用一句话描述这个方法的作用) * @param type告警类型 * @param id告警编号 * @param extraInfo告警参数 * @return 由type+id+extraInfo唯一确定的告警信息提交的次数。-1表示告警管理器已被关闭 */ public int sendAlarm(AlarmType type, String id, String extraInfo) { Debug.info("Trigger alarm " + type + "," + id + ',' + extraInfo); int duplicateSubmissionCount = 0; try { AlarmInfo alarmInfo = new AlarmInfo(id, type); alarmInfo.setExtraInfo(extraInfo); duplicateSubmissionCount = alarmSendingThread.sendAlarm(alarmInfo); } catch (Throwable t) { t.printStackTrace(); } return duplicateSubmissionCount; } public void init() { alarmSendingThread.start(); } public synchronized void shutdown() { if (shutdownRequested) { throw new IllegalStateException("shutdown already requested!"); } alarmSendingThread.terminate(); shutdownRequested = true; } }
告警发送线程类AlarmSendingThread 的源码,如清单5-2所示。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; //模式角色:Two-phaseTermination.ConcreteTerminatableThread public class AlarmSendingThread extends AbstractTerminatableThread { private final AlarmAgent alarmAgent = new AlarmAgent(); //告警队列 private final BlockingQueue alarmQueues; private final ConcurrentMap<String, AtomicInteger>; submittedAlarmRegistry; public AlarmSending alarmQueue=new ArThread(){ //rayBlockingQueue(100); submittedAlarmRegistry = new ConcurrentHashMap&lt;String, AtomicInteger&gt;(); alarmAgent.init(); } @Override protected void doRun() throws Exception { AlarmInfo alarm; alarm = alarmQueue.take(); terminationToken.reservations.decrementAndGet(); try { //将告警信息发送至告警服务器 alarmAgent.sendAlarm(alarm); } catch (Exception e) { e.printStackTrace(); } if (AlarmType.RESUME == alarm.type) { String key = AlarmType.FAULT.toString() + ':' + alarm.getId() + '@' + alarm.getExtraInfo(); submittedAlarmRegistry.remove(key); key = AlarmType.RESUME.toString() + ':' + alarm.getId() + '@' + alarm.getExtraInfo(); submittedAlarmRegistry.remove(key); } } public int sendAlarm(final AlarmInfo alarmInfo){ AlarmType type = alarmInfo.type; String id = alarmInfo.getId(); String extraInfo = alarmInfo.getExtraInfo(); if (terminationToken.isToShutdown()) { System.err.println(&quot;rejected alarm:&quot; + id + &quot;,&quot; + extraInfo); return -1; } int duplicateSubmissionCount = 0; try { AtomicInteger prevSubmittedCounter; prevSubmittedCounter = submittedAlarmRegistry.putIfAbsent( type.toString()+ ':' + id + '@' + extraInfo, new AtomicInteger(0)); if (null == prevSubmittedCounter) { terminationToken.reservations.incrementAndGet(); alarmQueue.put(alarmInfo); }else{ //故障未恢复,不用重复发送告警信息个服务器,故仅增加计数 duplicateSubmissionCount = prevSubmittedCounter.incrementAndGet(); } } catch (Throwable t) { t.printStackTrace(); } return duplicateSubmissionCount; } @Override protected void doCleanup(Exception exp) { if (null != exp &amp;&amp; !(exp instanceof InterruptedException)) { exp.printStackTrace(); } alarmAgent.disconnect(); } }
从上面的代码可以看出,AlarmSendingThread每接受一个告警信息放入缓存队列便将terminationToken 的reservations 值加1,而每发送一个告警到告警服务器则将terminationToken 的reservations值减少1.这为我们可以在停止告警发送线程前确保队列中现有的告警信息会被处理完毕提供了线索:AbstractTerminatableThread的run方法会根据terminationToken 的reservations 是否为0来判断待停止的线程已无未处理的任务,或者无需关系起是否有待处理的任务。
AbstractTerminatableThread 的源码见清单5-3
/** * 可停止的抽象线程 模式角色:Two-phaseTermination.AbstractTerminatableThread * * @author Viscent Huang * @date 2015年12月5日 下午8:38:17 */ public abstract class AbstractTerminatableThread extends Thread implements Terminatable { // 模式角色:Two-phaseTermination.TerminationToken public final TerminationToken terminationToken; public AbstractTerminatableThread() { this(new TerminationToken()); } /** * @param terminationToken 线程间共享的线程终止标志实例 */ public AbstractTerminatableThread(TerminationToken terminationToken) { super(); this.terminationToken = terminationToken; terminationToken.register(this); } /** * 留给子类实现其线程处理逻辑 * * @throws Exception */ protected abstract void doRun() throws Exception; /** * 留给子类实现。用于实现线程停止后的一些清理动作 * * @param cause 设定文件 */ protected void doCleanup(Exception cause) { // 什么也不做 } @Override public void run() { Exception ex = null; try { for (;;) { if (terminationToken.isToShutdown && terminationToken.reservations.get() <= 0) { break; } doRun(); } } catch (Exception e) { // 使得线程能够响应interrupt调用而退出 ex = e; } finally { try { doCleanup(ex); } finally { terminationToken.notifyThreadTermination(this); } } } @Override public void interrupt() { terminate(); } /** * 请求停止线程 * * @see io.github.viscent.mtpattern.tpt.Terminatable#terminate() */ @Override public void terminate() { terminationToken.setToShutdown(true); try { doTerminiate(); } finally { // 若无待处理的任务,则试图强制终止线程 if (terminationToken.reservations.get() <= 0) { super.interrupt(); } } } public void terminate(boolean waitUtilThreadTerminated) { terminate(); if (waitUtilThreadTerminated) { try { this.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }
AbstractTerminatableThread是一个可复用的Terminatable 参与者实例。其terminate 方法完成了线程停止的准备阶段。该方法首先将terminationToken 的toShutdown 属性设置为true,指示目标线程可以准备停止了。但是,此时目标线程可能处于一些阻塞(Blocking)方法的调用,如 Object.sleep、ǃInputStream.read 等,无法检测该变量的值。调用目标线程的interrupt 方法可以使一些阻塞方法(参见表5-1)抛出异常从而是目标线程停止。但也有些阻塞方法如InputStream.read 并不对interrupt 方法调用作出响应,此时需要由AbstractTerminatableThread 的子类实现doTerminate 方法,在该方法中实现一些关闭目标线程所需的额外操作。例如,在Socket同步I/O中通过关闭socket使得使用该socket 的线程若处于I/O等待会抛出SocketException 。因此,terminate方法下一步调用doTerminate 方法。接着,若terminationToken.reservations 的值作为非正数(表示目标线程无待处理任务或者我们不关心其是否有待处理任务),则terminate 方法会调用目标线程的interrupt 方法,强制目标线程的阻塞方法中断,从而强制终止目标线程。
执行阶段在AbstractTerminatableThread 的run方法中完成。该方法通过对TerminationToken 的toShutdown 属性和reservations 属性的判断或者通过捕获由interrupt 方法调用而抛出的异常来终止线程,并在线程终止前调用由AbstractTerminatableThread 子类实现的doCleanup 方法用于执行一些清理动作。
在执行阶段。由于AbstractTerminatableThread.run方法每次执行现场处理逻辑(通过调用doRun方法实现)前都先判断下toShutDown属性和reservations 的值,在目标线程处理完待处理的任务后(此时reservations 属性的值为非整数)目标线程run方法也就退出了while循环。因此线程的处理逻辑方法将不再被调用,从而使本案例在不使用Two-phase Termination模式的情况下停止目标线程存在的两个问题得以解决(目标线程停止前可以保证处理完待处理的任务-发送队列中现有的告警信息到服务器)和规避(目标线程发送完队列中现有的告警信息后,doRun方法不再被调用,从而避免了队列为空时 BlockingQueue.take 调用导致的阻塞)。
由上可知,准备阶段、执行阶段需要通过TerminationToken 作为“中介”来协调二者的动作。TerminationToken 的源码如清单5-4所示。
清单5-4 4 TerminationToken 类源码
/** * 线程停止标志 * * @author Viscent Huang */ public class TerminationToken { // 使用volatile修饰,以保证无须显示锁的情况下该变量的内存可见性 protected volatile boolean toShutdown = false; public final AtomicInteger reservations = new AtomicInteger(0); /*** * 在多个可停止线程实例共享一个TerminationToken实例的情况下, * 该队列用于记录那些共享TerminationToken 实例的可停止线程, * 以便尽可能减少锁的使用的情况下,实现这些线程的停止 */ private final Queue<WeakReference> coordinatedThreads; public TerminationToken() { coordinatedThreads = new ConcurrentLinkedQueue<WeakReference>(); } public boolean isToShutdown() { return toShutdown; } protected void setToShutdown(boolean toShutdown) { this.toShutdown = true; } protected void register(Terminatable thread) { coordinatedThreads.add(new WeakReference(thread)); } /** * 通知TerminationToken 实例: * 共享该实例的所有可停止线程中的一个线程停止了 以便其停止其他未被停止的线程 * * @param thread 已停止的线程 */ protected void notifyThreadTermination(Terminatable thread) { WeakReference wrThread; Terminatable otherThread; while (null != (wrThread = coordinatedThreads.poll())) { otherThread = wrThread.get(); if (null != otherThread && otherThread != thread) { otherThread.terminate(); } } } }
5.4 Two-phase Termination 模式的评价与实现考量
Two-phase Termination 模式使得我们可以对各种形式的目标线程进行优雅的停止。如目标线程调用了能够对interrupt 方法调用作出响应的阻塞方法、目标线程调用了不能对interrupt 方法调用作出响应的阻塞方法、目标线程作为消费者处理其他线程生产的“产品”在其停止前需要处理完现有“产品”等。Two-phase Termination 模式实现的线程停止可能出现延迟,即客户端代码调用完ThreadOwner.shutdown 后,该线程可能仍在运行。
本章案例展示了一个可复用的Two-phase Termination 模式实现代码。读者若是要加深对该模式的理解或者自行实现该模式,需要注意以下几个问题。
5.4.1线程停止标志
本章案例使用了TerminationToken 作为目标线程可以准备停止的标志。从清单5-4的代码我们可以看到,TerminationToken 使用了toShutdown 这个boolean 变量作为注意的停止标志,而非使用Thread.isInterrupted()。这是因为,调用目标线程的interrupt方法无法保证目标线程的 isInterrupted() 方法返回值true:目标线程可能调用一些代码,它们捕获InterruptedException 后没有通过调用Thread.currentThread().interrupt()保留线程中断状态。另外,toShutdown 这个变量为了保证内存可见性而又能避免使用显示锁的开销,采用了volatile修饰。这点也很重要,笔者曾经见过一些采用boolean变量作为线程停止标志的代码,只是这些变量没有用volatile修饰,对其访问也没有锁,这就可能无法停止目标线程。
另外,某些场景下多个可停止线程实例可能需要共用一个线程停止标志。例如,多个可停止线程实例“消耗”同一个队列中的数据。当该队列为空且不再有新的数据入队列的时候,”消耗”该队列数据的所有可停止线程都应该被停掉。AbstractTerminatableThread类(源码见清单5-3)的构造器支持传入一个TerminationToken 实例就是为了支持这种场景。
5.4.2生产者-消费者问题中的线程停止
在多线程编程中,许多问题和一些多线程编程模式都可以看做生产者-消费者问题。停止处于生产者-消费者问题中的线程,需要考虑更多的问题:需要注意线程的停止顺序。如果消费者线程比生产者线程先停止则会导致生产者生产新的”产品”无法被处理,而如果先停止生产者线程有可能使消费者线程处于空等待(如生产者、消费者采用阻塞队列中转”产品”)。并且,停止消费者线程是否考虑要等待其处理完所有待处理的任务或者将这些任务做个备份也是个问题。本章案例部分地展示生产者-消费者问题中线程停止的处理,其核心就是通过使用TerminationToken 的reservations 属性:生产者每”生产”一个产品,Two-phase Termination 模式的客户端代码要使reservations 属性值增加1(即调用terminationToken.reservations.incrementAndGet());消费者线程每处理一个产品,该线程的线程处理逻辑方法 doRun 要使reservations 属性值减少1(即调用terminationToken.reservations.decrementAndGet())。当然,在停止消费者线程时如果我们不关心其待处理的任务,Two-phase Termination模式的客户端代码可以忽略对reservations 变量的操作。清单5-5展示了一个完整的停止生产者-消费者问题中的线程例子。
清单5-5 停止生产者-消费者问题中的线程的例子
public class SomeService { private final BlockingQueue queue = new ArrayBlockingQueue(100); private final Producer producer = new Producer(); private final Consumer consumer = new Consumer(); private class Producer extends AbstractTerminatableThread { private int i = 0; @Override protected void doRun() throws Exception { queue.put(String.valueOf(i++)); consumer.terminationToken.reservations.incrementAndGet(); } }; private class Consumer extends AbstractTerminatableThread { @Override protected void doRun() throws Exception { String product = queue.take(); System.out.println("Processing product:" + product); // 模拟执行真正操作的时间消耗 try { Thread.sleep(new Random().nextInt(100)); } catch (InterruptedException e) { ; } finally { terminationToken.reservations.decrementAndGet(); } } } public void shutdown() { // 生产者线程停止后再停止消费者线程 producer.terminate(true); consumer.terminate(); } public static void main(String[] args) throws InterruptedException { SomeService ss = new SomeService(); ss.init(); Thread.sleep(500); ss.shutdown(); } public void init() { producer.start(); consumer.start(); } public static void main(String[] args) throws InterruptedException { SomeService ss = new SomeService(); ss.init(); Thread.sleep(500); ss.shutdown(); } }
5.4.3 隐藏而非暴露可停止的线程
为了保证可停止的线程不被其他代码误止,一般我们将可停止的线程隐藏在线程拥有者背后,而 使系统中其他代码无法直接访问该线程,正如本案例代码(见清单5-1)所展示:AlarmMgr定义了一个private字段alarmSendingThread用于引用告警发送线程(可停止的线程),系统中的其他代码只能通过调用AlarmMgr 的shutdown 方法来请求该线程停止,而非通过使用该线程对象来停止它。
5.5Two-phase Termination 模式的可复用实现代码
本章案例代码(见清单5-3、清单5-4)所实现的 Two-phase Termination 模式的几个参与者AbstractTerminatableThread 和TerminationToken 都是可复用的。在此基础上,应用代码只需要在定义AbstractTerminatableThread 的子类(或匿名类)时实现doRun方法,在该方法中实现线程的处理逻辑。另外,应用代码如果需要在目标线程处理完完待处理的的任务后再停止,则需要注意TerminationToken 实例的reservations 属性值的增加和减少。
5.6 Java标准库实例
类java.util.concurrent.ThreadPoolExecutor就使用了˹ Two-phase Termination 模式来停止其内部维护的工作者线程。当客户端代码调用ThreadPoolExecutor 实例的shutdown方法请求其关闭时,ThreadPoolExecutor 会先将其运行状态设置为 SHUTDOWN。工作者线程的run方法会判断其所属的ThreadPoolExecutor 实例的运行状态。若ThreadPoolExecutor 实例的运行状态为SHUTDOWN,则工作者线程会一直取工作队列中的任务进行执行,知道工作队列为空时该工作者线程就停止了。可见,ThreadPoolExecutor 实例的停止过程也是分为准备阶段(设置其运行状态为SHUTDOWN)和执行阶段(工作者队列取空工作队列中的任务,然后终止线程)。
5.7 相关模式
Two-phase Termination 模式是一个应用比较广泛的基础多线程设计模式。凡是涉及应用自身实现线程的代码,都可能需要使用该模式。
5.7.1 Producer-Consumer 模式(第7章)
Producer-Consumer 模式中,生产者线程、消费者线程的停止可能需要使用 Two-phase Termination模式。
5.8 参考资源
1.Brian Göetz et al.Java Concurrency In Practice.Addison Wesley,2006.
2.Mark Grand. Patterns in Java, Volume 1: A Catalog of Reusable Design Patterns Illustrated with UML, Second Edition.Wiley, 2002.
3.类 ThreadPoolExecutor 源码. http://www.docjar.com/html/api/java/util/concurrent/ Thread PoolExecutor.java.html.
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/117596.html