ConcurrentHashMap 原理和源码分析(二)详解编程语言

上一篇文章《ConcurrentHashMap 原理和源码分析(一)》 介绍了ConcurrentHashMap基础内部结构、还有原理;

本篇主要分析源代码,从几个主要的方法看ConcurrentHashMap内部是怎么实现的,以及ConcurrentHashMap的扩容机制和遍历机制。

其中上篇中介绍的一个重要的变量 sizeCtl 在这里会不断出现,为了方便理解,这里再把它拉过来下:

sizeCtl == 0 时候,默认情况
sizeCtl == -1 时候,说明table正在初始化
sizeCtl > 0 时候,说明接下来初始化要的初始化容量或者是扩容成功后threadshold的值
sizeCtl < 0 时候,说明正在扩容,高16位是数据校验位,低16位表示 n – 1个线程在参与扩容线程数

基本操作

构造函数和初始化

默认构造函数,什么事情都不做,可见ConcurrentHashMap是懒加载的模式,等到需要的时候再初始化;

public ConcurrentHashMap() {
    
} 

再看一个比较复杂的构造函数

// loadFactor 是加载因子,达到扩容的临界值 = 容量 * loadFactor  
// concurrencyLevel 表示支持多少个线程并发级别,支持多线程访问的单位就是Hash桶, 
// 所以 concurrencyLevel 等于最小的桶数量(最小容量) 
public ConcurrentHashMap(int initialCapacity, 
                         float loadFactor, int concurrencyLevel) {
    
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 
            throw new IllegalArgumentException(); 
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins 
            initialCapacity = concurrencyLevel;   // as estimated threads 
        // 这里跟hashMap的初始化一样,为了避免产生一次不必要的扩容 
        // 把initialCapacity当作临界值,重新算capacity 
        long size = (long)(1.0 + (long)initialCapacity / loadFactor); 
        int cap = (size >= (long)MAXIMUM_CAPACITY) ? 
            MAXIMUM_CAPACITY : tableSizeFor((int)size); 
        this.sizeCtl = cap; // sizeCtl表示准备扩容的大小 
} 

初始化 table

 private final Node<K,V>[] initTable() {
    
        Node<K,V>[] tab; int sc; 
        while ((tab = table) == null || tab.length == 0) {
    
            if ((sc = sizeCtl) < 0) // 说明正在扩容 
                Thread.yield(); // lost initialization race; just spin 
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    
            	 // sc = sizeCtl; // 等于要初始化的容量 
            	 // sizeCtl = -1; // CAS操作,设置为正在初始化 
                try {
    
                    if ((tab = table) == null || tab.length == 0) {
    
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 
                        @SuppressWarnings("unchecked") 
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 
                        table = tab = nt; 
                        // 初始化或者扩容成功后,sizeCtl变成threshold 
                        // n - n /4 = n * 0.75,实际上没用上loadFactor 
                        sc = n - (n >>> 2); 
                    } 
                } finally {
    
                    sizeCtl = sc; // 初始化完毕,将sizeCtl设置为threshold 
                } 
                break; 
            } 
        } 
        return tab; 
    } 

这里初始化操作中就使用CAS操作避免加锁的情况,在ConcurrentHashMap可以大量看到这种模式:

无限循环 + CAS

for ( ; ; ) { // 无限循环
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) //CAS操作
return next;
}

putVal

增加或者修改一个节点

