Ten years ago, Dr. Cliff Click published a non-blocking, open-addressing implementation of the associative data structure ConcurrentHashMap (NonBlockingHashMap). The algorithm it uses to reduce ordering dependencies between threads is full of clever tricks. It is advertised as lock-free: stopping any particular thread at almost any moment will not halt the process as a whole (although, in extreme cases, this can still stall the entire process).
This article walks through the complete code in detail, so that we can take away its non-blocking programming techniques.
First, the K/V data structure:
private transient Object[] _kvs;
private static final CHM   chm   (Object[] kvs) { return (CHM  )kvs[0]; }
private static final int[] hashes(Object[] kvs) { return (int[])kvs[1]; }

private static final boolean CAS_key( Object[] kvs, int idx, Object old, Object key ) {
  return _unsafe.compareAndSwapObject( kvs, rawIndex(kvs,(idx<<1)+2), old, key );
}
private static final boolean CAS_val( Object[] kvs, int idx, Object old, Object val ) {
  return _unsafe.compareAndSwapObject( kvs, rawIndex(kvs,(idx<<1)+3), old, val );
}

private static long rawIndex(final Object[] ary, final int idx) {
  assert idx >= 0 && idx < ary.length;
  return _Obase + idx * _Oscale;
}
As you can see, NonBlockingHashMap uses one large Object array _kvs. By convention, the first two elements hold the CHM object and the array of memoized hash codes, and real data starts at index 2. In other words, _kvs lays its data out as (2:key1, 3:val1, 4:key2, 5:val2, ...). To improve concurrency, it no longer stores (key, value) as one combined entry object the way JUC's ConcurrentHashMap does. The layout looks like this:
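The index arithmetic above can be sketched in isolation. The following toy layout (illustrative names, not the library's code) shows where slot i's key and value land in the flat array:

```java
// Minimal sketch of the flat Object[] encoding: slot 0 holds the CHM header,
// slot 1 the memoized-hash array, and slot i's key/value pair lives at
// (i<<1)+2 and (i<<1)+3.
public class KvsLayout {
    static int keyIndex(int i) { return (i << 1) + 2; }
    static int valIndex(int i) { return (i << 1) + 3; }

    public static void main(String[] args) {
        Object[] kvs = new Object[(8 << 1) + 2]; // 8 slots + CHM + hashes
        kvs[0] = "CHM placeholder";              // would be the CHM object
        kvs[1] = new int[8];                     // memoized hash codes
        kvs[keyIndex(0)] = "key1";
        kvs[valIndex(0)] = "val1";
        System.out.println(keyIndex(0) + " " + valIndex(0) + " " + keyIndex(1)); // 2 3 4
    }
}
```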
Figure 0:
For readability, the figure shows a table that can hold only 8 (key, value) pairs, whereas a NonBlockingHashMap created with the default constructor ends up with room for 32 pairs (the requested size is quadrupled before rounding up, as the initialization code shows):
/** Create a new NonBlockingHashMap with default minimum size (currently set
 *  to 8 K/V pairs or roughly 84 bytes on a standard 32-bit JVM). */
public NonBlockingHashMap( ) { this(MIN_SIZE); }

/** Create a new NonBlockingHashMap with initial room for the given number of
 *  elements, thus avoiding internal resizing operations to reach an
 *  appropriate size.  Large numbers here when used with a small count of
 *  elements will sacrifice space for a small amount of time gained.  The
 *  initial size will be rounded up internally to the next larger power of 2. */
public NonBlockingHashMap( final int initial_sz ) { initialize(initial_sz); }

private final void initialize( int initial_sz ) {
  if( initial_sz < 0 ) throw new IllegalArgumentException();
  int i;                         // Convert to next largest power-of-2
  if( initial_sz > 1024*1024 ) initial_sz = 1024*1024;
  for( i=MIN_SIZE_LOG; (1<<i) < (initial_sz<<2); i++ ) ;
  // Double size for K,V pairs, add 1 for CHM and 1 for hashes
  _kvs = new Object[((1<<i)<<1)+2];
  _kvs[0] = new CHM(new Counter()); // CHM in slot 0
  _kvs[1] = new int[1<<i];          // Matching hash entries
  _last_resize_milli = System.currentTimeMillis();
}
With initialization out of the way, let's look at the main entry points: put, putIfAbsent, replace, and remove.
public TypeV put        ( TypeK key, TypeV val ) { return putIfMatch( key, val, NO_MATCH_OLD ); }
public TypeV putIfAbsent( TypeK key, TypeV val ) { return putIfMatch( key, val, TOMBSTONE    ); }
public TypeV replace    ( TypeK key, TypeV val ) { return putIfMatch( key, val, MATCH_ANY    ); }
public TypeV remove     ( Object key )           { return putIfMatch( key, TOMBSTONE, NO_MATCH_OLD ); }
putIfAbsent: insert only if no value currently exists for key.
replace: update only if a value already exists for key.
remove: delete the existing (key, val) pair.
So the third argument selects the semantics: NO_MATCH_OLD means an unconditional put; TOMBSTONE means insert only if no (key, val) currently exists; MATCH_ANY means write only if a (key, val) already exists.
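As a reference point, java.util.HashMap's put / putIfAbsent / replace have exactly the semantics these three sentinels select. This single-threaded sketch (plain JDK map, not NonBlockingHashMap code) shows the behavior each sentinel encodes:

```java
import java.util.HashMap;
import java.util.Map;

// Reference model of the three sentinels: NO_MATCH_OLD = unconditional put,
// TOMBSTONE = insert only if absent, MATCH_ANY = replace only if present.
public class SentinelSemantics {
    // Runs the operations in order and returns the final mapping for "k".
    static String demo() {
        Map<String,String> m = new HashMap<>();
        m.putIfAbsent("k", "v1");  // expVal = TOMBSTONE: inserts (key absent)
        m.putIfAbsent("k", "v2");  // no-op: a mapping already exists
        m.replace("missing", "x"); // expVal = MATCH_ANY: no-op (key absent)
        m.replace("k", "v3");      // updates: a mapping exists
        m.put("k", "v4");          // expVal = NO_MATCH_OLD: unconditional
        return m.get("k");
    }
    public static void main(String[] args) {
        System.out.println(demo()); // v4
    }
}
```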
Now the method they all delegate to:
private final TypeV putIfMatch( Object key, Object newVal, Object oldVal ) {
  if (oldVal == null || newVal == null) throw new NullPointerException();
  final Object res = putIfMatch( this, _kvs, key, newVal, oldVal );
  assert !(res instanceof Prime);
  assert res != null;
  return res == TOMBSTONE ? null : (TypeV)res;
}
Note the return value res: TOMBSTONE is translated into null before being returned. Now the five-argument putIfMatch:
// --- putIfMatch ---------------------------------------------------------
// Put, Remove, PutIfAbsent, etc.  Return the old value.  If the returned
// value is equal to expVal (or expVal is NO_MATCH_OLD) then the put can be
// assumed to work (although might have been immediately overwritten).  Only
// the path through copy_slot passes in an expected value of null, and
// putIfMatch only returns a null if passed in an expected null.
private static final Object putIfMatch( final NonBlockingHashMap topmap, final Object[] kvs, final Object key, final Object putval, final Object expVal ) {
  assert putval != null;
  assert !(putval instanceof Prime);
  assert !(expVal instanceof Prime);
  final int fullhash = hash  (key); // throws NullPointerException if key null
  final int len      = len   (kvs); // Count of key/value pairs, reads kvs.length
  final CHM chm      = chm   (kvs); // Reads kvs[0]
  final int[] hashes = hashes(kvs); // Reads kvs[1], read before kvs[0]
  int idx = fullhash & (len-1);

  // ---
  // Key-Claim stanza: spin till we can claim a Key (or force a resizing).
  int reprobe_cnt=0;
  Object K=null, V=null;
  Object[] newkvs=null;
  while( true ) {             // Spin till we get a Key slot
    V = val(kvs,idx);         // Get old value (before volatile read below!)
    K = key(kvs,idx);         // Get current key
    if( K == null ) {         // Slot is free?
      // Found an empty Key slot - which means this Key has never been in
      // this table.  No need to put a Tombstone - the Key is not here!
      if( putval == TOMBSTONE ) return putval; // Not-now & never-been in this table
      // Claim the null key-slot
      if( CAS_key(kvs,idx, null, key ) ) { // Claim slot for Key
        chm._slots.add(1);      // Raise key-slots-used count
        hashes[idx] = fullhash; // Memoize fullhash
        break;                  // Got it!
      }
      // CAS to claim the key-slot failed.
      //
      // This re-read of the Key points out an annoying short-coming of Java
      // CAS.  Most hardware CAS's report back the existing value - so that
      // if you fail you have a *witness* - the value which caused the CAS
      // to fail.  The Java API turns this into a boolean destroying the
      // witness.  Re-reading does not recover the witness because another
      // thread can write over the memory after the CAS.  Hence we can be in
      // the unfortunate situation of having a CAS fail *for cause* but
      // having that cause removed by a later store.  This turns a
      // non-spurious-failure CAS (such as Azul has) into one that can
      // apparently spuriously fail - and we avoid apparent spurious failure
      // by not allowing Keys to ever change.
      K = key(kvs,idx);       // CAS failed, get updated value
      assert K != null;       // If keys[idx] is null, CAS shoulda worked
    }
    // Key slot was not null, there exists a Key here

    // We need a volatile-read here to preserve happens-before semantics on
    // newly inserted Keys.  If the Key body was written just before inserting
    // into the table a Key-compare here might read the uninitalized Key body.
    // Annoyingly this means we have to volatile-read before EACH key compare.
    newkvs = chm._newkvs;     // VOLATILE READ before key compare
    if( keyeq(K,key,hashes,idx,fullhash) )
      break;                  // Got it!

    // get and put must have the same key lookup logic!  Lest 'get' give
    // up looking too soon.
    //topmap._reprobes.add(1);
    if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes or
        key == TOMBSTONE ) { // found a TOMBSTONE key, means no more keys
      // We simply must have a new table to do a 'put'.  At this point a
      // 'get' will also go to the new table (if any).  We do not need
      // to claim a key slot (indeed, we cannot find a free one to claim!).
      newkvs = chm.resize(topmap,kvs);
      if( expVal != null ) topmap.help_copy(newkvs); // help along an existing copy
      return putIfMatch(topmap,newkvs,key,putval,expVal);
    }
    idx = (idx+1)&(len-1); // Reprobe!
  } // End of spinning till we get a Key slot

  // ---
  // Found the proper Key slot, now update the matching Value slot.  We
  // never put a null, so Value slots monotonically move from null to
  // not-null (deleted Values use Tombstone).  Thus if 'V' is null we
  // fail this fast cutout and fall into the check for table-full.
  if( putval == V ) return V; // Fast cutout for no-change

  // See if we want to move to a new table (to avoid high average re-probe
  // counts).  We only check on the initial set of a Value from null to
  // not-null (i.e., once per key-insert).  Of course we got a 'free' check
  // of newkvs once per key-compare (not really free, but paid-for by the
  // time we get here).
  if( newkvs == null &&       // New table-copy already spotted?
      // Once per fresh key-insert check the hard way
      ((V == null && chm.tableFull(reprobe_cnt,len)) ||
       // Or we found a Prime, but the JMM allowed reordering such that we
       // did not spot the new table (very rare race here: the writing
       // thread did a CAS of _newkvs then a store of a Prime.  This thread
       // reads the Prime, then reads _newkvs - but the read of Prime was so
       // delayed (or the read of _newkvs was so accelerated) that they
       // swapped and we still read a null _newkvs.  The resize call below
       // will do a CAS on _newkvs forcing the read.
       V instanceof Prime) )
    newkvs = chm.resize(topmap,kvs); // Force the new table copy to start

  // See if we are moving to a new table.
  // If so, copy our slot and retry in the new table.
  if( newkvs != null )
    return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);

  // ---
  // We are finally prepared to update the existing table
  while( true ) {
    assert !(V instanceof Prime);

    // Must match old, and we do not?  Then bail out now.  Note that either V
    // or expVal might be TOMBSTONE.  Also V can be null, if we've never
    // inserted a value before.  expVal can be null if we are called from
    // copy_slot.
    if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all?
        V != expVal &&            // No instant match already?
        (expVal != MATCH_ANY || V == TOMBSTONE || V == null) &&
        !(V==null && expVal == TOMBSTONE) &&    // Match on null/TOMBSTONE combo
        (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last
      return V;                                 // Do not update!

    // Actually change the Value in the Key,Value pair
    if( CAS_val(kvs, idx, V, putval ) ) {
      // CAS succeeded - we did the update!
      // Both normal put's and table-copy calls putIfMatch, but table-copy
      // does not (effectively) increase the number of live k/v pairs.
      if( expVal != null ) {    // Adjust sizes - a striped counter
        if(  (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1);
        if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1);
      }
      return (V==null && expVal!=null) ? TOMBSTONE : V;
    }
    // Else CAS failed
    V = val(kvs,idx);         // Get new value
    // If a Prime'd value got installed, we need to re-run the put on the
    // new table.  Otherwise we lost the CAS to another racing put.
    // Simply retry from the start.
    if( V instanceof Prime )
      return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);
  }
}
This method is the core of every put-style operation.
It splits into three parts. Whichever public method is the caller, step one locates the slot by key: if the key is absent we either return immediately, or claim the slot via CAS and move on to the next step; if the key already exists, we also move on. With the longer comments stripped, the first part reads:
while( true ) {             // Spin till we get a Key slot
  V = val(kvs,idx);         // Get old value (before volatile read below!)
  K = key(kvs,idx);         // Get current key
  if( K == null ) {         // Slot is free?
    if( putval == TOMBSTONE ) return putval; // Not-now & never-been in this table
    if( CAS_key(kvs,idx, null, key ) ) { // Claim slot for Key
      chm._slots.add(1);      // Raise key-slots-used count
      hashes[idx] = fullhash; // Memoize fullhash
      break;                  // Got it!
    }
    K = key(kvs,idx);       // CAS failed, get updated value
    assert K != null;       // If keys[idx] is null, CAS shoulda worked
  }

  newkvs = chm._newkvs;     // VOLATILE READ before key compare
  if( keyeq(K,key,hashes,idx,fullhash) )
    break;                  // Got it!

  if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes or
      key == TOMBSTONE ) {  // found a TOMBSTONE key, means no more keys
    newkvs = chm.resize(topmap,kvs);
    if( expVal != null ) topmap.help_copy(newkvs); // help along an existing copy
    return putIfMatch(topmap,newkvs,key,putval,expVal);
  }
  idx = (idx+1)&(len-1);    // Reprobe!
}
Lines 2-3 read K and V from _kvs at the idx computed from the hash earlier. Since the table uses open addressing, we may have to probe several slots.
Lines 4-13 handle the case K == null. If the caller is remove, we can finish right away. Otherwise we CAS our key into the slot; on success we bump chm's used-slot counter, memoize the computed fullhash into hashes for later use, and break out of the key-search loop.
Lines 15-18: if the CAS failed, or K != null to begin with, we must compare K against the key we are trying to insert (the hashes array, filled in earlier, short-circuits this comparison). Line 15 is special: newkvs = chm._newkvs. Because _newkvs is a volatile field, and chm (stored in _kvs[0]) was fully initialized before any worker thread started, this volatile read establishes happens-before ordering: the K read afterwards is guaranteed to be a fully constructed object and can safely take part in an equals comparison. The code seems to assume that elements of the non-volatile _kvs array, once updated via CAS, are necessarily visible to other threads (I am not convinced this always holds; it is questionable, see the discussion of memory visibility). It requires that we never keep reading a stale value here (again, not strictly guaranteed), and a plain read could even observe a partially constructed object. But if the variable is volatile, or is read with getObjectVolatile, or some volatile variable is read first, as is done here, we are guaranteed to see the complete, published data. If the comparison succeeds, we have found the right K and break out of the loop.
Lines 20-26: the key did not match, so we keep looking. First bump reprobe_cnt, the count of failed probes. If it reaches the limit (1/4 of the map's capacity plus 10), or key == TOMBSTONE (a situation I never actually observed), we resize (discussed later) and insert into the newly created kvs instead.
Line 28: step one slot to the right and search again.
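Stripped of CAS, resizing, and the memoized-hash shortcut, the key-probing loop above is ordinary linear probing. A minimal single-threaded sketch (illustrative names; assumes a power-of-two table that is never completely full):

```java
// Linear-probe key search: start at hash & (len-1) and step by 1, wrapping,
// until the key or an empty slot is found. This mirrors only the probe
// order of putIfMatch/get_impl, none of the concurrency machinery.
public class ProbeSketch {
    static int findSlot(Object[] keys, Object key, int fullhash) {
        int len = keys.length;            // must be a power of two
        int idx = fullhash & (len - 1);
        while (true) {
            Object K = keys[idx];
            if (K == null || K.equals(key)) return idx; // free slot or match
            idx = (idx + 1) & (len - 1);                // reprobe by 1
        }
    }
    public static void main(String[] args) {
        Object[] keys = new Object[8];
        keys[3] = "a"; keys[4] = "b";  // "b" was displaced by a collision at 3
        System.out.println(findSlot(keys, "b", 3)); // probes 3, then hits 4
    }
}
```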
Part two:
The code is as follows:
if( putval == V ) return V; // Fast cutout for no-change

// See if we want to move to a new table.
if( newkvs == null &&       // New table-copy already spotted?
    // Once per fresh key-insert check the hard way
    ((V == null && chm.tableFull(reprobe_cnt,len)) ||
     V instanceof Prime) )
  newkvs = chm.resize(topmap,kvs); // Force the new table copy to start

if( newkvs != null )        // Moving to a new table? Copy our slot and retry there.
  return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);
Line 1: if the val being put is identical to the current value, there is nothing to do; return it.
Lines 3-7: if we are setting a fresh value but the map has grown too crowded, or a resize has already started (we saw a Prime), we need a bigger table; line 8 obtains the new kvs via resize.
Lines 10-11: if we find that a new kvs has already been constructed, we first copy this slot's old value into it (possibly also helping copy a chunk of the old kvs's data), and then put the current value into the new kvs.
Part three:
// We are finally prepared to update the existing table
while( true ) {
  assert !(V instanceof Prime);

  if( expVal != NO_MATCH_OLD && // Do we care about expected-Value at all?
      V != expVal &&            // No instant match already?
      (expVal != MATCH_ANY || V == TOMBSTONE || V == null) &&
      !(V==null && expVal == TOMBSTONE) &&    // Match on null/TOMBSTONE combo
      (expVal == null || !expVal.equals(V)) ) // Expensive equals check at the last
    return V;                                 // Do not update!

  if( CAS_val(kvs, idx, V, putval ) ) {
    if( expVal != null ) {    // Adjust sizes - a striped counter
      if(  (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1);
      if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1);
    }
    return (V==null && expVal!=null) ? TOMBSTONE : V;
  }
  V = val(kvs,idx);           // Get new value
  if( V instanceof Prime )
    return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);
}
Lines 5-10: this condition is a bit tangled. It distinguishes four cases for expVal: NO_MATCH_OLD, TOMBSTONE, MATCH_ANY, and any other value. It can be rewritten more clearly as:
if( expVal != NO_MATCH_OLD &&
    ( (expVal == TOMBSTONE && V != null && V != TOMBSTONE) ||   // putIfAbsent, but a value exists
      (expVal == MATCH_ANY && (V == null || V == TOMBSTONE)) || // replace, but no value exists
      (expVal != TOMBSTONE && expVal != MATCH_ANY &&            // a concrete expected value...
       expVal != V && (expVal == null || !expVal.equals(V))) ) )// ...that does not match
  return V; // Do not update!
So this clause filters out every case in which no update should happen.
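To be confident that a case-by-case formulation really matches the original tangled condition, here is an exhaustive comparison over stand-in sentinel objects (hypothetical placeholders, not the real NO_MATCH_OLD / TOMBSTONE / MATCH_ANY). One subtlety: the "concrete expected value" branch must explicitly exclude the TOMBSTONE and MATCH_ANY sentinels, otherwise it misfires on the expVal == TOMBSTONE, V == null combination.

```java
// Checks that the rewritten bail-out predicate agrees with the original one
// for every combination of expVal and V (Prime values excluded, as the
// asserts in putIfMatch already rule them out).
public class PutIfMatchPredicate {
    static final Object NO_MATCH_OLD = new Object();
    static final Object TOMBSTONE    = new Object();
    static final Object MATCH_ANY    = new Object();

    // Original condition, verbatim from putIfMatch.
    static boolean original(Object expVal, Object V) {
        return expVal != NO_MATCH_OLD &&
               V != expVal &&
               (expVal != MATCH_ANY || V == TOMBSTONE || V == null) &&
               !(V == null && expVal == TOMBSTONE) &&
               (expVal == null || !expVal.equals(V));
    }

    // Rewritten case-by-case condition.
    static boolean rewritten(Object expVal, Object V) {
        return expVal != NO_MATCH_OLD &&
               ( (expVal == TOMBSTONE && V != null && V != TOMBSTONE) ||
                 (expVal == MATCH_ANY && (V == null || V == TOMBSTONE)) ||
                 (expVal != TOMBSTONE && expVal != MATCH_ANY &&
                  expVal != V && (expVal == null || !expVal.equals(V))) );
    }

    public static boolean agreeEverywhere() {
        Object a = "someValue", b = "otherValue";
        Object[] exps = { NO_MATCH_OLD, TOMBSTONE, MATCH_ANY, null, a, b };
        Object[] vs   = { null, TOMBSTONE, a, b };
        for (Object e : exps)
            for (Object v : vs)
                if (original(e, v) != rewritten(e, v)) return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(agreeEverywhere()); // true
    }
}
```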
Lines 12-20: CAS_val performs the real update of the value at idx. If it fails, lines 21-23 read the fresh V and decide whether to retry here, or to move the put into the new kvs (a resize is underway).
If it succeeds, our key/value pair is in place.
Now lines 14-18:
if( expVal != null ) { // Adjust sizes - a striped counter
  if(  (V == null || V == TOMBSTONE) && putval != TOMBSTONE ) chm._size.add( 1);
  if( !(V == null || V == TOMBSTONE) && putval == TOMBSTONE ) chm._size.add(-1);
}
return (V==null && expVal!=null) ? TOMBSTONE : V;
expVal != null tells us this is a normal external call; only the internal copy path passes expVal == null. putval then distinguishes remove from everything else: putval == TOMBSTONE means this is a remove, so if a value V previously existed, the map's _size is decremented. If putval != TOMBSTONE, the operation is a put, putIfAbsent, or the like, so if no value previously existed, _size is incremented.
That concludes the put family of operations.
Next, the get operation. The code is short, so here it is in full:
// Never returns a Prime nor a Tombstone.
@Override
public TypeV get( Object key ) {
  final int fullhash = hash(key); // throws NullPointerException if key is null
  final Object V = get_impl(this,_kvs,key,fullhash);
  assert !(V instanceof Prime);   // Never return a Prime
  return (TypeV)V;
}

private static final Object get_impl( final NonBlockingHashMap topmap, final Object[] kvs, final Object key, final int fullhash ) {
  final int len      = len   (kvs); // Count of key/value pairs, reads kvs.length
  final CHM chm      = chm   (kvs); // The CHM, for a volatile read below; reads slot 0 of kvs
  final int[] hashes = hashes(kvs); // The memoized hashes; reads slot 1 of kvs

  int idx = fullhash & (len-1);     // First key hash
  int reprobe_cnt=0;
  while( true ) {
    // Probe table.  Each read of 'val' probably misses in cache in a big
    // table; hopefully the read of 'key' then hits in cache.
    final Object K = key(kvs,idx);  // Get key   before volatile read, could be null
    final Object V = val(kvs,idx);  // Get value before volatile read, could be null or Tombstone or Prime
    if( K == null ) return null;    // A clear miss

    final Object[] newkvs = chm._newkvs; // VOLATILE READ before key compare

    if( keyeq(K,key,hashes,idx,fullhash) ) {
      // Key hit!  Check for no table-copy-in-progress
      if( !(V instanceof Prime) )   // No copy?
        return (V == TOMBSTONE) ? null : V; // Return the value
      return get_impl(topmap,chm.copy_slot_and_check(topmap,kvs,idx,key),key,fullhash); // Retry in the new table
    }

    if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes
        key == TOMBSTONE )          // found a TOMBSTONE key, means no more keys in this table
      return newkvs == null ? null
        : get_impl(topmap,topmap.help_copy(newkvs),key,fullhash); // Retry in the new table

    idx = (idx+1)&(len-1);          // Reprobe by 1!  (could now prefetch)
  }
}
Lines 2-8 mirror the setup on the put side.
Lines 12-24: if K == null, the key is absent; return null right away. Otherwise, the read of chm._newkvs supplies the volatile-read barrier (its semantics, paired with the compareAndSwapObject done by put, let us observe the published key). We then compare keys; on a match, inspect V: if it holds the value we want, return it; if it is a tombstone, return null; if it is a Prime, help copy the slot and continue the lookup in the next-level kvs.
Lines 26-30: the key did not match, so record one more probe, then either move to the next slot or descend to the next-level table. The important point: within a single level of kvs, get only ever probes reprobe_limit(len) slots; if after that newkvs is null, the key does not exist and we return null; otherwise we continue in the next level. This strategy, coordinated with put's, costs some extra memory but saves time on every get.
Next, the important topic of resizing.
First, a picture that roughly shows how the map's next-level table is organized beneath the current one.
Figure 1:
That gives the intuition. Now the entry points in the code:
if( ++reprobe_cnt >= reprobe_limit(len) || // too many probes or
    key == TOMBSTONE ) {
  newkvs = chm.resize(topmap,kvs);
  if( expVal != null ) topmap.help_copy(newkvs); // help along an existing copy
  return putIfMatch(topmap,newkvs,key,putval,expVal);
}

if( newkvs == null &&       // New table-copy already spotted?
    // Once per fresh key-insert check the hard way
    ((V == null && chm.tableFull(reprobe_cnt,len)) ||
     V instanceof Prime) )
  newkvs = chm.resize(topmap,kvs); // Force the new table copy to start

// See if we are moving to a new table.
// If so, copy our slot and retry in the new table.
if( newkvs != null )
  return putIfMatch(topmap,chm.copy_slot_and_check(topmap,kvs,idx,expVal),key,putval,expVal);
So a resize is triggered in only two situations:
While probing for the key: the probe count reaches 10 + len/4, or the key being inserted turns out to be TOMBSTONE.
The key was found but its V is null, and the table is judged full: the current probe count reached 10 and the used-slot count _slots has reached 10 + len/4.
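The cutoff used in both triggers is reprobe_limit(len). In the source, REPROBE_LIMIT is 10 and the limit is REPROBE_LIMIT + (len >> 2), i.e. 10 + len/4; a one-line sketch of that formula:

```java
// The probe cutoff: 10 plus a quarter of the table length.
public class ReprobeLimit {
    static final int REPROBE_LIMIT = 10;
    static int reprobeLimit(int len) { return REPROBE_LIMIT + (len >> 2); }

    public static void main(String[] args) {
        System.out.println(reprobeLimit(32)); // default 32-slot table -> 18
    }
}
```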
Here is chm.resize:
private final Object[] resize( NonBlockingHashMap topmap, Object[] kvs) {
  assert chm(kvs) == this;
  Object[] newkvs = _newkvs;   // VOLATILE READ
  if( newkvs != null )         // See if resize is already in progress
    return newkvs;             // Use the new table already

  int oldlen = len(kvs);       // Old count of K,V pairs allowed
  int sz = size();             // Get current table count of active K,V pairs
  int newsz = sz;              // First size estimate
  if( sz >= (oldlen>>2) ) {    // If we are >25% full of keys then...
    newsz = oldlen<<1;         // Double size
    if( sz >= (oldlen>>1) )    // If we are >50% full of keys then...
      newsz = oldlen<<2;       // Double double size
  }

  long tm = System.currentTimeMillis();
  long q=0;
  if( newsz <= oldlen &&       // New table would shrink or hold steady?
      tm <= topmap._last_resize_milli+10000 && // Recent resize (less than 1 sec ago)
      (q=_slots.estimate_get()) >= (sz<<1) )   // 1/2 of keys are dead?
    newsz = oldlen<<1;         // Double the existing size
  if( newsz < oldlen ) newsz = oldlen;

  int log2;
  for( log2=MIN_SIZE_LOG; (1<<log2) < newsz; log2++ ) ; // Compute log2 of size

  long r = _resizers;
  while( !_resizerUpdater.compareAndSet(this,r,r+1) )
    r = _resizers;
  int megs = ((((1<<log2)<<1)+4)<<3/*word to bytes*/)>>20/*megs*/;
  if( r >= 2 && megs > 0 ) {   // Already 2 guys trying; wait and see
    newkvs = _newkvs;          // Between dorking around, another thread did it
    if( newkvs != null )       // See if resize is already in progress
      return newkvs;           // Use the new table already
    try { Thread.sleep(8*megs); } catch( Exception e ) { }
  }
  newkvs = _newkvs;
  if( newkvs != null )         // See if resize is already in progress
    return newkvs;             // Use the new table already

  newkvs = new Object[((1<<log2)<<1)+2]; // This can get expensive for big arrays
  newkvs[0] = new CHM(_size);   // CHM in slot 0
  newkvs[1] = new int[1<<log2]; // hashes in slot 1

  if( _newkvs != null )        // See if resize is already in progress
    return _newkvs;            // Use the new table already

  if( CAS_newkvs( newkvs ) ) { // NOW a resize-is-in-progress!
    topmap.rehash();           // Call for Hashtable's benefit
  } else                       // CAS failed?
    newkvs = _newkvs;          // Reread new table
  return newkvs;
}
Lines 4-6: if _newkvs already exists, return it.
Lines 8-16: read the current _size. If live pairs fill at least 1/4 of the table, plan to double it; at least 1/2, quadruple it; otherwise newsz = sz.
Lines 18-23: using _size and _slots: if the new size would shrink or hold steady, but we resized recently and at least half the slots hold dead keys, double anyway. Line 25 (if (newsz < oldlen) newsz = oldlen) then guarantees the table never actually shrinks below its current capacity.
Lines 27-42: round newsz up to a power of two and compute its log2. Atomically count this thread among the resizers; if two or more threads are already trying (and the new table is at least a megabyte), sleep a while proportional to the table size, then re-read _newkvs, which has very likely been built by someone else in the meantime.
Lines 44-55: allocate newkvs, try to install it with CAS, and return whichever _newkvs ends up installed.
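The sizing decision condenses into a small pure function; this is a sketch of the heuristic only (it omits the dead-slot doubling and the resizer throttling, and assumes MIN_SIZE_LOG is 3 as in the source):

```java
// Sizing heuristic from resize(): double when >= 25% full of live keys,
// quadruple when >= 50% full, never shrink, then round up to a power of two.
public class ResizeSizing {
    static int newSize(int oldlen, int sz) {
        int newsz = sz;
        if (sz >= (oldlen >> 2)) {          // >= 25% full
            newsz = oldlen << 1;            // double
            if (sz >= (oldlen >> 1))        // >= 50% full
                newsz = oldlen << 2;        // quadruple
        }
        if (newsz < oldlen) newsz = oldlen; // never shrink
        int log2 = 3;                       // MIN_SIZE_LOG
        while ((1 << log2) < newsz) log2++; // round up to a power of 2
        return 1 << log2;
    }
    public static void main(String[] args) {
        System.out.println(newSize(32, 8));  // 25% full -> 64
        System.out.println(newSize(32, 16)); // 50% full -> 128
        System.out.println(newSize(32, 2));  // sparse   -> stays 32
    }
}
```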
Next, help_copy:
private final Object[] help_copy( Object[] helper ) {
  // Read the top-level KVS only once.  We'll try to help this copy along,
  // even if it gets promoted out from under us (i.e., the copy completes
  // and another KVS becomes the top-level copy).
  Object[] topkvs = _kvs;
  CHM topchm = chm(topkvs);
  if( topchm._newkvs == null ) return helper; // No copy in-progress
  topchm.help_copy_impl(this,topkvs,false);
  return helper;
}
From the map's current _kvs we grab the CHM and check whether there is a copy in progress we should help with. A normal put will help copy; otherwise we simply return.
Next, the important help_copy_impl:
private final void help_copy_impl( NonBlockingHashMap topmap, Object[] oldkvs, boolean copy_all ) {
  assert chm(oldkvs) == this;
  Object[] newkvs = _newkvs;
  assert newkvs != null;       // Already checked by caller
  int oldlen = len(oldkvs);    // Total amount to copy
  final int MIN_COPY_WORK = Math.min(oldlen,1024); // Limit per-thread work

  int panic_start = -1;
  int copyidx=-9999;           // Fool javac to think it's initialized
  while( _copyDone < oldlen ) { // Still needing to copy?
    if( panic_start == -1 ) {  // No panic?
      copyidx = (int)_copyIdx;
      while( copyidx < (oldlen<<1) && // 'panic' check
             !_copyIdxUpdater.compareAndSet(this,copyidx,copyidx+MIN_COPY_WORK) )
        copyidx = (int)_copyIdx; // Re-read
      if( !(copyidx < (oldlen<<1)) ) // Panic!
        panic_start = copyidx; // Record where we started to panic-copy
    }

    int workdone = 0;
    for( int i=0; i<MIN_COPY_WORK; i++ )
      if( copy_slot(topmap,(copyidx+i)&(oldlen-1),oldkvs,newkvs) ) // Made an oldtable slot go dead?
        workdone++;            // Yes!
    if( workdone > 0 )         // Report work-done occasionally
      copy_check_and_promote( topmap, oldkvs, workdone ); // See if we can promote
    copyidx += MIN_COPY_WORK;

    if( !copy_all && panic_start == -1 ) // No panic?
      return;                  // Then done copying after doing MIN_COPY_WORK
  }
  copy_check_and_promote( topmap, oldkvs, 0 ); // See if we can promote
}
Line 6: the chunk of slots copied per call is the smaller of oldlen and 1024.
Lines 10-19: copyidx tracks the next slot index to copy, and panic_start records whether this thread must now copy everything itself. The thread atomically advances _copyIdx by MIN_COPY_WORK, thereby claiming its chunk of slots. If copyidx has grown past oldlen*2, every slot is already claimed by someone; the thread then "panics", and its job becomes making sure the entire copy really finishes.
Lines 21-29 perform the actual copying:
int workdone = 0;
for( int i=0; i<MIN_COPY_WORK; i++ )
  if( copy_slot(topmap,(copyidx+i)&(oldlen-1),oldkvs,newkvs) ) // Made an oldtable slot go dead?
    workdone++;            // Yes!
if( workdone > 0 )         // Report work-done occasionally
  copy_check_and_promote( topmap, oldkvs, workdone ); // See if we can promote
copyidx += MIN_COPY_WORK;
After copying old values into the (initially empty) newkvs slots, the code bumps _copyDone and attempts a promote. As we will see, this code applies state-machine thinking, which is what lets it handle the races so cleanly.
Lines 30-33: if not asked to copy everything, return promptly after one chunk of work. The trailing copy_check_and_promote(topmap, oldkvs, 0) covers an extreme case: a deeper newkvs may finish resizing before the top-level kvs does, but since it is not the top-level _kvs, it can only be promoted by some later copy pass, hence the unconditional final check.
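The chunk-claiming scheme reduces to CAS-advancing a shared index by a fixed stride, so that helper threads grab disjoint slot ranges. A minimal sketch (stride of 4 instead of the source's up-to-1024, and no panic handling):

```java
import java.util.concurrent.atomic.AtomicLong;

// Threads claim disjoint copy chunks by CAS-advancing a shared index,
// mirroring the _copyIdx / MIN_COPY_WORK scheme in help_copy_impl.
public class CopyChunkClaim {
    static final int MIN_COPY_WORK = 4;   // stride (up to 1024 in the source)
    final AtomicLong copyIdx = new AtomicLong();

    // Returns the first slot index of the chunk this caller now owns.
    int claimChunk() {
        long idx;
        do { idx = copyIdx.get(); }
        while (!copyIdx.compareAndSet(idx, idx + MIN_COPY_WORK));
        return (int) idx;
    }

    public static void main(String[] args) {
        CopyChunkClaim c = new CopyChunkClaim();
        System.out.println(c.claimChunk()); // 0
        System.out.println(c.claimChunk()); // 4: the next disjoint chunk
    }
}
```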
Done.
Now the other entry point:
private final Object[] copy_slot_and_check( NonBlockingHashMap topmap, Object[] oldkvs, int idx, Object should_help ) {
  assert chm(oldkvs) == this;
  Object[] newkvs = _newkvs;   // VOLATILE READ
  // We're only here because the caller saw a Prime, which implies a
  // table-copy is in progress.
  assert newkvs != null;
  if( copy_slot(topmap,idx,oldkvs,_newkvs) )   // Copy the desired slot
    copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
  // Generically help along any copy (except if called recursively from a helper)
  return (should_help == null) ? newkvs : topmap.help_copy(newkvs);
}
This copies a single slot, and uses should_help to decide whether to go straight on to inserting into the new kvs, or to first help the old kvs finish its copy and then insert into the new kvs.
It too calls copy_slot and copy_check_and_promote; clearly those two calls are the heart of the matter.
Here is the key routine copy_slot (frankly, transferSlot would be a better name):
private boolean copy_slot( NonBlockingHashMap topmap, int idx, Object[] oldkvs, Object[] newkvs ) {
  Object key;
  while( (key=key(oldkvs,idx)) == null )
    CAS_key(oldkvs,idx, null, TOMBSTONE);

  Object oldval = val(oldkvs,idx); // Read OLD table
  while( !(oldval instanceof Prime) ) {
    final Prime box = (oldval == null || oldval == TOMBSTONE) ? TOMBPRIME : new Prime(oldval);
    if( CAS_val(oldkvs,idx,oldval,box) ) { // CAS down a box'd version of oldval
      if( box == TOMBPRIME ) return true;
      oldval = box;        // Record updated oldval
      break;               // Break loop; oldval is now boxed by us
    }
    oldval = val(oldkvs,idx); // Else try, try again
  }
  if( oldval == TOMBPRIME ) return false; // Copy already complete here!

  Object old_unboxed = ((Prime)oldval)._V;
  assert old_unboxed != TOMBSTONE;
  boolean copied_into_new = (putIfMatch(topmap, newkvs, key, old_unboxed, null) == null);

  while( !CAS_val(oldkvs,idx,oldval,TOMBPRIME) )
    oldval = val(oldkvs,idx);

  return copied_into_new;
} // end copy_slot
} // End of CHM
Lines 3-4: if key == null, CAS it to TOMBSTONE. The key transition here is null -> TOMBSTONE, or the key simply stays at its non-null value.
Lines 6-18: read oldval. If oldval instanceof Prime already holds, some other thread got here first, so we only check if (oldval == TOMBPRIME) return false; this works because TOMBPRIME is a value slot's terminal state.
Lines 8-16: build a Prime box from the current val. If val is null or TOMBSTONE (no live value existed), use the shared TOMBPRIME; otherwise wrap val in a new Prime. Then try to CAS the box over V. On success: if box is TOMBPRIME, return true immediately; the copy succeeded and there is nothing to physically move. Otherwise set oldval = box and break, carrying the boxed value onward. On CAS failure, line 16 re-reads oldval = val(oldkvs, idx) and tries again.
Lines 20-22: the real copy. Reaching here means the copy is neither finished nor trivial, so we unbox the old V and putIfMatch it into the next-level kvs. Note the fifth argument is null, meaning: insert only if the target slot is still null; whether the return value is null then tells us whether we performed the real insert. That result becomes copied_into_new.
Lines 24-25: CAS the old slot's value to TOMBPRIME, its final state.
Line 27: return copied_into_new, which is used to advance _copyDone.
So the essence of the above is the state transitions of key and value. The two state machines' possible paths are:
KEY:   null -> key, or null -> TOMBSTONE
VALUE: null -> value -> value2 ... -> valueN -> Prime(value) -> TOMBPRIME, or null -> TOMBPRIME, or null -> value -> TOMBSTONE -> TOMBPRIME
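The transition paths above can be written down as a tiny checker (the states are stand-ins for the real sentinels, and the legal relation below is my reading of the listed paths): TOMBPRIME is terminal, Prime(value) can only advance to TOMBPRIME, and a live value can be overwritten, tombstoned, or boxed.

```java
// Encodes the value-slot state machine described in the article and
// answers whether a given single-step transition is on a legal path.
public class ValueStates {
    enum S { NULL, VALUE, TOMBSTONE, PRIME_VALUE, TOMBPRIME }

    static boolean legal(S from, S to) {
        switch (from) {
            case NULL:        return to == S.VALUE || to == S.TOMBPRIME;   // put, or copy of an empty slot
            case VALUE:       return to == S.VALUE || to == S.TOMBSTONE    // overwrite, remove,
                                  || to == S.PRIME_VALUE;                  // or boxed by copy_slot
            case TOMBSTONE:   return to == S.VALUE || to == S.TOMBPRIME;   // re-insert, or copy of a dead slot
            case PRIME_VALUE: return to == S.TOMBPRIME;                    // copy completes
            case TOMBPRIME:   return false;                                // terminal state
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(legal(S.PRIME_VALUE, S.TOMBPRIME)); // true
        System.out.println(legal(S.TOMBPRIME, S.VALUE));       // false
    }
}
```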
Now for copy_check_and_promote; it is the cooperation between copy_slot and copy_check_and_promote that makes the copy correct.
// --- copy_check_and_promote --------------------------------------------
private final void copy_check_and_promote( NonBlockingHashMap topmap, Object[] oldkvs, int workdone ) {
  assert chm(oldkvs) == this;
  int oldlen = len(oldkvs);
  long copyDone = _copyDone;
  assert (copyDone+workdone) <= oldlen;
  if( workdone > 0 ) {
    while( !_copyDoneUpdater.compareAndSet(this,copyDone,copyDone+workdone) ) {
      copyDone = _copyDone; // Reload, retry
      assert (copyDone+workdone) <= oldlen;
    }
  }
  if( copyDone+workdone == oldlen && // Ready to promote this table?
      topmap._kvs == oldkvs &&       // Looking at the top-level table?
      // Attempt to promote
      topmap.CAS_kvs(oldkvs,_newkvs) ) {
    topmap._last_resize_milli = System.currentTimeMillis(); // Record resize time for next check
  }
}
This code is simple by comparison: add the caller's workdone to _copyDone. When _copyDone reaches the old table's length oldlen (every slot has been copied), and the current top-level _kvs is still oldkvs, CAS _kvs over to newkvs and record the resize time (an input to the next resize's heuristics).
One key point: _kvs is not a volatile field, so even after CAS_kvs succeeds, other threads may not immediately observe _newkvs. That does not hurt correctness: a thread that reads the stale kvs will find it full, and will follow the chain of references (the pink arrows in Figure 1) down to the correct kvs.
That completes our walk through the non-blocking concurrent hash table's code.
If we count resizing, though, this hash table is not strictly lock-free. Consider these lines:
boolean copied_into_new = (putIfMatch(topmap, newkvs, key, old_unboxed, null) == null);

// ---
// Finally, now that any old value is exposed in the new table, we can
// forever hide the old-table value by slapping a TOMBPRIME down.  This
// will stop other threads from uselessly attempting to copy this slot
// (i.e., it's a speed optimization not a correctness issue).
while( !CAS_val(oldkvs,idx,oldval,TOMBPRIME) )
  oldval = val(oldkvs,idx);

return copied_into_new;
If some thread executes line 1 but then stalls badly before reaching line 11, while the other threads have exhausted _copyIdx and switched to the promote-the-whole-kvs strategy, then all the remaining threads end up spinning in place behind the stalled one, unable to make progress.
On the other hand, get reads slots like this:
// to different _kvs arrays.
private static final Object key(Object[] kvs, int idx) { return kvs[(idx<<1)+2]; }
private static final Object val(Object[] kvs, int idx) { return kvs[(idx<<1)+3]; }
These plain array reads can, I believe, return stale values, with only an eventual-consistency guarantee. So with thread 1 putting an element and thread 2 getting it, even after thread 1's put completes, thread 2 may still fail to see the new value. I would suggest changing these to return _unsafe.getObjectVolatile(kvs, _Obase + idx * _Oscale);
That said, I never observed this behavior in testing.
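Outside of Unsafe, the same volatile-read strength is available from AtomicReferenceArray, whose get() is a volatile read and compareAndSet() a CAS. A sketch of the slot accessors rebuilt on top of it (illustrative, not the library's code):

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Slot accessors with volatile-read semantics: a value published by another
// thread's CAS is guaranteed visible to a subsequent get().
public class VolatileSlots {
    final AtomicReferenceArray<Object> kvs;

    VolatileSlots(int slots) { kvs = new AtomicReferenceArray<>((slots << 1) + 2); }

    Object key(int idx) { return kvs.get((idx << 1) + 2); } // volatile read
    Object val(int idx) { return kvs.get((idx << 1) + 3); } // volatile read

    boolean casVal(int idx, Object old, Object nv) {
        return kvs.compareAndSet((idx << 1) + 3, old, nv);
    }

    public static void main(String[] args) {
        VolatileSlots s = new VolatileSlots(8);
        s.kvs.set(2, "k");            // key of slot 0
        s.casVal(0, null, "v");       // publish the value with a CAS
        System.out.println(s.val(0)); // v
    }
}
```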
Finally, it is worth comparing the data-transfer code of JDK 8's ConcurrentHashMap. It does take locks, yet it partitions the transfer work in a similar pattern, one small stride (MIN_TRANSFER_STRIDE) at a time:
See also my own ConcurrentHashMap write-up.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
PS: does anyone know of infrastructure, systems programming, or AI engineering openings in Hangzhou?
Original article by 奋斗; when reposting, please credit the source: https://blog.ytso.com/99808.html