AQS是什么
抽象队列同步器, 是JDK juc包下 AbstractQueuedSynchronizer 类的简写,实现了FIFO(First Input First Output)先进先出队列模型用以将获取锁资源的线程进行排队处理,并且提供锁排队线程的唤醒+锁分配机制。
类结构图
非公平锁(ReentrantLock分析)
获取锁实现流程图
释放锁的流程图
加锁代码
lock方法
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
final void lock() {
//cas方式尝试set state字段,state=0代表锁是空闲
if (compareAndSetState(0, 1))
//cas设置成功,将当前线程放到exclusiveOwnerThread字段中
setExclusiveOwnerThread(Thread.currentThread());
else
//处理重入锁和入队列操作
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
acquire方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
public final void acquire(int arg) {
// 如果是当前线程可重入,则完成获取锁操作,返回
if (!tryAcquire(arg) &&
//将当前线程封装成Node节点与CLH队列关联,内部还会调用LockSupport阻塞当前线程,直到unlock
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
tryAcquire 方法
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
nonfairTryAcquire方法
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 尝试获取非公平锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//锁资源空闲 则尝试原子设置state字段
if (compareAndSetState(0, acquires)) {
//设置锁拥有者线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果拥有锁的线程==当前线程
else if (current == getExclusiveOwnerThread()) {
//计数器+1 可重入锁核心
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//设置计数器, state=lock次数
setState(nextc);
return true;
}
return false;
}
addWaiter方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
//将当前线程封装成Node,加入CLH关联
private Node addWaiter(Node mode) {
// 将当前线程封装成Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果尾节点不等于空
if (pred != null) {
//将当前节点的pre设置为尾节点
node.prev = pred;
//CAS方式将当前节点设置为尾节点
if (compareAndSetTail(pred, node)) {
//old尾节点.next = 当前节点, 完成CLH入队操作
pred.next = node;
return node;
}
}
//自旋将当前节点加入到CLH队列
enq(node);
return node;
}
}
enq方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
//将传入的node加入到CLH队列中
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
if (t == null) {
//如果尾节点为空,新建一个空线程属性的Node实例并且将此实例设置为头结点
//为什么会需要一个空线程属性的Node? 因为释放锁的时候会认为head节点就是持有锁资源的线程,到时候直接唤醒head节点的next 节点线程
if (compareAndSetHead(new Node()))
//将头结点赋值给尾节点
tail = head;
} else {
node.prev = t;
//CAS方式把node加入到对尾
if (compareAndSetTail(t, node)) {
//将之前的尾节点的next属性关联到当前node上,完成入队操作
//|A|->>|B| 最终形成的队列示例模型 |A|->>|B|->>|C|
t.next = node;
return t;
}
}
}
}
}
acquireQueued 方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
// 自旋提供获取锁
// 获取锁失败lock,当unlock线程后,会再次自旋获取锁,如果成功则返回,否则再次lock,循环往复
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node的上一个节点
final Node p = node.predecessor();
//如果上一个节点就是头节点那么可以尝试直接获取锁,可能会失败,因为中间可能有其他线程加入;当持有锁资源的线程就是当前线程则直接能获取到锁
if (p == head && tryAcquire(arg)) {
//将node设置为头节点
setHead(node);
p.next = null; // help GC
failed = false;
//获取锁资源成功,方法结束
return interrupted;
}
//是否需要阻塞线程并且阻塞线程成功, 当unlock的时候唤醒线程后会再次自旋获取锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
setHead方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
private void setHead(Node node) {
head = node;
//本身是已经获取到锁资源的线程,不需要在队列中等候唤醒,所以将其置为null
node.thread = null;
//因为本身是head节点因此没有prev节点,将prev节点置为null
node.prev = null;
}
}
shouldParkAfterFailedAcquire方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
//是否符合在获取锁失败的情况下阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取pre.pre节点的状态
int ws = pred.waitStatus;
//pre.pre已经处于需要unpark状态,说明还在排队等待获取锁,直接返回true
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// waitStatus > 1表示该node获取锁的任务已经被取消了
do {
//一直找到waitStatus < 1的pre节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//将node与找到的pre节点做关联
pred.next = node;
} else {
//如果pre.pre.waitStatus=0, 则说明锁还被占用,将pre的状态设置为需要unpark的状态;
//之所以在这里设置pre的状态,是为了unlock的时候,会认为waitStatus==0的节点就是尾节点了,没有需要唤醒的节点线程了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
parkAndCheckInterrupt方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
//前面的线程unlock后清除中断标志
return Thread.interrupted();
}
}
释放锁的代码
release方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
//释放锁 arg可以理解为释放几次,因为可重入是依靠计数器state字段实现,同一个线程每次lock成功+1
public final boolean release(int arg) {
//更新计数器,如果计数器==0说明当前线程不再持有锁,那么就可以唤醒下一个节点线程了
if (tryRelease(arg)) {
Node h = head;
//如果头结点不等于空并且头结点的waitStatus不等于0,说明需要队列中还有等待唤醒的线程节点
if (h != null && h.waitStatus != 0) {
//唤醒next线程
unparkSuccessor(h);
}
//否则认为队列中没有线程在等待,这就是加锁的时候阻塞线程时才设置pre.pre.waitStatus的值,因为这里waitSatus==0就代表是最后一个节点了
return true;
}
return false;
}
}
tryRelease方法
public class ReentrantLock implements Lock, java.io.Serializable {
//操作计数器
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//计数器==0,说明当前线程不再持有锁资源
free = true;
//将线程持有者引用置空
setExclusiveOwnerThread(null);
}
//更新计数器
setState(c);
//返回锁是否空闲
return free;
}
}
unparkSuccessor方法
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
private void unparkSuccessor(Node node) {
....
//获取当前node的下一个节点进行唤醒;这就是为什么当的CLH队列是空的时候,要添加一个空thread属性的Node作为头节点
Node s = node.next;
//如果下一个节点等于空或者下一个节点已取消
if (s == null || s.waitStatus > 0) {
s = null;
//遍历整个CLH队列,找出离当前节点最近的下一个处于需要unpark状态的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) {
s = t;
}
}
if (s != null) {
//唤醒下一个节点,唤醒后又会进入自旋获取锁,见 #acquireQueued 方法
LockSupport.unpark(s.thread);
}
}
}
公平锁
与非公平锁的差异就是,入队的时候在不是可重入的情况下不会去尝试获取锁
总结
将线程封装成Node实例入CLH队列进行排队,结合LockSupport工具类进行线程阻塞与唤醒,结合Unsafe类的原子操作内存地址来进行state计数器标记 实现AQS加锁与解锁
原创文章,作者:3628473679,如若转载,请注明出处:https://blog.ytso.com/275832.html