// onlyIfAbsent表示如果存在这个节点,是否修改 
final V putVal(K key, V value, boolean onlyIfAbsent) {
 
// 可见,ConcurrentHashMap不支持null的key或value 
if (key == null || value == null) throw new NullPointerException(); 
int hash = spread(key.hashCode()); 
int binCount = 0;// 桶里面节点的数量,用来决定是否树化 
for (Node<K,V>[] tab = table;;) {
// 无限循环 + CAS 
Node<K,V> f; int n, i, fh; 
if (tab == null || (n = tab.length) == 0)// 若未初始化 
tab = initTable();// 初始化table 
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 若CAS找到的是空的hash桶,直接CAS添加节点 
if (casTabAt(tab, i, null, 
new Node<K,V>(hash, key, value, null))) 
break; // 当hash桶为空,直接CAS添加,不用加锁 
} 
else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明table在扩容 
tab = helpTransfer(tab, f);// 加入扩容 
else {
// hash桶不为空 
V oldVal = null; 
synchronized (f) {
// 对hash桶的首节点加锁 
if (tabAt(tab, i) == f) {
 
if (fh >= 0) {
// hash > 0 链表节点 
binCount = 1; 
for (Node<K,V> e = f;; ++binCount) {
 
K ek; 
if (e.hash == hash && 
((ek = e.key) == key || 
(ek != null && key.equals(ek)))) {
// 存在对应的key 
oldVal = e.val; 
if (!onlyIfAbsent) // 允许修改 
e.val = value; 
break; 
} 
Node<K,V> pred = e; 
if ((e = e.next) == null) {
// 不存在对应的key 
pred.next = new Node<K,V>(hash, key, 
value, null); 
break; 
} 
} 
} 
else if (f instanceof TreeBin) {
// 碰到红黑树首节点 
Node<K,V> p; 
binCount = 2; 
// 插入节点, 
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 
value)) != null) {
 
// 存在这个key 
oldVal = p.val; 
if (!onlyIfAbsent) 
p.val = value; 
} 
} 
else if (f instanceof ReservationNode) 
throw new IllegalStateException("Recursive update"); 
} 
} 
if (binCount != 0) {
 
if (binCount >= TREEIFY_THRESHOLD)// 超过树化数量 8 
treeifyBin(tab, i);// 链表转化为红黑树 
if (oldVal != null) 
return oldVal; 
break; 
} 
} 
} 
addCount(1L, binCount);  // 新增一个节点数量 
return null; 
} 

其实添加节点的流程也很清晰了:

  1. 找到对应的hash桶
  2. hash桶为空,则CAS添加节点
  3. hash桶是ForwardingNode,加入帮助扩容
  4. hash桶不为空,锁住hash桶,根据链表、红黑树插入或修改节点
  5. 插入完成后,树化、增加节点数量

这个操作流程在修改、删除基本一样,在对节点进行操作的时候需要通过互斥锁保证线程安全,但只需要锁住对应的hash桶。

remove

删除一个节点

//删除节点 
public V remove(Object key) {
 
return replaceNode(key, null, null); 
} 
// 替换节点 
// table[key].val等于期望值cv或cv等于null,更新为value 
// 如果value为null,删除该节点 
final V replaceNode(Object key, V value, Object cv) {
 
int hash = spread(key.hashCode()); 
for (Node<K,V>[] tab = table;;) {
 
Node<K,V> f; int n, i, fh; 
if (tab == null || (n = tab.length) == 0 || 
(f = tabAt(tab, i = (n - 1) & hash)) == null) // 找到hash桶为null 
break; 
else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明table在扩容 
tab = helpTransfer(tab, f);// 加入帮助扩容 
else {
 
V oldVal = null; 
boolean validated = false; 
synchronized (f) {
// 锁住hash桶 
if (tabAt(tab, i) == f) {
 
if (fh >= 0) {
 
validated = true; 
for (Node<K,V> e = f, pred = null;;) {
 
K ek; 
if (e.hash == hash && 
((ek = e.key) == key || 
(ek != null && key.equals(ek)))) {
 
V ev = e.val; 
if (cv == null || cv == ev || 
(ev != null && cv.equals(ev))) {
// 替换修改值 
oldVal = ev; 
if (value != null) 
e.val = value; 
else if (pred != null) 
pred.next = e.next; 
else 
setTabAt(tab, i, e.next); 
} 
break; 
} 
pred = e; 
if ((e = e.next) == null) 
break; 
} 
} 
else if (f instanceof TreeBin) {
// 红黑树 
validated = true; 
TreeBin<K,V> t = (TreeBin<K,V>)f; 
TreeNode<K,V> r, p; 
if ((r = t.root) != null && 
(p = r.findTreeNode(hash, key, null)) != null) {
// 同样替换修改 
V pv = p.val; 
if (cv == null || cv == pv || 
(pv != null && cv.equals(pv))) {
 
oldVal = pv; 
if (value != null) 
p.val = value; 
else if (t.removeTreeNode(p)) 
setTabAt(tab, i, untreeify(t.first)); 
} 
} 
} 
else if (f instanceof ReservationNode) 
throw new IllegalStateException("Recursive update"); 
} 
} 
if (validated) {
 
if (oldVal != null) {
 
if (value == null) 
addCount(-1L, -1);// 将节点数量-1 
return oldVal; 
} 
break; 
} 
} 
} 
return null; 
} 

