There is a comment in the JVM source that reads:
// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent.  PlatformEvent provides park(), unpark(), etc., and
// is abstract -- that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc.  The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We'll want to eventually merge these redundant facilities and use ParkEvent.
In other words, ParkEvent backs Java-level "monitor" synchronization, i.e. the synchronized keyword, while Parker backs the park/unpark used by the JSR166 java.util.concurrent framework (reached through LockSupport). The comment notes that the two facilities are nearly redundant and should eventually be merged into ParkEvent. So what actually distinguishes them? Let's first look at the rough shape of the two classes:

(ParkEvent)
class ParkEvent : public os::PlatformEvent {
  private:
    ParkEvent * FreeNext ;

    // Current association
    Thread * AssociatedWith ;
    intptr_t RawThreadIdentity ;        // LWPID etc
    volatile int Incarnation ;

class PlatformEvent : public CHeapObj<mtInternal> {
    // Use caution with reset() and fired() -- they may require MEMBARs
    void reset() { _Event = 0 ; }
    int  fired() { return _Event; }
    void park () ;
    void unpark () ;
    int  TryPark () ;
    int  park (jlong millis) ;          // relative timed-wait only
class Parker : public os::PlatformParker {
  public:
    // For simplicity of interface with Java, all forms of park (indefinite,
    // relative, and absolute) are multiplexed into one call.
    void park(bool isAbsolute, jlong time);
    void unpark();

    // Lifecycle operators
    static Parker * Allocate (JavaThread * t) ;
    static void Release (Parker * e) ;
  private:
    static Parker * volatile FreeList ;
    static volatile int ListLock ;
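Java code never touches Parker directly; it reaches it through java.util.concurrent.locks.LockSupport, while the JVM reaches ParkEvent from monitor enter/exit and Object.wait/notify. As a quick reminder of the user-visible primitive, here is a tiny standalone demo using only the standard LockSupport API (nothing from the excerpts above; the class name is made up):

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread sleeper = new Thread(() -> {
            System.out.println("parking...");
            LockSupport.park();          // blocks on this thread's Parker
            System.out.println("unparked");
        });
        sleeper.start();
        Thread.sleep(100);               // give the thread time to park
        LockSupport.unpark(sleeper);     // grant the permit, waking it up
        sleeper.join();
    }
}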
As the two class sketches show, they expose essentially the same interface, park and unpark, and this is what ultimately supports Java's concurrency control. So what is actually different between them? Let's start by running two very similar pieces of code.

Blocked threads acquire the lock in exactly opposite orders

First, the lock provided by synchronized. We use an arbitrary Object lock = new Object() as the lock object. The code below makes 10 threads block on the lock one after another, then releases it and observes the order in which the threads subsequently acquire it: (runnable code)
package com.psly.testLocks;

public class TestLockSynchronized {
    private static Object lock = new Object();
    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        for(int i = 0; i < N; ++i){
            threads[i] = new Thread(new Runnable(){
                public void run() {
                    synchronized(lock){
                        System.out.println(Thread.currentThread().getName() + " get synch lock!");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        synchronized(lock){
            for(int i = 0; i < N; ++i){
                threads[i].start();
                Thread.sleep(200);
            }
        }
        for(int i = 0; i < N; ++i)
            threads[i].join();
    }
}
We sleep 0.2 seconds between starts so that threads created earlier block earlier. The output is:
Thread-9 get synch lock!
Thread-8 get synch lock!
Thread-7 get synch lock!
Thread-6 get synch lock!
Thread-5 get synch lock!
Thread-4 get synch lock!
Thread-3 get synch lock!
Thread-2 get synch lock!
Thread-1 get synch lock!
Thread-0 get synch lock!
This is a bit odd: the threads that tried to acquire the lock first end up acquiring it last! Setting that aside for a moment, let's redo the example with a JSR166 Lock: (runnable code)
package com.psly.testLocks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {
    private static Lock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        for(int i = 0; i < N; ++i){
            threads[i] = new Thread(new Runnable(){
                public void run() {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lock.unlock();
                }
            });
        }
        lock.lock();
        for(int i = 0; i < N; ++i){
            threads[i].start();
            Thread.sleep(200);
        }
        lock.unlock();
        for(int i = 0; i < N; ++i)
            threads[i].join();
    }
}
The output is:
Thread-0 get JSR166 lock!
Thread-1 get JSR166 lock!
Thread-2 get JSR166 lock!
Thread-3 get JSR166 lock!
Thread-4 get JSR166 lock!
Thread-5 get JSR166 lock!
Thread-6 get JSR166 lock!
Thread-7 get JSR166 lock!
Thread-8 get JSR166 lock!
Thread-9 get JSR166 lock!
This output matches expectations: the thread that tried to acquire the lock first does acquire it first. Why do the two implementations differ like this? Let's look at how each one implements its blocking queue, starting with the Java (JUC) side:
public void lock() {
    sync.lock();
}

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
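The structural idea in addWaiter is "append at the tail with a CAS". Before walking through it, here is a minimal, self-contained sketch of that pattern, just to make the FIFO property explicit — the FifoQueue class and its fields are made up for illustration, this is not the real AQS code:

import java.util.concurrent.atomic.AtomicReference;

// Sketch only: CAS-on-tail append, the same shape as AQS addWaiter/enq.
class FifoQueue<T> {
    static final class Node<T> {
        final T value;
        volatile Node<T> next;
        Node(T value) { this.value = value; }
    }

    final Node<T> head = new Node<>(null);                      // dummy head
    final AtomicReference<Node<T>> tail = new AtomicReference<>(head);

    void enqueue(T value) {
        Node<T> node = new Node<>(value);
        for (;;) {
            Node<T> t = tail.get();
            if (tail.compareAndSet(t, node)) {                  // claim the tail slot
                t.next = node;                                  // link predecessor -> node
                return;
            }
            // CAS failed: another thread appended first, retry behind it
        }
    }
}

Whichever thread wins the CAS becomes the new tail, so nodes end up in the queue in exactly the order the threads arrived: first in, first out.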
The key method in the JDK excerpt is addWaiter(Node mode): the node built for the blocking thread is appended through the tail field and becomes its predecessor's next node, so this is a FIFO doubly linked queue. When the lock is released, blocked threads therefore acquire it in the same order in which they entered the queue. Next, let's look at the synchronized implementation. This takes us into the JVM's C++ source; only the code related to entering the blocking queue is shown here:
class ObjectMonitor;

class ObjectSynchronizer : AllStatic {
  static void fast_enter (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS);
  static void slow_enter (Handle obj, BasicLock* lock, TRAPS);

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
  if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
  }
  slow_enter (obj, lock, THREAD) ;
}

void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  ................
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

void ATTR ObjectMonitor::enter(TRAPS) {
  // The following code is ordered to check the most common cases first
  // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
  Thread * const Self = THREAD ;
  void * cur ;
  .........
  for (;;) {
    jt->set_suspend_equivalent();
    // cleared by handle_special_suspend_equivalent_condition()
    // or java_suspend_self()

    EnterI (THREAD) ;

    if (!ExitSuspendEquivalent(jt)) break ;

    //
    // We have acquired the contended monitor, but while we were
    // waiting another thread suspended us. We don't want to enter
    // the monitor while suspended because that would surprise the
    // thread that suspended us.
    //
    _recursions = 0 ;
  ......
}

void ATTR ObjectMonitor::EnterI (TRAPS) {
  Thread * Self = THREAD ;
  assert (Self->is_Java_thread(), "invariant") ;
  assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;

  // Try the lock - TATAS
  if (TryLock (Self) > 0) {
    ......
  }

  if (TrySpin (Self) > 0) {
    ......
  }

  ObjectWaiter node(Self) ;
  Self->_ParkEvent->reset() ;
  node._prev   = (ObjectWaiter *) 0xBAD ;
  node.TState  = ObjectWaiter::TS_CXQ ;

  // Push "Self" onto the front of the _cxq.
  // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
  // Note that spinning tends to reduce the rate at which threads
  // enqueue and dequeue on EntryList|cxq.
  ObjectWaiter * nxt ;
  for (;;) {
    node._next = nxt = _cxq ;
    if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

    // Interference - the CAS failed because _cxq changed.  Just retry.
    // As an optional optimization we retry the lock.
    if (TryLock (Self) > 0) {
      assert (_succ != Self        , "invariant") ;
      assert (_owner == Self       , "invariant") ;
      assert (_Responsible != Self , "invariant") ;
      return ;
    }
  }
The important part here is ObjectWaiter node(Self) and the loop that follows it:
ObjectWaiter node(Self) ;
........
for (;;) {
    node._next = nxt = _cxq ;
    if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

    // Interference - the CAS failed because _cxq changed.  Just retry.
    // As an optional optimization we retry the lock.
    if (TryLock (Self) > 0) {
        assert (_succ != Self        , "invariant") ;
        assert (_owner == Self       , "invariant") ;
        assert (_Responsible != Self , "invariant") ;
        return ;
    }
}
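To make the shape of this loop explicit, here is a minimal, self-contained sketch of the same pattern in plain Java (the LifoStack class is made up for illustration, this is not HotSpot code): a compare-and-swap that always prepends the new node at the head, i.e. a lock-free stack push:

import java.util.concurrent.atomic.AtomicReference;

// Sketch only: CAS-prepend at the head, the same shape as the _cxq push.
class LifoStack<T> {
    static final class Node<T> {
        final T value;
        Node<T> next;
        Node(T value) { this.value = value; }
    }

    final AtomicReference<Node<T>> top = new AtomicReference<>();

    void push(T value) {
        Node<T> node = new Node<>(value);
        for (;;) {
            Node<T> old = top.get();
            node.next = old;                    // remember the previous head
            if (top.compareAndSet(old, node))   // become the new head
                return;
            // CAS failed because someone else pushed first -- just retry
        }
    }
}

The most recently pushed node sits at the head of the list, so when nodes are later taken from the head they come out in last-in, first-out order.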
In the EnterI excerpt, the atomic compare-and-swap modifies _cxq, and before the swap the old value of _cxq is stored into the node's _next field. That builds a stack on _cxq, a last-in, first-out list, so the next time _cxq is consumed, the most recently pushed node comes out first. This explains the run above: blocked threads acquire the lock in exactly the reverse of the order in which they entered the queue.

Now a slightly more involved example. We start 10 threads that acquire the lock one after another, print a message on acquisition, and then call a wait-style method so they block. Once all 10 are waiting, the main thread takes the lock and starts another 10 "no-wait" threads whose only job is to acquire the lock; they enter the blocking queue in the way described above. The main thread then sends 4 notify-style signals (note the time intervals) and releases the lock. The interesting question: relative to the threads already sitting in the blocking queue, which of the notified threads acquire the lock first, and in what order? First the JSR166 version: (runnable code)
package com.psly.testLocks;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        Thread[] threadsForWaits = new Thread[N];
        for(int i = 0; i < N; ++i){
            threads[i] = new Thread(new Runnable(){
                @Override
                public void run() {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + " nowait get lock");
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lock.unlock();
                }
            });
        }
        for(int i = 0; i < N; ++i){
            threadsForWaits[i] = new Thread(new Runnable(){
                @Override
                public void run() {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + " wait first get lock");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " wait second get lock");
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lock.unlock();
                }
            });
        }
        for(int i = 0; i < N; ++i){
            threadsForWaits[i].start();
            Thread.sleep(200);
        }
        lock.lock();
        for(int i = 0; i < N; ++i){
            threads[i].start();
            Thread.sleep(200);
        }
        for(int i = 0; i < 4 ; ++i){
            condition.signal();
        }
        Thread.sleep(200);
        lock.unlock();
        for(int i = 0; i < N; ++i)
            threads[i].join();
        for(int i = 0; i < N; ++i)
            threadsForWaits[i].join();
    }
}
Thread-10 through Thread-19 are the threads that block by calling the wait-style method (await); Thread-0 through Thread-9 only acquire the lock. The output is:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-0 nowait get lock
Thread-1 nowait get lock
Thread-2 nowait get lock
Thread-3 nowait get lock
Thread-4 nowait get lock
Thread-5 nowait get lock
Thread-6 nowait get lock
Thread-7 nowait get lock
Thread-8 nowait get lock
Thread-9 nowait get lock
Thread-10 wait second get lock
Thread-11 wait second get lock
Thread-12 wait second get lock
Thread-13 wait second get lock
The JSR166 implementation is still FIFO, even though Thread-10 through Thread-13 first acquired the lock and only then went into the wait. Let's see how that is achieved. Note that the JSR166 implementation is done entirely at the Java level. Three calls matter: await, signal, and unlock.

await:
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
Here addConditionWaiter adds a node to the condition's wait queue, and acquireQueued is what re-acquires the lock after the thread is later woken up. Let's look at addConditionWaiter:
/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
The new node is appended as lastWaiter's nextWaiter and then becomes lastWaiter itself, so the condition wait queue is also FIFO. (This runs while the lock is already held, so no extra synchronization is needed.) Next, signal:
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
At this point we have taken the first node off the wait queue; enq then transfers it onto the lock's blocking (sync) queue:
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
With that, the waiting thread has been moved from the waiting state to simply blocking for the lock. Finally, what happens when the main thread releases the lock:

unlock
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
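Putting the three pieces (await, signal, unlock) together, the observed order can be reproduced with a toy queue model. This is a deliberately simplified sketch with made-up names, not the real AQS/ConditionObject code; it only models the queue movements:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Toy model: both the condition queue and the sync queue are FIFO, signal()
// moves the head of the condition queue to the tail of the sync queue, and
// each unlock() wakes the head of the sync queue.
public class JucOrderModel {
    public static void main(String[] args) {
        ArrayDeque<String> conditionQueue = new ArrayDeque<>(); // await() appends here
        ArrayDeque<String> syncQueue = new ArrayDeque<>();      // blocked lock() appends here

        for (int i = 10; i <= 19; i++) conditionQueue.addLast("Thread-" + i); // awaiting
        for (int i = 0;  i <= 9;  i++) syncQueue.addLast("Thread-" + i);      // blocked on lock

        // four signal() calls: head of condition queue -> tail of sync queue
        for (int i = 0; i < 4; i++) syncQueue.addLast(conditionQueue.pollFirst());

        // unlock() repeatedly hands the lock to the head of the sync queue
        List<String> acquisitionOrder = new ArrayList<>();
        while (!syncQueue.isEmpty()) acquisitionOrder.add(syncQueue.pollFirst());

        // Thread-0 ... Thread-9 come out first, then Thread-10, 11, 12, 13,
        // matching the run above.
        System.out.println(acquisitionOrder);
    }
}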
As the excerpt shows, the thread calling unlock wakes the first waiter after head in the blocking queue. (OVER) Because both the blocking queue and the condition wait queue are FIFO, the overall behaviour is well-behaved, which is why the earlier output matches expectations. Finally, let's see what this scenario produces with synchronized. The code is as follows: (runnable code)
package com.psly.testLocks;

public class TestLockSynchronized {
    private static Object lock = new Object();
    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        Thread[] threadsForWaits = new Thread[N];
        for(int i = 0; i < N; ++i){
            threads[i] = new Thread(new Runnable(){
                @Override
                public void run() {
                    synchronized(lock){
                        System.out.println(Thread.currentThread().getName() + " nowait get lock");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        for(int i = 0; i < N; ++i){
            threadsForWaits[i] = new Thread(new Runnable(){
                @Override
                public void run() {
                    synchronized(lock){
                        System.out.println(Thread.currentThread().getName() + " wait first get lock");
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " wait second get lock");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        for(int i = 0; i < N; ++i){
            threadsForWaits[i].start();
            Thread.sleep(200);
        }
        synchronized(lock){
            for(int i = 0; i < N; ++i){
                threads[i].start();
                Thread.sleep(200);
            }
            for(int i = 0; i < 4 ; ++i){
                lock.notify();
            }
            Thread.sleep(200);
        }
        for(int i = 0; i < N; ++i)
            threads[i].join();
        for(int i = 0; i < N; ++i)
            threadsForWaits[i].join();
    }
}
The output is:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock

This result is strange. The strangest part is the consecutive output of Thread-10, Thread-13, Thread-12, Thread-11:

Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Let's adjust the number of notify calls and issue N of them, one for every waiting thread:
for(int i = 0; i < N ; ++i){
    lock.notify();
}
The output is:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
Again Thread-10 inexplicably appears first, immediately followed by Thread-19 down to Thread-11 in reverse order. Let's change the call once more and use notifyAll():
synchronized(lock){
    for(int i = 0; i < N; ++i){
        threads[i].start();
        Thread.sleep(200);
    }
    lock.notifyAll();
    Thread.sleep(200);
}
The output is:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-10 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
Now it changes again: Thread-10 comes last, and the waiters acquire the lock in fully reversed order. Let's go into the JVM and see what is happening. As before, we first look at how the nodes of waiting threads are organized; the relevant code is the following.

WAIT:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;
   TEVENT (Wait) ;
   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);

   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag

   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}
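AddWaiter keeps the waiters in a circular doubly linked list, with _WaitSet as the head and _WaitSet->_prev as the tail, so appending is O(1). A minimal Java model of that append, plus the matching "take the oldest waiter" dequeue used later by notify, might look like this (made-up names, illustrative only, not the HotSpot code):

// Toy model of the circular doubly linked _WaitSet.
class WaitList {
    static final class Node {
        final String thread;
        Node prev, next;
        Node(String thread) { this.thread = thread; }
    }

    Node waitSet; // head of the circular list, or null when empty

    void addWaiter(Node node) {
        if (waitSet == null) {           // first waiter: points at itself
            waitSet = node;
            node.prev = node;
            node.next = node;
        } else {                         // append behind the tail (= head.prev)
            Node head = waitSet;
            Node tail = head.prev;
            tail.next = node;
            head.prev = node;
            node.next = head;
            node.prev = tail;
        }
    }

    Node dequeueWaiter() {               // remove and return the oldest waiter
        Node waiter = waitSet;
        if (waiter == null) return null;
        if (waiter.next == waiter) {
            waitSet = null;              // it was the only node
        } else {
            waiter.prev.next = waiter.next;
            waiter.next.prev = waiter.prev;
            waitSet = waiter.next;
        }
        waiter.next = waiter.prev = null;
        return waiter;
    }
}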
In the HotSpot excerpt, if _WaitSet is null the new node becomes _WaitSet and is its own predecessor and successor; otherwise the node is appended behind the current tail (_WaitSet->_prev), i.e. always at the end. So the wait set is a FIFO queue.

notify:
void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,
  // timeout or other spurious wake-up, dequeue the
  // waiter from waiting list
  ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}

static int Knob_MoveNotifyee = 2 ;   // notify() - disposition of notifyee

  if (Policy == 2) {      // prepend to cxq
     // prepend to cxq
     if (List == NULL) {
         iterator->_next = iterator->_prev = NULL ;
         _EntryList = iterator ;
     } else {
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Front = _cxq ;
            iterator->_next = Front ;
            if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                break ;
            }
        }
     }
  } else
  if (Policy == 3) {      // append to cxq
     iterator->TState = ObjectWaiter::TS_CXQ ;
     for (;;) {
         ObjectWaiter * Tail ;
         Tail = _cxq ;
         if (Tail == NULL) {
DequeueWaiter calls DequeueSpecificWaiter, which removes the first element from the wait set (_WaitSet advances to its next node). Two further data structures appear here: _EntryList and _cxq. The default is Policy == 2, and note that, despite the "prepend to cxq" comment, not every notified node goes to _cxq: when _EntryList (the List variable in the excerpt) is NULL, the node is placed directly into _EntryList; otherwise it is prepended to the _cxq stack. Imagine the first notified node finding _EntryList empty: it becomes _EntryList itself. Every node from the second onwards is pushed onto the stack, so when those are later taken out they come out in reverse order. If nodes are consumed by taking _EntryList first and only then draining the stack, we get exactly the out-of-order pattern in the run above, with Thread-10 jumping to the front. But notifyAll did not show this effect, so let's look at notifyAll's code:
if (Policy == 2) {      // prepend to cxq
   // prepend to cxq
   iterator->TState = ObjectWaiter::TS_CXQ ;
   for (;;) {
      ObjectWaiter * Front = _cxq ;
      iterator->_next = Front ;
      if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
         break ;
      }
   }
} else
And indeed: notifyAll follows mostly the same logic as notify, except that it pushes every node onto _cxq, with no _EntryList special case for the first one. That is why, after notifyAll, the waiters acquire the lock in fully reversed order.

unlock: let's now check whether exiting the monitor really does take nodes from _EntryList first and only then looks at _cxq, as we guessed.
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
  fast_exit (object, lock, THREAD) ;
}

void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
  // if displaced header is null, the previous enter is recursive enter, no-op
  .........
  ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}

void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
   ...................
   for (;;) {
      assert (THREAD == _owner, "invariant") ;

      if (Knob_ExitPolicy == 0) {
         .........
         TEVENT (Inflated exit - complex egress) ;
      } else {
         ..........
      }

      guarantee (_owner == THREAD, "invariant") ;

      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;

      w = _EntryList ;
      if (w != NULL) {
          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      w = _cxq ;
      if (w == NULL) continue ;

      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }

      if (QMode == 1) {
         ..............
      } else {
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      if (_succ != NULL) continue;

      w = _EntryList ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
}

static int Knob_QMode      = 0 ;       // EntryList-cxq policy - queue discipline
static int Knob_ExitPolicy = 0 ;
The exit path first looks at _EntryList: if it is non-empty, ExitEpilog is called with its head and we return. Otherwise _cxq is detached atomically and (with the default QMode == 0) the detached chain becomes the new _EntryList, which is then passed to ExitEpilog. Either way the nodes eventually flow through _EntryList; it is just that an existing _EntryList is consumed before _cxq is touched. Now let's see what ExitEpilog does:
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   assert (_owner == Self, "invariant") ;

   // Exit protocol:
   // 1. ST _succ = wakee
   // 2. membar #loadstore|#storestore;
   // 2. ST _owner = NULL
   // 3. unpark(wakee)

   _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
   ParkEvent * Trigger = Wakee->_event ;

   Wakee = NULL ;

   // Drop the lock
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   OrderAccess::fence() ;                       // ST _owner vs LD in unpark()

   if (SafepointSynchronize::do_call_back()) {
      TEVENT (unpark before SAFEPOINT) ;
   }

   Trigger->unpark() ;

   // Maintain stats and report events to JVMTI
   if (ObjectMonitor::_sync_Parks != NULL) {
      ObjectMonitor::_sync_Parks->inc() ;
   }
}
So the exit path does end by calling unpark on the wakee's ParkEvent, waking that specific thread. How does _EntryList evolve after that? To finish, let's look at what a waiting thread does once it wakes up from wait:

// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   ..........
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   .............
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
   ............
   if (node.TState == ObjectWaiter::TS_WAIT) {
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
      if (node.TState == ObjectWaiter::TS_WAIT) {
         DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
         assert(node._notified == 0, "invariant");
         node.TState = ObjectWaiter::TS_RUN ;
      }
   ..............
   assert (_owner != Self, "invariant") ;
   ObjectWaiter::TStates v = node.TState ;
   if (v == ObjectWaiter::TS_RUN) {
      enter (Self) ;
   } else {
It goes into enter(Self):
void ATTR ObjectMonitor::enter(TRAPS) {
  for (;;) {
    jt->set_suspend_equivalent();
    // cleared by handle_special_suspend_equivalent_condition()
    // or java_suspend_self()

    EnterI (THREAD) ;

and from there into EnterI(THREAD):

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;

    if (TryLock (Self) > 0) {
        assert (_succ != Self        , "invariant") ;
        assert (_owner == Self       , "invariant") ;
        assert (_Responsible != Self , "invariant") ;
        return ;
    }

    DeferredInitialize () ;

    if (TrySpin (Self) > 0) {
        assert (_owner == Self       , "invariant") ;
        assert (_succ != Self        , "invariant") ;
        assert (_Responsible != Self , "invariant") ;
        return ;
    }

    // The Spin failed -- Enqueue and park the thread ...
    assert (_succ  != Self           , "invariant") ;
    assert (_owner != Self           , "invariant") ;
    assert (_Responsible != Self     , "invariant") ;

    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self        , "invariant") ;
            assert (_owner == Self       , "invariant") ;
            assert (_Responsible != Self , "invariant") ;
            return ;
        }
    }

    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
        // Try to assume the role of responsible thread for the monitor.
        // CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
        Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }

    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;

    for (;;) {
        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;

        if ((SyncFlags & 2) && _Responsible == NULL) {
            Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
        }

        if (_Responsible == Self || (SyncFlags & 1)) {
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        }

        if (TryLock(Self) > 0) break ;

        TEVENT (Inflated enter - Futile wakeup) ;
        if (ObjectMonitor::_sync_FutileWakeups != NULL) {
            ObjectMonitor::_sync_FutileWakeups->inc() ;
        }
        ++ nWakeups ;

        if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
            Self->_ParkEvent->reset() ;
            OrderAccess::fence() ;
        }
        if (_succ == Self) _succ = NULL ;

        // Invariant: after clearing _succ a thread *must* retry _owner before parking.
        OrderAccess::fence() ;
    }

    assert (_owner == Self      , "invariant") ;
    assert (object() != NULL    , "invariant") ;
    // I'd like to write:
    //   guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    // but as we're at a safepoint that's not safe.

    UnlinkAfterAcquire (Self, &node) ;
    if (_succ == Self) _succ = NULL ;

    assert (_succ != Self, "invariant") ;
    if (_Responsible == Self) {
        _Responsible = NULL ;
        OrderAccess::fence(); // Dekker pivot-point
    .........
The key call here is UnlinkAfterAcquire:
void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode) {
   assert (_owner == Self, "invariant") ;
   assert (SelfNode->_thread == Self, "invariant") ;

   if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
      // Normal case: remove Self from the DLL EntryList .
      // This is a constant-time operation.
      ObjectWaiter * nxt = SelfNode->_next ;
      ObjectWaiter * prv = SelfNode->_prev ;
      if (nxt != NULL) nxt->_prev = prv ;
      if (prv != NULL) prv->_next = nxt ;
      if (SelfNode == _EntryList ) _EntryList = nxt ;
      assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
      assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
      TEVENT (Unlink from EntryList) ;
   } else {
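Putting notify (Policy == 2), exit (QMode == 0), and EnterI together, the odd orderings we observed can be reproduced with a toy model of the three lists. This is a deliberately simplified sketch with made-up names, not HotSpot code, and it ignores spinning, timeouts, and the _succ heuristic:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Toy model of ObjectMonitor's queues (illustrative only).
public class MonitorOrderModel {
    static ArrayDeque<String> waitSet = new ArrayDeque<>();   // FIFO, filled by wait()
    static ArrayDeque<String> cxq = new ArrayDeque<>();       // LIFO: push at the front
    static ArrayDeque<String> entryList = new ArrayDeque<>();

    // notify() with Policy == 2: the notifyee goes straight to _EntryList
    // when _EntryList is empty, otherwise it is prepended to _cxq.
    static void notifyOne() {
        String t = waitSet.pollFirst();
        if (t == null) return;
        if (entryList.isEmpty()) entryList.addLast(t);
        else cxq.addFirst(t);
    }

    // exit() with QMode == 0: take from _EntryList first; if it is empty,
    // detach _cxq wholesale and make it the new _EntryList.
    static String exitAndPickNext() {
        if (entryList.isEmpty()) { entryList = cxq; cxq = new ArrayDeque<>(); }
        return entryList.pollFirst();
    }

    public static void main(String[] args) {
        for (int i = 10; i <= 19; i++) waitSet.addLast("Thread-" + i);  // waiting threads
        for (int i = 0;  i <= 9;  i++) cxq.addFirst("Thread-" + i);     // contending threads

        for (int i = 0; i < 4; i++) notifyOne();                        // 4 notify() calls

        List<String> order = new ArrayList<>();
        for (String t; (t = exitAndPickNext()) != null; ) order.add(t);

        // Thread-10, 13, 12, 11, then Thread-9 down to Thread-0 --
        // the same order as the 4-notify run above.
        System.out.println(order);
    }
}

With 10 notify calls the model yields Thread-10 first, then Thread-19 down to Thread-11, then the contending threads in reverse; and if every waiter is simply prepended to cxq (what notifyAll does under Policy == 2), it yields Thread-19 down to Thread-10 followed by Thread-9 down to Thread-0. Both match the runs shown earlier.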
The UnlinkAfterAcquire excerpt above updates _EntryList by removing the node of the thread that just acquired the lock, so subsequent lock hand-offs continue correctly. At this point we have roughly walked through both the synchronized lock inside the JVM and the Lock implementation in JUC. One remaining question: how does a pthread mutex on Linux behave in the same scenarios? Let's run the two tests with pthreads.

Lock-acquisition code. Compile with: gcc -pthread testLock.c -o testLock. Run with: ./testLock
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define N 10

void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)(long)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

int main(){
    pthread_t threads[N];
    int i = 0;
    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)(long)i);
        usleep(100000);
    }
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    return 0;
}
Output:
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
Lock acquisition plus wait/notify code:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#define N 10

void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)(long)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

void* runTaskWithWait(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d wait first get lock\n", (int)(long)pm);
    pthread_cond_wait(&cond, &mutex);
    printf("%d wait second get lock\n", (int)(long)pm);
    usleep(300000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

int main(){
    pthread_t threads[N];
    pthread_t threadsForWaits[N];
    int i = 0;
    for(; i < N; ++i){
        pthread_create(&threadsForWaits[i], 0, runTaskWithWait, (void*)(long)i);
        usleep(100000);
    }
    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)(long)i);
        usleep(100000);
    }
    //pthread_cond_broadcast(&cond);
    for(i = 0; i < N; ++i)
        pthread_cond_signal(&cond);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    for(i = 0; i < N; ++i){
        pthread_join(threadsForWaits[i], NULL);
    }
    return 0;
}
Output:
0 wait first get lock
1 wait first get lock
2 wait first get lock
3 wait first get lock
4 wait first get lock
5 wait first get lock
6 wait first get lock
7 wait first get lock
8 wait first get lock
9 wait first get lock
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
1 wait second get lock
2 wait second get lock
3 wait second get lock
0 wait second get lock
4 wait second get lock
5 wait second get lock
6 wait second get lock
7 wait second get lock
8 wait second get lock
9 wait second get lock
All we can say is that once the waiting threads have been transferred to the set of blocked threads, the order in which they acquire the lock does not seem to follow any obvious pattern. (=@__@=)