一、CountDownLatch介绍
CountDownLatch是在jdk1.5被引入的,它主要是通过一个计数器来实现的,当在初始化该类的构造函数时,会事先传入一个状态值,之后在执行await方法后,
在这个状态值为0之前,当前线程(指的是调用await的线程)会一直等待。它内部使用了AQS来实现的,且是共享锁,具体怎么实现,待会看看它的实现原理。
它的应用场景:
一般在于在执行当前线程之前,要完成n个线程的任务,才能执行当前线程。这种场景适合用countdownLatch。
二、源码解析
先来看看该类的构造,如下图
如图,红色框选中的是该类的一个内部类,该内部类实现了抽象类AQS,具体锁的获取和释放是由该内部类实现的。
由上图知countdownLatch只有一个构造函数,
1 public CountDownLatch(int count) { 2 if (count < 0) throw new IllegalArgumentException("count < 0"); 3 this.sync = new Sync(count); 4 }
很明显,它有一个参数,这个参数,被用在哪里呢,请看下面
1 Sync(int count) { 2 setState(count); 3 }
这个参数最终用在了状态值上,由此可猜测,这个状态值决定这锁什么时候释放。
1、内部类Sync
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count);//设置状态值的大小 } int getCount() { return getState();//获取状态值 } //当状态值为0才返回1,否则返回-1,也用来判断线程是否拥有该锁,值大于0,不拥有,小于0,则拥有 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //对状态值进行操作,每一次成功,则状态值-1,
//也知道只有状态值为1,然后再执行该方法,才会返回true,否则其它情况全是返回false protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) {//这个无限循环是为了保证在进行有其他线程也在操作状态值,导致失败之后就不操作了 int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc))//对状态值递减,若有其他线程也在操作,则可通过for的无限循环来保证一定能递减成功 return nextc == 0; } } }
该类重写了AQS的tryAcquireShared(int)和tryReleaseShared(int)两个方法,
下面来看看这个CountDownLatch类常用的方法
2、await()方法
1 public void await() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 } 4 5 6 public final void acquireSharedInterruptibly(int arg) 7 throws InterruptedException { 8 if (Thread.interrupted()) 9 throw new InterruptedException(); 10 if (tryAcquireShared(arg) < 0) 11 doAcquireSharedInterruptibly(arg); 12 }
在调用await方法时,再用sync去调用AQS的内部方法acquireSharedInterruptibly(因为sync类没重写该方法),会先判断当前线程是否被中断(中断一般是由外部条件引起的),若中断直接抛出异常,否则,获取通过tryAcquireShared方法来判断当前线程是否拥有该共享锁,当值小于0,则拥有,大于0,则不拥有,继续下一步,若有锁,则再执行doAcquireSharedInterruptibly方法,
1 private void doAcquireSharedInterruptibly(int arg) 2 throws InterruptedException { 3 final Node node = addWaiter(Node.SHARED);//对当前线程进行一个包装,同时也初始化了等待队列,即head->node->...->tail 4 boolean failed = true; 5 try { 6 for (;;) { 7 final Node p = node.predecessor();//获取该node节点的前一个节点,一般首次调用时,该前一个节点就是head节点。 8 if (p == head) { 9 int r = tryAcquireShared(arg);//再次获取锁的状态, 10 if (r >= 0) {//若状态值为0,则进入 11 setHeadAndPropagate(node, r); 12 p.next = null; // help GC 13 failed = false; 14 return; 15 } 16 }
//shouldParakAfterFailedAcquire方法主要是针对node节点的状态进行操作,若为signal,则挂起,若为0或PROPAGATE,则转换成signal,为cancelled,则放弃,寻找前一个不是该状态值的节点 17 if (shouldParkAfterFailedAcquire(p, node) && 18 parkAndCheckInterrupt())//挂起线程 19 throw new InterruptedException(); 20 } 21 } finally { 22 if (failed)//若failed为true,一般是出现了异常,或者线程被中断 23 cancelAcquire(node); 24 } 25 }
从上述分析来看,只有当状态值为0的时候,才会调用setHeadAndPropagate(node,int)方法,否则会无限等待,当前线程也会被挂起,该方法源码如下
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node);//将node节点设置为头结点,对比前面的doAcquireSharedInterruptibly方法,也就是头结点的下一个节点,且该节点的状态为shared //对propagate值,头结点和状态,进行判断 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next;//获取node节点的下一个节点 //对node节点的下一个节点进行判断,是否为null,和状态值是否为shared if (s == null || s.isShared()) //该方法作用为了释放当前锁,即线程阻塞 doReleaseShared(); } }
上面说的是执行await方法后,发生的一系列操作,也知道了只有当状态值为0,才会使线程通行,下面来看一看怎么使状态值为0的。
3、countDown方法
在调用tryReleaseShared方法,每调用一次,state值就会减一,但除了某个时刻当state值减一后恰好为0,才会返回true,否则返回false,为0时刻,也表明锁被其它线程给释放了。
1 public void countDown() { 2 sync.releaseShared(1); 3 } 4 5 public final boolean releaseShared(int arg) { 6 //尝试获取锁的状态 7 if (tryReleaseShared(arg)) { 8 doReleaseShared();//此时,状态值已经为0,执行doReleasseShared方法, 9 return true; 10 } 11 return false; 12 }
也许会有人有疑问,说,为什么在执行await方法后的一些类操作中,也执行了doReleaseShared方法,这岂不是要释放两次?
其实不然,主要是怕doAcquireSharedInterruptibly方法执行后,由于某种原因,当前线程为挂起(即阻塞了),不再执行了,这时只有通过releaseShared方法来唤醒线程,下面看看doReleaseShared方法的实现
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//若头结点状态为signal,则进入,头结点初始化时的状态值为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//若交换失败,则终止此步操作,继续下一轮循环 continue; // loop to recheck cases unparkSuccessor(h);//这是释放锁的关键 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // 若head节点被改变了,则继续循环,否则,跳出循环 break; } }
unparkSuccessor(node)分析如下,该方法作用是为了释放node节点的后一个节点中的线程,在这里,node节点就是head节点
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next;
//下一个节点为null或状态值为cancelled, if (s == null || s.waitStatus > 0) { s = null;
//由后往前搜索,节点状态值小于或等于0的节点(即状态值不是cancelled值),搜索到的结果一定是最靠近node节点的,且状态值<=0.
//至于为什么不从前往后搜索,原因不太清楚!!!
for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//释放 }
三、总结
countdownLatch在初始化构造函数时,会先将参数设置为状态state值,之后执行await方法后,会进行这一系列的步骤
1、将shared和当前线程包装成一个node节点,(在第一次调用还会初始化等待队列)在队列中,有这样的队列 head->node,其中node就是被包装成share的节点
2、之后在doAcquireSharedInterruptibly方法中,执行了shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法, 若顺利,则head节点的状态值会变为signal,并且当前线程会通过执行park方法进行挂起。
3、在方法tryReleaseShared中,会一直操作state值,使之减1,一直到state的值,减为0时,在这之前,当前线程一直会被阻塞。当为0时,会执行doReleaseShared方法 对当前线程执行unparkSuccessor方法,进行放行。
以上就是我对countdownLatch类的理解,若有不足之处,还望指正!
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/12344.html