上面replaceNode的流程跟putVal的流程基本上是一样:

  1. 找到对应的hash桶
  2. hash桶是ForwardingNode,加入帮助扩容
  3. hash桶不为空,锁住hash桶,根据链表、红黑树修改节点
  4. 增加节点数量

get

通过指定的key,获取value

    public V get(Object key) {
 
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 
int h = spread(key.hashCode());// 计算hash值 
if ((tab = table) != null && (n = tab.length) > 0 && 
(e = tabAt(tab, (n - 1) & h)) != null) {
 
if ((eh = e.hash) == h) {
// e是hash桶首节点 
if ((ek = e.key) == key || (ek != null && key.equals(ek))) 
return e.val;// 首节点是目标节点,直接返回 
} 
else if (eh < 0)// hash小于0,调用特殊节点的find() 
return (p = e.find(h, key)) != null ? p.val : null; 
while ((e = e.next) != null) {
// 普通链表 
if (e.hash == h && 
((ek = e.key) == key || (ek != null && key.equals(ek)))) 
return e.val; 
} 
} 
return null; 
} 

get方法的流程也是很清晰的,整个流程都没有加互斥锁:

  1. 找到对应的hash桶;
  2. hash桶首节点是想找的目标节点,直接返回;
  3. hash桶首节点的hash小于0,说明是树的首节点或者是forwardindNode,直接调用红黑树的find(),或者forwardindNode的find();在上一篇文章有介绍过forwardindNode的find方法;
  4. hash桶首节点的hash大于0,说明是普通链表,递归查找。

扩容机制

扩容

ConcurrentHashMap支持多线程协助扩容,扩容的工作分成两部分:

  1. 生成一个新的table,即nextTable,容量为原来的2倍
  2. 将旧的table迁移到nextTable,这部分也是众多线程能分摊的工作

这里会产生一个疑问:多个线程参与迁移工作会不会相互干扰啊?答案是不会的。

任意一个节点,假设节点所在桶序号是2,扩容之后只有两种可能,一种是呆在原来的桶2,一种是迁移到序号是(2 + 旧容量)桶中。

线程从transferIndex中分配得到一个区间的hash桶迁移任务,比如说是旧的table的 [a, b) 区间,那么这个线程工作的桶就是 [a, b) 和 [ a+ 旧capacity, b+ 旧capacity),这样每个任务就独立了,很适合多线程工作。

示意图

tryPresize
 private final void tryPresize(int size) {
 
// 计算容量 
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : 
tableSizeFor(size + (size >>> 1) + 1); 
int sc; 
while ((sc = sizeCtl) >= 0) {
 
// sizeCtl > 0,可能是threshold或准备扩容的容量 
Node<K,V>[] tab = table; int n; 
if (tab == null || (n = tab.length) == 0) {
 
// table为空,需要进行初始化 
n = (sc > c) ? sc : c;// 选择较大的容量 
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
 
//修改sizeCtl为-1,表示正在初始化 
try {
 
if (table == tab) {
 
@SuppressWarnings("unchecked") 
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 
table = nt; 
sc = n - (n >>> 2);// sc = n * 3/4; 
} 
} finally {
 
sizeCtl = sc;// 设置成theashold 
} 
} 
} 
else if (c <= sc || n >= MAXIMUM_CAPACITY) 
break; 
else if (tab == table) {
// 存在table,调用transfer扩容 
int rs = resizeStamp(n); 
if (U.compareAndSwapInt(this, SIZECTL, sc, 
(rs << RESIZE_STAMP_SHIFT) + 2)) 
transfer(tab, null); 
} 
} 
} 
transfer

