CountDownLatch源码解析详解编程语言

  一、CountDownLatch介绍

      CountDownLatch是在jdk1.5被引入的,它主要是通过一个计数器来实现的,当在初始化该类的构造函数时,会事先传入一个状态值,之后在执行await方法后,

    在这个状态值为0之前,当前线程(指的是调用await的线程)会一直等待。它内部使用了AQS来实现的,且是共享锁,具体怎么实现,待会看看它的实现原理。

      它的应用场景:

      一般在于在执行当前线程之前,要完成n个线程的任务,才能执行当前线程。这种场景适合用countdownLatch。

   二、源码解析

      先来看看该类的构造,如下图

      CountDownLatch源码解析详解编程语言

      如图,红色框选中的是该类的一个内部类,该内部类实现了抽象类AQS,具体锁的获取和释放是由该内部类实现的。

     由上图知countdownLatch只有一个构造函数,    

1    public CountDownLatch(int count) { 
2         if (count < 0) throw new IllegalArgumentException("count < 0"); 
3         this.sync = new Sync(count); 
4     }

 

    很明显,它有一个参数,这个参数,被用在哪里呢,请看下面

1      Sync(int count) { 
2             setState(count); 
3         }

 

     这个参数最终用在了状态值上,由此可猜测,这个状态值决定这锁什么时候释放。

      1、内部类Sync

        

   private static final class Sync extends AbstractQueuedSynchronizer { 
        private static final long serialVersionUID = 4982264981922014374L; 
 
        Sync(int count) { 
            setState(count);//设置状态值的大小 
        } 
 
        int getCount() { 
            return getState();//获取状态值 
        } 
    //当状态值为0才返回1,否则返回-1,也用来判断线程是否拥有该锁,值大于0,不拥有,小于0,则拥有 
        protected int tryAcquireShared(int acquires) { 
            return (getState() == 0) ? 1 : -1; 
        } 
     //对状态值进行操作,每一次成功,则状态值-1,
     //也知道只有状态值为1,然后再执行该方法,才会返回true,否则其它情况全是返回false
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) {//这个无限循环是为了保证在进行有其他线程也在操作状态值,导致失败之后就不操作了 int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc))//对状态值递减,若有其他线程也在操作,则可通过for的无限循环来保证一定能递减成功 return nextc == 0; } } }

 

  该类重写了AQS的tryAcquireShared(int)和tryReleaseShared(int)两个方法, 

  下面来看看这个CountDownLatch类常用的方法

     2、await()方法

 1   public void await() throws InterruptedException { 
 2         sync.acquireSharedInterruptibly(1); 
 3     } 
 4  
 5  
 6     public final void acquireSharedInterruptibly(int arg) 
 7             throws InterruptedException { 
 8         if (Thread.interrupted()) 
 9             throw new InterruptedException(); 
10         if (tryAcquireShared(arg) < 0) 
11             doAcquireSharedInterruptibly(arg); 
12     }

 

  在调用await方法时,再用sync去调用AQS的内部方法acquireSharedInterruptibly(因为sync类没重写该方法),会先判断当前线程是否被中断(中断一般是由外部条件引起的),若中断直接抛出异常,否则,获取通过tryAcquireShared方法来判断当前线程是否拥有该共享锁,当值小于0,则拥有,大于0,则不拥有,继续下一步,若有锁,则再执行doAcquireSharedInterruptibly方法,

  

 1   private void doAcquireSharedInterruptibly(int arg) 
 2         throws InterruptedException { 
 3         final Node node = addWaiter(Node.SHARED);//对当前线程进行一个包装,同时也初始化了等待队列,即head->node->...->tail 
 4         boolean failed = true; 
 5         try { 
 6             for (;;) { 
 7                 final Node p = node.predecessor();//获取该node节点的前一个节点,一般首次调用时,该前一个节点就是head节点。 
 8                 if (p == head) { 
 9                     int r = tryAcquireShared(arg);//再次获取锁的状态, 
10                     if (r >= 0) {//若状态值为0,则进入 
11                         setHeadAndPropagate(node, r); 
12                         p.next = null; // help GC 
13                         failed = false; 
14                         return; 
15                     } 
16                 }
 //shouldParakAfterFailedAcquire方法主要是针对node节点的状态进行操作,若为signal,则挂起,若为0或PROPAGATE,则转换成signal,为cancelled,则放弃,寻找前一个不是该状态值的节点
17 if (shouldParkAfterFailedAcquire(p, node) && 18 parkAndCheckInterrupt())//挂起线程 19 throw new InterruptedException(); 20 } 21 } finally { 22 if (failed)//若failed为true,一般是出现了异常,或者线程被中断 23 cancelAcquire(node); 24 } 25 }

 

    从上述分析来看,只有当状态值为0的时候,才会调用setHeadAndPropagate(node,int)方法,否则会无限等待,当前线程也会被挂起,该方法源码如下

 private void setHeadAndPropagate(Node node, int propagate) { 
        Node h = head;  
        setHead(node);//将node节点设置为头结点,对比前面的doAcquireSharedInterruptibly方法,也就是头结点的下一个节点,且该节点的状态为shared 
        //对propagate值,头结点和状态,进行判断 
        if (propagate > 0 || h == null || h.waitStatus < 0 || 
            (h = head) == null || h.waitStatus < 0) { 
            Node s = node.next;//获取node节点的下一个节点 
         //对node节点的下一个节点进行判断,是否为null,和状态值是否为shared 
            if (s == null || s.isShared()) 
          //该方法作用为了释放当前锁,即线程阻塞 
                doReleaseShared(); 
        } 
    }

 

    上面说的是执行await方法后,发生的一系列操作,也知道了只有当状态值为0,才会使线程通行,下面来看一看怎么使状态值为0的。

  3、countDown方法

    在调用tryReleaseShared方法,每调用一次,state值就会减一,但除了某个时刻当state值减一后恰好为0,才会返回true,否则返回false,为0时刻,也表明锁被其它线程给释放了。

 1 public void countDown() { 
 2         sync.releaseShared(1); 
 3     } 
 4  
 5 public final boolean releaseShared(int arg) { 
 6      //尝试获取锁的状态 
 7         if (tryReleaseShared(arg)) { 
 8             doReleaseShared();//此时,状态值已经为0,执行doReleasseShared方法, 
 9             return true; 
10         } 
11         return false; 
12     }

 

    也许会有人有疑问,说,为什么在执行await方法后的一些类操作中,也执行了doReleaseShared方法,这岂不是要释放两次?

   其实不然,主要是怕doAcquireSharedInterruptibly方法执行后,由于某种原因,当前线程为挂起(即阻塞了),不再执行了,这时只有通过releaseShared方法来唤醒线程,下面看看doReleaseShared方法的实现

  

private void doReleaseShared() { 
        /* 
         * Ensure that a release propagates, even if there are other 
         * in-progress acquires/releases.  This proceeds in the usual 
         * way of trying to unparkSuccessor of head if it needs 
         * signal. But if it does not, status is set to PROPAGATE to 
         * ensure that upon release, propagation continues. 
         * Additionally, we must loop in case a new node is added 
         * while we are doing this. Also, unlike other uses of 
         * unparkSuccessor, we need to know if CAS to reset status 
         * fails, if so rechecking. 
         */ 
        for (;;) { 
            Node h = head; 
            if (h != null && h != tail) { 
                int ws = h.waitStatus; 
                if (ws == Node.SIGNAL) {//若头结点状态为signal,则进入,头结点初始化时的状态值为0 
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//若交换失败,则终止此步操作,继续下一轮循环 
                        continue;            // loop to recheck cases 
                    unparkSuccessor(h);//这是释放锁的关键 
                } 
                else if (ws == 0 && 
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
                    continue;                // loop on failed CAS 
            } 
            if (h == head)                   // 若head节点被改变了,则继续循环,否则,跳出循环 
                break; 
        } 
    }

 

  unparkSuccessor(node)分析如下,该方法作用是为了释放node节点的后一个节点中的线程,在这里,node节点就是head节点

private void unparkSuccessor(Node node) { 
       
        int ws = node.waitStatus; 
        if (ws < 0) 
            compareAndSetWaitStatus(node, ws, 0); 
 
        
        Node s = node.next;
      //下一个节点为null或状态值为cancelled,
if (s == null || s.waitStatus > 0) { s = null;
        //由后往前搜索,节点状态值小于或等于0的节点(即状态值不是cancelled值),搜索到的结果一定是最靠近node节点的,且状态值<=0.
        //至于为什么不从前往后搜索,原因不太清楚!!!
for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//释放 }

 

 

 

  三、总结 

    countdownLatch在初始化构造函数时,会先将参数设置为状态state值,之后执行await方法后,会进行这一系列的步骤
    1、将shared和当前线程包装成一个node节点,(在第一次调用还会初始化等待队列)在队列中,有这样的队列 head->node,其中node就是被包装成share的节点

    2、之后在doAcquireSharedInterruptibly方法中,执行了shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法, 若顺利,则head节点的状态值会变为signal,并且当前线程会通过执行park方法进行挂起。

      
    3、在方法tryReleaseShared中,会一直操作state值,使之减1,一直到state的值,减为0时,在这之前,当前线程一直会被阻塞。当为0时,会执行doReleaseShared方法 对当前线程执行unparkSuccessor方法,进行放行。

 


 

  以上就是我对countdownLatch类的理解,若有不足之处,还望指正!

 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/12344.html

(0)
上一篇 2021年7月19日 14:34
下一篇 2021年7月19日 14:34

相关推荐

发表回复

登录后才能评论