之前在研究AsyncTask源代码的时候发现了它的内部使用了FutureTask、Future,Callable类来实现,因为之前在学习Java的时候并没有接触到这些东西,于是乎就打开了百度看了半天别人的博客也没有理解其用法以及原理,后来果断的查看了一下其源代码之后才知道其来龙去脉。官方文档这么介绍FutureTask类的。
A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset()).
翻译:
这是一个可以取消的异步计算,该类提供了Future的基本实现,具有启动和取消运算,查询运算是否结束,并且检查返回计算的结果,该结果只能在运行完成之后才能获取到,如果程序没有运行结束,则`get()`将会阻塞。程序运行结束之后,无法重新启动或者是取消程序(除非调用`runAndReset`方法)
也就是说FutureTask也是一个用来执行异步任务的类,同时当程序执行完成之后还会返回运算的结果。我们之前也学过了使用Thread+Runnable来执行异步任务的,但是使用这种方式不能获取到执行的结果而已。下面我们就来看看里面具体的原理。
用法
我们就使用该类来实现一个模拟一个非常简单的事情,使用Thread.sleep()来模拟执行耗时的操作工作,然后将执行完成的结果返回出来。然后打印出来:
public class FutureTaskActivity extends Activity implements View.OnClickListener { ......... //创建一个实现Callable接口的并且在 call()方法中做耗时的操作 class WorkTask implements Callable<Integer> { @Override public Integer call() throws Exception { //我们这里通过使用线程 sleep来模拟耗时的操作,以后我们所有的耗时操作都在该方法里面执行了 Thread.sleep(5000); //将执行的结果返回出去 return 1000; } } ......... private void executeTask() { //创建一个worktask,并且当作参数传入到FutureTask中。 WorkTask workTask = new WorkTask(); FutureTask<Integer> futureTask = new FutureTask<Integer>(workTask) { @Override protected void done() { try { //该方法回调意思线程运行结束回调的,然后获取call方法中返回的结果 int result = get(); Log.i("LOH", "result..." + result); Thread.currentThread().getName(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }; //将FutureTask作为参数传入到Thread函数中执行。 Thread thread = new Thread(futureTask); //启动线程执行任务 thread.start(); } }
从上面的代码中我们可以看到其实它的使用跟Runnable的使用差不多的,只是多出了一个用来实现Callable接口的类作为参数传入到 FutureTask中,并且以前的那种方法不能监听线程执行完成的。以前我们在使用线程的时候都知道只有三种方式来执行异步任务:
- 使用new Thread(new Runnable()).start()的方法来执行异步任务,也就是说将实现Runnable接口的类当作参数传入到Thread 类中,然后使用thread.start来启动线程执行
- 通过继承Thread类并且重写该类的 run 方法,在该方法中执行耗时的操作,同样也是使用 Thread.start()来启动线程执行
- 使用线程池来执行实现了 Runnable 接口的类,这样子也可以达到执行异步任务的目的。
通过对上面的这些总结我们可以知道了要想实现异步任务的话就必须实现 Runnable() 接口才行的,所以我们也可以非常肯定的断定FutureTask也是实现了该接口的。不然就无法放到执行异步任务的,通过对上面的一个简单的介绍我们知道了如何使用它,下面就来看看里面到底是一个什么样的机制。
源码分析
首先我们通过一个类的关系图来看看这几个类之间的关系图
从图中我们可以一目了然的看到FutureTask实现了RunnableFuture接口,但是RunnableFuture也实现了Runnable接口和Future接口,所以FutureTask可以当作任务在线程池中执行,也可以当作参数传入Thread中进行启动任务,在创建FutureTask对象的时候需要传入一个Callable接口的实现类,从上面的中我们可以看到执行FutureTask任务同样也是在 run方法中的。
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); private static final long STATE; private static final long RUNNER; private static final long WAITERS; static { try { //通过Unsafe来获取字段state相对于本对象内存地址的偏移地址 STATE = U.objectFieldOffset (FutureTask.class.getDeclaredField("state")); //获取字段runner在内存中的偏移地址 RUNNER = U.objectFieldOffset (FutureTask.class.getDeclaredField("runner")); //获取字段waiters在内存中的偏移地址 WAITERS = U.objectFieldOffset (FutureTask.class.getDeclaredField("waiters")); } catch (ReflectiveOperationException e) { throw new Error(e); } Class<?> ensureLoaded = LockSupport.class; }
在创建FutureTask对象之前,有一个非常重类 sun.misc.Unsafe 该类的介绍我们这里也不做介绍,在我的另外一篇博客中会有介绍的。静态代码块中的功能主要是获取该对象的字段在内存中的偏移地址,获取这些偏移地址的作用是为了直接操作内存中某个变量的变量值做准备的。我们在学习C或者是C++的时候知道,如果我们知道了某个对象的某个字段的内存地址话,那我们就可以直接通过地址的方式来更新该字段的内存值了。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
public class FutureTask<V> implements RunnableFuture<V> { ....... public void run() { /** * 如果当前的线程的状态不是新创建的话就返回 * 第三个条件判断是 安全检查具体没有找到源代码,我们先不用管 */ if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { //将我们传进来的callable对象赋值给一个临时变量 Callable<V> c = callable; //判断传入进来的callable对象不为空并且线程状态也是新建的 if (c != null && state == NEW) { V result; boolean ran; try { /** * 原来我们总是说的call()方法原来是在run方法中执行的 * 然后call()返回一个泛型类型的返回值 ,这种通过实现接口的方法在我们平时中是很常见的吧. */ result = c.call(); ran = true; /** * 该地方作者考虑的很清楚,在定义call方法的时候抛出异常,然后这里捕捉异常进行 * 处理,因为我们在call方法中写代码难免会有异常问题的。 */ } catch (Throwable ex) { result = null; ran = false; setException(ex); } //如果call方法中不跑出异常的话,则通过set()方法将结果保存起来 //该set()方法其实也是Future接口定义的方法 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; //如果当前的状态不是 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } ......... }
当线程执行完成之后(没有异常)就会调用set方法,根据上面的代码我们其实也没有什么好看的,call()方法就是在run()方法中执行的。
protected void set(V v) { if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) { //最后将结果赋值给成员变量 outcome outcome = v; //同时更新state状态为NORMAL,这是这种更新方式是直接根据内存地址来修改内存值的 U.putOrderedInt(this, STATE, NORMAL); // final state finishCompletion(); } }
boolean compareAndSwapInt() 方法是Unsafe中的本地方法,主要作用是用于在多线程中并发的修改和读取某个值,其主要原理:根据传入的期望的数据跟内存中的数据进行对比,如果期望的数据跟内存中的数据相同的话,说明该变量的值没有被其他的线程修改过,同时将我们需要更改的新数据替换内存中的数据,体会成功之后并且返回true,表示的是修改新数据成功了。相反如果有其他线程修改了内存中的则放弃更新新数据,并且返回true。它有三个参数:object 表示更改数据的对象;offset 表示对象上字段的偏移地址;expectedValue 表示期望的数据,该数据的作用是用于跟内存中的数据进行比较,如果两者不想等的话说明内存中的数据被其他线程修改过;newValue表示更新的值。最后如果更新某个内存地址的值成功的话,则返回true,否则返回false。这个也是我们平时用到的多线程并发的原理基石理论:CAS(Compare and Swap, 翻译成比较并交换)。
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //根据寻找内存地址的方式来修改属性在内存中的值,将该waiters对象置为null if (U.compareAndSwapObject(this, WAITERS, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //回调Future接口的方法,标识中程序正常的结束了,我们需要重写该方法. done(); //特别需要注意的是需要将该接口变量置为空,防止出现因为引用问题导致内粗泄漏 callable = null; // to reduce footprint, }
当执行到该方法的时候也标识的程序正常的运行结束了,首先会将所有等待的线程全部唤醒,因为在执行FutureTask任务的时候调用get()方法是阻塞的,因为call()方法都还没有执行完成,这个时候你是获取不到任何结果的,所以会将当前调用get()方法的线程阻塞等待,直到调用finishCompletion()方法来解除线程阻塞,最后调用done()方法,这个时候我们就可以在该结束方法中执行我们想要的逻辑了;从代码中我们可以看出done()方法其实也还是运行在子线程的,所以我们并不可以在done()方法中更新UI的,还是需要Handler来发送消息的。
在线程都全部执行结束之后,我们就可以在done()方法通过调用get()方法来获取最后执行的结果了,也就是刚刚在set()方法中看到的outcome的值。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
我们在之前在调用set()方法时通过寻址(根据内存地址的偏移量)的方式修改过了state的值为NORMAL了,所以NORMAL大于COMPLETING,最后直接调用report()方法,最后直接通过return x 来返回结果。这个也就是我们使用FutureTask的一个大概的流程。其实通过代码我们就能很容易的看出该类的大概的设计原理,同时还可以学到更多的其他技术。
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
我们通过调用Future接口的isDone()来判断程序是否结束,可以直接根据state的状态判断是否是新创建的,该类的线程有7中不同的状态,主要状态切换成其中的一种我们就可以说程序结束了。
private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; public boolean isDone() { return state != NEW; }
只要状态值大于CANCELLED(4),也就是用户主动调用cancel()方法,不管是主动中断线程还是其他的方式都属于取消的操作的。
public boolean isCancelled() { return state >= CANCELLED; }
当程序里面的FutureTask未执行完成的时候get()方法会一直阻塞调用该方法的线程,直到FutureTask里面的任务执行才会解除阻塞。所以get()方法是一个阻塞式的去获取结果的,从上面的get()方法的代码中我们可以得出当状态还是NEW的时候,会调用awaitDone(false ,0)方法。
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // The code below is very delicate, to achieve these goals: // - call nanoTime exactly once for each call to park // - if nanos <= 0L, return promptly without allocation or nanoTime // - if nanos == Long.MIN_VALUE, don't underflow // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic // and we suffer a spurious wakeup, we will do no worse than // to park-spin for a while long startTime = 0L; // Special value 0L means not yet parked WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // We may have already promised (via isDone) that we are done // so never return empty-handed or throw InterruptedException Thread.yield(); else if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } else if (q == null) { if (timed && nanos <= 0L) return s; q = new WaitNode(); } else if (!queued) queued = U.compareAndSwapObject(this, WAITERS, q.next = waiters, q); else if (timed) { ........ } else LockSupport.park(this); } }
该方法有个无限循环知道状态值大于COMPLETING才返回一个状态值,我们在线程未执行完成的时候调用了get()方法,可以看到首先会创建一个WaitNode对象,然后通过Unsafe类来更新成员变量waiter的值为 q,然后再次循环最后会进入 LockSupport.park(this) 分支,该函数主要是获取许可阻塞当前的线程,直到程序执行结束之后,调用LockSupport.unpark(this)来释放阻塞。所以如果我们在主线程中直接调用get()方法来获取结果的话则很有可能导致ANR,直到程序结束之后才会释放阻塞的,正确的用法就是在done()方法里面调用get()来获取执行的结果的。关于LockSupport是一个非常重要的多线程并发的类,不懂的直接在百度上看看其解释。
我们平时在使用AsyncTask的时候有一个cancel()方法来取消当前执行的任务,我们之前也说了AsyncTask的本质其实也是使用了FutureTask来实现的。其实它的cancel()方法也是调用FutureTask的取消方法的,下面看看取消的原理:
//如果返回值为true的话表示取消成功,否则为取消失败了 public boolean cancel(boolean mayInterruptIfRunning) { //首先判断当前的状态是否是NEW,然后在通过Unsafe类去更新内存中state字段的值为cancel。 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { //如果以上的状态值设置成功的话,则判断是否设置中断运行 if (mayInterruptIfRunning) { try { Thread t = runner; //直接通过调用Thread的中断方法来强制中断当前运行的线程 if (t != null) t.interrupt(); } finally { // final state //最后修改当前状态state的值为 INTERRUPTED 为中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //最后解锁所有被阻塞的线程 finishCompletion(); } return true; }
我们在取消任务的时候可以设置强制中断线程运行,只要调用cancel(true) 就行了,有时候我们调用cancel(false)并不能立刻的停止线程执行完成的,因为这个时候程序在run()方法中已经执行过了状态(state)值判断的话,这个时候就直接执行call()方法了,但是call() 方法也没有执行完成,如果这个时候我们去取消的话, 因为我们知道取消的原理就是使用Unsafe类去修改内存中的state的值,但是这个时候设置已经来不急了。
public void run() { //第一次线程状态判断 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; //第二次线程判断,如果我们的设置了一般的取消操作比该判断滞后的话则是没有什么用的。 if (c != null && state == NEW) { ..... result = c.call(); ..... } } finally { ........ } }
虽然我们调用了cancel(false)方法去取消任务的,但是很多的时候还是不能马上终止任务执行,最后线程还是会继续执行的,但是到了set()方法的时候,这里会有一个状态值的判断的。之前我们已经介绍了线程并发的基石CAS,首先我们使用Unsafe类去比较state状态值是否发生了变化,如果state的值被其他的线程修改了,则不会调用done()方法了。
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { ........ } }
总结
从上面我们对源代码的分析的分析很明显的知道了 Callable、Future、FutureTask三者之间的关系了,也很明白的知道的如何更好的使用它们了,其实FutureTask根本就没有想象的这么难,我看网上把FutureTask说的神乎其神,不会抛出什么异常,然后可以返回获取结果等等各种各样的解释,由于没有例子和代码,最后还是不知道讲的是什么。最后发现不过就是一个接口函数在run()方法中执行,我们只需要实现Callable接口并且重写call()方法就可以了。不过通过看源代码可以使我们学到很多的东西。
本次有两个非常重要的东西很值得我们继续去研究,由于本人理解水平有限,博客里面的内容写的比较乱,有什么疑问的大家一起讨论和学习。
- Unsafe 类的使用以及作用
- LockSupport 作用以及使用
- CAS (Compare and Swap, 比较并交换),该原理是所有线程并发的原理,有时间可以深入了解下
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/3185.html