线程在许多操作如put、replace、remove碰到ForwardingNode都会调用,具体来看下怎么扩容的

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
 
int n = tab.length, stride; 
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 
stride = MIN_TRANSFER_STRIDE; // subdivide range 
if (nextTab == null) {
 // 初始化nextTable           
try {
 
@SuppressWarnings("unchecked") 
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 
nextTab = nt; 
} catch (Throwable ex) {
      // try to cope with OOME 
sizeCtl = Integer.MAX_VALUE; 
return; 
} 
nextTable = nextTab; 
transferIndex = n; 
//transferIndex表示目前指向的最大的hash桶索引数, 
// 它会每次分配出去stride 个数量的桶数给线程,慢慢的减少,直至为0,表示分配完成 
} 
int nextn = nextTab.length; 
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 
boolean advance = true; 
boolean finishing = false; // to ensure sweep before committing nextTab 
for (int i = 0, bound = 0;;) {
 
Node<K,V> f; int fh; 
while (advance) {
 
// 分配负责的任务区间 
int nextIndex, nextBound; 
if (--i >= bound || finishing) 
advance = false; 
else if ((nextIndex = transferIndex) <= 0) {
 
i = -1; 
advance = false; 
} 
else if (U.compareAndSwapInt 
(this, TRANSFERINDEX, nextIndex, 
nextBound = (nextIndex > stride ? 
nextIndex - stride : 0))) {
 
bound = nextBound; 
i = nextIndex - 1; 
advance = false; 
} 
} 
if (i < 0 || i >= n || i + n >= nextn) {
 
int sc; 
// 扩容任务结束,最后一个线程负责修改table,nextTable,sizeCtl 
if (finishing) {
 
nextTable = null; 
table = nextTab; 
sizeCtl = (n << 1) - (n >>> 1); 
return; 
} 
// 当前线程负责的任务结束,sizeCtl - 1 
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 
// resizeStamp(n) << RESIZE_STAMP_SHIFT) 获取一个校验戳,左移16位,那么低16位都位0 
// sc = (高16位)校验戳 + (低16位)(线程数 + 1) 
// 初始化时候 sc = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 
// 所以只要在最后一个线程:sc - 1 - 1 == 校验戳 
// 这里既判断是不是最后一个线程,又利用校验戳判断了是不是本次扩容任务  
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  
return; 
finishing = advance = true; 
i = n; // recheck before commit 
} 
} 
else if ((f = tabAt(tab, i)) == null)// hash桶为null,置一个ForwardingNode 
advance = casTabAt(tab, i, null, fwd); 
else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明处理过 
advance = true;  
else {
 
synchronized (f) {
// 锁住hash桶 
if (tabAt(tab, i) == f) {
 
Node<K,V> ln, hn; 
if (fh >= 0) {
// hash>0,链表节点 
int runBit = fh & n; 
Node<K,V> lastRun = f; 
for (Node<K,V> p = f.next; p != null; p = p.next) {
 
int b = p.hash & n; 
if (b != runBit) {
 
runBit = b; 
lastRun = p; 
} 
} 
// 这部分跟hashmap迁移一样 
// 根据hash & 原capacity 是否等于0, 切分成两个链表,然后分别放在新表的i和i+n位置 
// hash&n == 0 说明 hash的值小,处在原来的位置i 
// hash&n > 0 说明 hash值大,需要迁移到新的位置i+n 
if (runBit == 0) {
 
ln = lastRun; 
hn = null; 
} 
else {
 
hn = lastRun; 
ln = null; 
} 
for (Node<K,V> p = f; p != lastRun; p = p.next) {
 
int ph = p.hash; K pk = p.key; V pv = p.val; 
if ((ph & n) == 0) 
ln = new Node<K,V>(ph, pk, pv, ln); 
else 
hn = new Node<K,V>(ph, pk, pv, hn); 
} 
setTabAt(nextTab, i, ln); 
setTabAt(nextTab, i + n, hn); 
setTabAt(tab, i, fwd); 
advance = true;// 当前hash桶完成 
} 
else if (f instanceof TreeBin) {
// 红黑树,不做具体分析 
TreeBin<K,V> t = (TreeBin<K,V>)f; 
TreeNode<K,V> lo = null, loTail = null; 
TreeNode<K,V> hi = null, hiTail = null; 
int lc = 0, hc = 0; 
for (Node<K,V> e = t.first; e != null; e = e.next) {
 
int h = e.hash; 
TreeNode<K,V> p = new TreeNode<K,V> 
(h, e.key, e.val, null, null); 
if ((h & n) == 0) {
 
if ((p.prev = loTail) == null) 
lo = p; 
else 
loTail.next = p; 
loTail = p; 
++lc; 
} 
else {
 
if ((p.prev = hiTail) == null) 
hi = p; 
else 
hiTail.next = p; 
hiTail = p; 
++hc; 
} 
} 
// 低于UNTREEIFY_THRESHOLD,由树转化为链表 
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 
(hc != 0) ? new TreeBin<K,V>(lo) : t; 
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 
(lc != 0) ? new TreeBin<K,V>(hi) : t; 
setTabAt(nextTab, i, ln); 
setTabAt(nextTab, i + n, hn); 
setTabAt(tab, i, fwd); 
advance = true;// 当前hash桶完成 
} 
} 
} 
} 
} 
} 
  1. 如果nextTable为null,则生成nextTable,容量增长一倍
  2. 线程根据 transferIndex索引分配一个区间的任务
  3. 迁移过程中,将旧table中的节点用ForwaringNode代替,表示已经转移到nextTable中
  4. 再根据是链表、红黑树做迁移再根据是链表、红黑树做迁移
  5. 迁移完成后,最后一个退出的线程要负责收尾

