CountDownLatch
相比ReentranceLock,CountDownLatch的流程还是相对比较简单的,CountDownLatch也是基于AQS,它是AQS的共享功能的一个实现。
下面从源代码的实现上详解CountDownLatch。
1、CountDownLatch 构造
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// Sync 就是继承了一个AQS
this.sync = new Sync(count);
}
下面为CountDownLatch中Sync的部分代码片段,CountDownLatch构造器中的count最终还是传递了ASQ中的state,
所以CountDownLatch中的countDown也是对于state状态的改变。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
}
2、CountDownLatch.countDown() 实现
先看countDown所涉及的代码
1、
public void countDown() {
sync.releaseShared(1);
}
2、
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //如果计数器的值为0,那么执行下面操作
doReleaseShared(); //这一步的操作主要是唤醒主线程,因为如果state不等于0的话,主线程一直是阻塞的
return true;
}
return false;
}
3、 CountDownLatch重写的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0) //countDown的计数器以及到0,体现在我们程序中的,就是并行的几个任务已经执行完
return false;
int nextc = c-1; //没执行一次countDown,计数器减1
if (compareAndSetState(c, nextc)) // 利用cas来更新state的状态,这里可能有并发,所以这也是用死循环更新的原因
return nextc == 0; //更新成功就返回
}
}
4、
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { /至少有两个节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 说明后继节点需要唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //唤醒后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
5、
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒主线程
LockSupport.unpark(s.thread);
}
3、CountDownLatch.await() 实现
先看代码
1、
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
2、
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) = 0) { // state的状态是0,说明,countDown的所有任务已经完成
setHeadAndPropagate(node, r); //主线程所在的节点设置为头节点
p.next = null; // help GC
failed = false;
return; //主线程结束等待
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed) //如果是非正常退出的话,取消
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//讲等待状态设置为后继唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 当前线程阻塞,判断线程是否中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
//取消当前节点获取锁
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next; //如果节点都没有被取消的话,那么这个节点和node是同一个节点
//node的后继节点取消
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// CountDownLatch 逻辑就到这里
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
总结
从实现上看CountDownLatch,它也是基于AQS的,await是通过轮询state的状态来判断所有的任务是否都完成。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/60599.html