java AQS分析


AQS是什么

抽象队列同步器, 是JDK juc包下 AbstractQueuedSynchronizer 类的简写,实现了FIFO(First Input First Output)先进先出队列模型用以将获取锁资源的线程进行排队处理,并且提供锁排队线程的唤醒+锁分配机制。

类结构图

image

非公平锁(ReentrantLock分析)

获取锁实现流程图

image

释放锁的流程图

image

加锁代码

lock方法

 /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
      
        final void lock() {
            //cas方式尝试set state字段,state=0代表锁是空闲
            if (compareAndSetState(0, 1))
                //cas设置成功,将当前线程放到exclusiveOwnerThread字段中
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //处理重入锁和入队列操作
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

acquire方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    public final void acquire(int arg) {
        // 如果是当前线程可重入,则完成获取锁操作,返回
        if (!tryAcquire(arg) &&
            //将当前线程封装成Node节点与CLH队列关联,内部还会调用LockSupport阻塞当前线程,直到unlock
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

tryAcquire 方法

static final class NonfairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

nonfairTryAcquire方法

abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
         * 尝试获取非公平锁
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //锁资源空闲 则尝试原子设置state字段
                if (compareAndSetState(0, acquires)) {
                    //设置锁拥有者线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果拥有锁的线程==当前线程
            else if (current == getExclusiveOwnerThread()) {
                //计数器+1 可重入锁核心
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //设置计数器, state=lock次数
                setState(nextc);
                return true;
            }
            return false;
        }

addWaiter方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    //将当前线程封装成Node,加入CLH关联
     private Node addWaiter(Node mode) {
         // 将当前线程封装成Node
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
         //如果尾节点不等于空
            if (pred != null) {
                //将当前节点的pre设置为尾节点
                node.prev = pred;
                //CAS方式将当前节点设置为尾节点
                if (compareAndSetTail(pred, node)) {
                    //old尾节点.next = 当前节点, 完成CLH入队操作
                    pred.next = node;
                    return node;
                }
            }
         //自旋将当前节点加入到CLH队列
            enq(node);
            return node;
        }
}

enq方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    //将传入的node加入到CLH队列中
     private Node enq(final Node node) {
         //自旋
        for (;;) {
            Node t = tail;
            if (t == null) {
                //如果尾节点为空,新建一个空线程属性的Node实例并且将此实例设置为头结点
                //为什么会需要一个空线程属性的Node? 因为释放锁的时候会认为head节点就是持有锁资源的线程,到时候直接唤醒head节点的next 节点线程
                if (compareAndSetHead(new Node()))
                    //将头结点赋值给尾节点
                    tail = head;
            } else {
                node.prev = t;
                //CAS方式把node加入到对尾
                if (compareAndSetTail(t, node)) {
                    //将之前的尾节点的next属性关联到当前node上,完成入队操作
                    //|A|->>|B| 最终形成的队列示例模型  |A|->>|B|->>|C|
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

acquireQueued 方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    // 自旋提供获取锁
    // 获取锁失败lock,当unlock线程后,会再次自旋获取锁,如果成功则返回,否则再次lock,循环往复
	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取node的上一个节点
                final Node p = node.predecessor();
                //如果上一个节点就是头节点那么可以尝试直接获取锁,可能会失败,因为中间可能有其他线程加入;当持有锁资源的线程就是当前线程则直接能获取到锁
                if (p == head && tryAcquire(arg)) {
                    //将node设置为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //获取锁资源成功,方法结束
                    return interrupted;
                }
                //是否需要阻塞线程并且阻塞线程成功, 当unlock的时候唤醒线程后会再次自旋获取锁
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

setHead方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
	private void setHead(Node node) {
        head = node;
       //本身是已经获取到锁资源的线程,不需要在队列中等候唤醒,所以将其置为null
        node.thread = null;
        //因为本身是head节点因此没有prev节点,将prev节点置为null
        node.prev = null;
    }
}

shouldParkAfterFailedAcquire方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    //是否符合在获取锁失败的情况下阻塞当前线程
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取pre.pre节点的状态
        int ws = pred.waitStatus;
        //pre.pre已经处于需要unpark状态,说明还在排队等待获取锁,直接返回true
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
        	// waitStatus > 1表示该node获取锁的任务已经被取消了
            do {
                //一直找到waitStatus < 1的pre节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //将node与找到的pre节点做关联
            pred.next = node;
        } else {
           //如果pre.pre.waitStatus=0, 则说明锁还被占用,将pre的状态设置为需要unpark的状态;
            //之所以在这里设置pre的状态,是为了unlock的时候,会认为waitStatus==0的节点就是尾节点了,没有需要唤醒的节点线程了
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

}

parkAndCheckInterrupt方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
      private final boolean parkAndCheckInterrupt() {
          //阻塞当前线程
            LockSupport.park(this);
          //前面的线程unlock后清除中断标志
            return Thread.interrupted();
        }
}

释放锁的代码

release方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    //释放锁 arg可以理解为释放几次,因为可重入是依靠计数器state字段实现,同一个线程每次lock成功+1
     public final boolean release(int arg) {
         //更新计数器,如果计数器==0说明当前线程不再持有锁,那么就可以唤醒下一个节点线程了
        if (tryRelease(arg)) {
            Node h = head;
            //如果头结点不等于空并且头结点的waitStatus不等于0,说明需要队列中还有等待唤醒的线程节点
            if (h != null && h.waitStatus != 0) {
                //唤醒next线程
                unparkSuccessor(h);
            }
            //否则认为队列中没有线程在等待,这就是加锁的时候阻塞线程时才设置pre.pre.waitStatus的值,因为这里waitSatus==0就代表是最后一个节点了
            return true;
        }
        return false;
    }
}

tryRelease方法

public class ReentrantLock implements Lock, java.io.Serializable {
    //操作计数器
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                //计数器==0,说明当前线程不再持有锁资源
                free = true;
                //将线程持有者引用置空
                setExclusiveOwnerThread(null);
            }
        //更新计数器
            setState(c);
        //返回锁是否空闲
            return free;
        }
}

unparkSuccessor方法

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
   private void unparkSuccessor(Node node) {
    	....
            //获取当前node的下一个节点进行唤醒;这就是为什么当的CLH队列是空的时候,要添加一个空thread属性的Node作为头节点
        Node s = node.next;
       //如果下一个节点等于空或者下一个节点已取消
        if (s == null || s.waitStatus > 0) {
            s = null;
            //遍历整个CLH队列,找出离当前节点最近的下一个处于需要unpark状态的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) {
                    s = t;
                }
        }
        if (s != null) {
            //唤醒下一个节点,唤醒后又会进入自旋获取锁,见 #acquireQueued  方法
            LockSupport.unpark(s.thread);
        }
    }
}

公平锁

与非公平锁的差异就是,入队的时候在不是可重入的情况下不会去尝试获取锁

总结

将线程封装成Node实例入CLH队列进行排队,结合LockSupport工具类进行线程阻塞与唤醒,结合Unsafe类的原子操作内存地址来进行state计数器标记 实现AQS加锁与解锁

原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/275832.html

(0)
上一篇 2022年7月21日
下一篇 2022年7月21日

相关推荐

发表回复

登录后才能评论