计数统计

跟节点计数相关的有三个变量:baseCount,CounterCell,cellsBusy

cellsBusy是一个只有0和1两个状态的volatile整数,当做一个自旋锁,0代表无锁,1代表加锁

总体的原则就是:先尝试更新baseCount,失败再利用CounterCell

  1. 通过CAS尝试更新baseCount ,如果更新成功则完成,如果CAS更新失败会进入下一步;
  2. 线程通过随机数ThreadLocalRandom.getProbe() & (n-1) 计算出在counterCells数组的位置,如果不为null,则CAS尝试在couterCell上直接增加数量,如果失败会进入下一步;
  3. counterCells数组会进行扩容为原来的两倍,继续随机,继续添加;
  4. 最后,table节点的数量 = baseCount + counterCells每个cell记录下来的节点数量
// check > 0表示要检查是否扩容,一般是true 
private final void addCount(long x, int check) {
 
CounterCell[] as; 
long b, s; 
// counterCells 不为null,或者 baseCount修改失败,说明存在并发 
if ((as = counterCells) != null || 
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
 
CounterCell a; long v;  int m; 
boolean uncontended = true; 
// ThreadLocalRandom.getProbe() 获取一个随机数 
// ThreadLocalRandom.getProbe()& m 获取一个小于等于m的随机数 
if (as == null || (m = as.length - 1) < 0 || 
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||  
// 比较a下面的值是否为a.value,是则更新为a.value+x,相当于是为了增加CounterCell的值 
!(uncontended = 
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
 
// 若因为上面的操作都失败了,则进入fulladdCount 
fullAddCount(x, uncontended); 
return; 
} 
if (check <= 1) 
return; 
s = sumCount(); // 计算总数 
} 
if (check >= 0) {
 // 检查是否扩容 
Node<K,V>[] tab, nt; int n, sc; 
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 
(n = tab.length) < MAXIMUM_CAPACITY) {
 
int rs = resizeStamp(n); // 根据一个基数生成扩容的戳 
if (sc < 0) {
 // 正在扩容或者正在初始化 
// (sc >>> RESIZE_STAMP_SHIFT) :sc右移16位获取校验戳,跟rs对比是否同一次扩容 
//  sc == rs + 1:不理解? 
//  (nt = nextTable) == null 是否扩容完成 
//  transferIndex <= 0 transferIndex是从大到小分发的,小于0则给线程分发完了 
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 
sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 
transferIndex <= 0) 
break;	 
// 加入帮助扩容,把正在执行transfer任务的线程数加1 
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 
transfer(tab, nt); // 迁移 
} 
// 开始新的扩容,rs +(线程数)+1 = rs+2 
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) 
transfer(tab, null); // 迁移 
s = sumCount(); 
} 
} 
private final void fullAddCount(long x, boolean wasUncontended) {
 
int h; 
if ((h = ThreadLocalRandom.getProbe()) == 0) {
 // ThreadLocalRandom未初始化 
ThreadLocalRandom.localInit();      //初始化 
h = ThreadLocalRandom.getProbe(); 
wasUncontended = true; 
} 
boolean collide = false; // 是否冲突 
for (;;) {
 
CounterCell[] as; CounterCell a; int n; long v; 
if ((as = counterCells) != null && (n = as.length) > 0) {
 
if ((a = as[(n - 1) & h]) == null) {
  
// 从counterCells随机一个CounterCell 
if (cellsBusy == 0) {
            // Try to attach new Cell 
CounterCell r = new CounterCell(x); 
if (cellsBusy == 0 && 
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 
// 对cellsBusy加锁 
boolean created = false; 
try {
             
// 如果找到的位置为null,将之前创建的新Cell放入数组 
CounterCell[] rs; int m, j; 
if ((rs = counterCells) != null && 
(m = rs.length) > 0 && 
rs[j = (m - 1) & h] == null) {
 
rs[j] = r;  
created = true; 
} 
} finally {
 
cellsBusy = 0; 
} 
if (created) 
break; 
continue;  
} 
} 
collide = false; 
} 
// 选出的CounterCell不为null 
else if (!wasUncontended) 
wasUncontended = true;      
// 对CounterCell尝试直接增加数值 
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) 
break; 
// 上面的更新都失败了,以下不是特别理解 
else if (counterCells != as || n >= NCPU) 
collide = false;     // 设置冲突标志位false,重新执行循环 
else if (!collide) 
collide = true; //  设置冲突标志,重新执行循环 
else if (cellsBusy == 0 &&  U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 // 加自旋锁 
try {
 
if (counterCells == as) {
 
// 扩容counterCells 为原来的两倍 
CounterCell[] rs = new CounterCell[n << 1]; 
//迁移数据 
for (int i = 0; i < n; ++i) 
rs[i] = as[i]; 
counterCells = rs;  
} 
} finally {
 
cellsBusy = 0; //解锁 
} 
collide = false; 
continue; 
} 
h = ThreadLocalRandom.advanceProbe(h); // 重新计算随机数 
} 
// CounterCell数组未初始化 
else if (cellsBusy == 0 && counterCells == as && 
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 
boolean init = false; 
try {
                           // Initialize table 
if (counterCells == as) {
 
// 初始容量为2 
CounterCell[] rs = new CounterCell[2]; 
rs[h & 1] = new CounterCell(x); 
counterCells = rs; 
init = true; 
} 
} finally {
 
cellsBusy = 0;//解锁 
} 
if (init) 
break; 
} 
// 如果cellsBusy 被占用,尝试更新baseCount 
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) 
break;                          
} 
} 

参考:
IT虾米网
IT虾米网
IT虾米网
IT虾米网

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/19395.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论