原文链接 作者:Tomasz Nurkiewicz 译者:simonwang
我之前写过一篇关于InterruptedException and interrupting threads的文章。总之,如果你调用Future.cancel(),那么Future不仅会终止正在等待的get(),还会试图去中断底层的线程。这是个很重要的特征,它能够使线程池变得更加利于使用。我在之前的文章中也说过,相对于标准的Future,尽量使用CompletableFuture。但事实证明,Future的更加强大的兄弟-CompletableFuture并不能优雅地处理cancel()。
请思考下面的任务代码,在接下来的测试中会用到:
class InterruptibleTask implements Runnable { private final CountDownLatch started = new CountDownLatch(1) private final CountDownLatch interrupted = new CountDownLatch(1) @Override void run() { started.countDown() try { Thread.sleep(10_000) } catch (InterruptedException ignored) { interrupted.countDown() } } void blockUntilStarted() { started.await() } void blockUntilInterrupted() { assert interrupted.await(1, TimeUnit.SECONDS) } }
客户端线程可以检查InterruptibleTask是否已经开始运行或者是被中断了。首先,我们可以从外部查看InterruptibleTask到底会对cancel()作出怎么样的反应:
def "Future is cancelled without exception"() { given: def task = new InterruptibleTask() def future = myThreadPool.submit(task) task.blockUntilStarted() and: future.cancel(true) when: future.get() then: thrown(CancellationException) } def "CompletableFuture is cancelled via CancellationException"() { given: def task = new InterruptibleTask() def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool) task.blockUntilStarted() and: future.cancel(true) when: future.get() then: thrown(CancellationException) }
到目前为止一切顺利,Future和CompletableFuture都以几乎相同的方式工作着-在cancel之后取回结果会抛出CancellationException(这里要解释一下,Future.cancel()是不会抛出异常的,而CompletableFuture.cancel()则会以抛出CancellationException强行结束,上面的代码作者都手动抛出了CancellationException)。但在myThreadPool中的线程会怎样呢?我猜会被中断然后被线程池重新回收,我大错特错!
def "should cancel Future"() { given: def task = new InterruptibleTask() def future = myThreadPool.submit(task) task.blockUntilStarted() when: future.cancel(true) then: task.blockUntilInterrupted() } @Ignore("Fails with CompletableFuture") def "should cancel CompletableFuture"() { given: def task = new InterruptibleTask() def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool) task.blockUntilStarted() when: future.cancel(true) then: task.blockUntilInterrupted() }
第一个测试提交普通的Runnable给ExecutorService然后等待直到它开始执行,接着我们取消Future等待直到抛出InterruptedException,当底层的线程被中断的时候blockUntilInterrupted()会返回。第二个测试失败了,CompletableFuture.cancel()不会中断线程,尽管Future看起来被取消了,但后台线程仍然在执行,sleep()不会抛出InterruptionException。这是一个bug还是这就是CompletableFuture的特点?你们可以查看此文档,不幸地是这就是它的特点:
Parameters: mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.
RTFM(Read The Fucking Manual),但为什么CompletableFuture会以这样的方式工作?首先让我们检查一下“老的”Future的实现与CompletableFuture的有什么不同。FutureTask会在执行ExecutorService.submit()之后返回,而且它的cancel()有如下的实现(我移除了Unsafe以及相似的非线程安全的Java代码,所以仅仅把它当作伪代码看待):
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state state = INTERRUPTED; } } } finally { finishCompletion(); } return true; }
FutureTask的state变量状态如下图:
万一执行cancel(),我们要么进入CANCELLED状态,要么通过INTERRUPTING进入INTERRUPTED。这里的核心部分是我们要获取runner线程(如果存在,例如如果task正在被执行)然后试着去中断它。这里要小心对于正在运行的线程的强制中断。最后在finishCompletion()中我们要通知所有阻塞在Future.get()的线程(这一步在这里无关痛痒可以忽略)。所以我们可以直观的看到老的Future是如何取消正在运行的tasks的。那CompletableFuture呢?它的cancel()伪代码如下:
public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = false; if (result == null) { result = new AltResult(new CancellationException()); cancelled = true; } postComplete(); return cancelled || isCancelled(); }
这相当令人失望,我们很少把result赋值为CancellationException而忽略mayInterruptIfRunning标志。postComplete()的作用和finishCompletion()的作用相似,通知所有注册在future下的正在等待的回调操作。这种实现相当让人不愉快(使用了非阻塞的Treiber stack),但它的确没有中断任何底层的线程。
Reasons and implications
CompletableFuture的这种cancel限制并不是bug,而是一种设计决定。CompletableFuture天生就没有和任何线程绑定在一起,但Future却几乎总是代表在后台运行的task。使用new关键字创造一个CompletableFuture(new CompletableFuture<>())就很好,这时没有任何底层的线程去取消。但是仍然有大部分的CompletableFuture和后台的task以及线程有联系,在这种情况下有问题的cancel()就是一个潜在的问题。我不建议盲目地用CompletableFuture替换Future,因为如果程序里面有cancel(),那么替换可能会改变程序的行为。这就意味着CompletableFuture有意地违背了里氏替换原则,我们要认真思考这样做的含义。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/114612.html