ConcurrentHashMap 原理和源码分析(二)详解编程语言

上一篇文章《ConcurrentHashMap 原理和源码分析(一)》 介绍了ConcurrentHashMap基础内部结构、还有原理;

本篇主要分析源代码,从几个主要的方法看ConcurrentHashMap内部是怎么实现的,以及ConcurrentHashMap的扩容机制和遍历机制。

其中上篇中介绍的一个重要的变量 sizeCtl 在这里会不断出现,为了方便理解,这里再把它拉过来下:

sizeCtl == 0 时候,默认情况
sizeCtl == -1 时候,说明table正在初始化
sizeCtl > 0 时候,说明接下来初始化要的初始化容量或者是扩容成功后threadshold的值
sizeCtl < 0 时候,说明正在扩容,高16位是数据校验位,低16位表示 n – 1个线程在参与扩容线程数

基本操作

构造函数和初始化

默认构造函数,什么事情都不做,可见ConcurrentHashMap是懒加载的模式,等到需要的时候再初始化;

public ConcurrentHashMap() {
    
} 

再看一个比较复杂的构造函数

// loadFactor 是加载因子,达到扩容的临界值 = 容量 * loadFactor  
// concurrencyLevel 表示支持多少个线程并发级别,支持多线程访问的单位就是Hash桶, 
// 所以 concurrencyLevel 等于最小的桶数量(最小容量) 
public ConcurrentHashMap(int initialCapacity, 
                         float loadFactor, int concurrencyLevel) {
    
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 
            throw new IllegalArgumentException(); 
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins 
            initialCapacity = concurrencyLevel;   // as estimated threads 
        // 这里跟hashMap的初始化一样,为了避免产生一次不必要的扩容 
        // 把initialCapacity当作临界值,重新算capacity 
        long size = (long)(1.0 + (long)initialCapacity / loadFactor); 
        int cap = (size >= (long)MAXIMUM_CAPACITY) ? 
            MAXIMUM_CAPACITY : tableSizeFor((int)size); 
        this.sizeCtl = cap; // sizeCtl表示准备扩容的大小 
} 

初始化 table

 private final Node<K,V>[] initTable() {
    
        Node<K,V>[] tab; int sc; 
        while ((tab = table) == null || tab.length == 0) {
    
            if ((sc = sizeCtl) < 0) // 说明正在扩容 
                Thread.yield(); // lost initialization race; just spin 
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    
            	 // sc = sizeCtl; // 等于要初始化的容量 
            	 // sizeCtl = -1; // CAS操作,设置为正在初始化 
                try {
    
                    if ((tab = table) == null || tab.length == 0) {
    
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 
                        @SuppressWarnings("unchecked") 
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 
                        table = tab = nt; 
                        // 初始化或者扩容成功后,sizeCtl变成threshold 
                        // n - n /4 = n * 0.75,实际上没用上loadFactor 
                        sc = n - (n >>> 2); 
                    } 
                } finally {
    
                    sizeCtl = sc; // 初始化完毕,将sizeCtl设置为threshold 
                } 
                break; 
            } 
        } 
        return tab; 
    } 

这里初始化操作中就使用CAS操作避免加锁的情况,在ConcurrentHashMap可以大量看到这种模式:

无限循环 + CAS

for ( ; ; ) { // 无限循环
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) //CAS操作
return next;
}

putVal

增加或者修改一个节点

// onlyIfAbsent表示如果存在这个节点,是否修改 
 final V putVal(K key, V value, boolean onlyIfAbsent) {
    
 	// 可见,ConcurrentHashMap不支持null的key或value 
        if (key == null || value == null) throw new NullPointerException(); 
        int hash = spread(key.hashCode()); 
        int binCount = 0;// 桶里面节点的数量,用来决定是否树化 
        for (Node<K,V>[] tab = table;;) {
   // 无限循环 + CAS 
            Node<K,V> f; int n, i, fh; 
            if (tab == null || (n = tab.length) == 0)// 若未初始化 
                tab = initTable();// 初始化table 
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
   // 若CAS找到的是空的hash桶,直接CAS添加节点 
                if (casTabAt(tab, i, null, 
                             new Node<K,V>(hash, key, value, null))) 
                    break; // 当hash桶为空,直接CAS添加,不用加锁 
            } 
            else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明table在扩容 
                tab = helpTransfer(tab, f);// 加入扩容 
            else {
   // hash桶不为空 
                V oldVal = null; 
                synchronized (f) {
   // 对hash桶的首节点加锁 
                    if (tabAt(tab, i) == f) {
    
                        if (fh >= 0) {
   // hash > 0 链表节点 
                            binCount = 1; 
                            for (Node<K,V> e = f;; ++binCount) {
    
                                K ek; 
                                if (e.hash == hash && 
                                    ((ek = e.key) == key || 
                                     (ek != null && key.equals(ek)))) {
   // 存在对应的key 
                                    oldVal = e.val; 
                                    if (!onlyIfAbsent) // 允许修改 
                                        e.val = value; 
                                    break; 
                                } 
                                Node<K,V> pred = e; 
                                if ((e = e.next) == null) {
   // 不存在对应的key 
                                    pred.next = new Node<K,V>(hash, key, 
                                                              value, null); 
                                    break; 
                                } 
                            } 
                        } 
                        else if (f instanceof TreeBin) {
   // 碰到红黑树首节点 
                            Node<K,V> p; 
                            binCount = 2; 
                            // 插入节点, 
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 
                                                           value)) != null) {
    
                                // 存在这个key 
                                oldVal = p.val; 
                                if (!onlyIfAbsent) 
                                    p.val = value; 
                            } 
                        } 
                        else if (f instanceof ReservationNode) 
                            throw new IllegalStateException("Recursive update"); 
                    } 
                } 
                if (binCount != 0) {
    
                    if (binCount >= TREEIFY_THRESHOLD)// 超过树化数量 8 
                        treeifyBin(tab, i);// 链表转化为红黑树 
                    if (oldVal != null) 
                        return oldVal; 
                    break; 
                } 
            } 
        } 
        addCount(1L, binCount);  // 新增一个节点数量 
        return null; 
    } 

其实添加节点的流程也很清晰了:

  1. 找到对应的hash桶
  2. hash桶为空,则CAS添加节点
  3. hash桶是ForwardingNode,加入帮助扩容
  4. hash桶不为空,锁住hash桶,根据链表、红黑树插入或修改节点
  5. 插入完成后,树化、增加节点数量

这个操作流程在修改、删除基本一样,在对节点进行操作的时候需要通过互斥锁保证线程安全,但只需要锁住对应的hash桶。

remove

删除一个节点

//删除节点 
public V remove(Object key) {
    
        return replaceNode(key, null, null); 
} 
 
// 替换节点 
// table[key].val等于期望值cv或cv等于null,更新为value 
// 如果value为null,删除该节点 
final V replaceNode(Object key, V value, Object cv) {
    
        int hash = spread(key.hashCode()); 
        for (Node<K,V>[] tab = table;;) {
    
            Node<K,V> f; int n, i, fh; 
            if (tab == null || (n = tab.length) == 0 || 
                (f = tabAt(tab, i = (n - 1) & hash)) == null) // 找到hash桶为null 
                break; 
            else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明table在扩容 
                tab = helpTransfer(tab, f);// 加入帮助扩容 
            else {
    
                V oldVal = null; 
                boolean validated = false; 
                synchronized (f) {
   // 锁住hash桶 
                    if (tabAt(tab, i) == f) {
    
                        if (fh >= 0) {
    
                            validated = true; 
                            for (Node<K,V> e = f, pred = null;;) {
    
                                K ek; 
                                if (e.hash == hash && 
                                    ((ek = e.key) == key || 
                                     (ek != null && key.equals(ek)))) {
    
                                    V ev = e.val; 
                                    if (cv == null || cv == ev || 
                                        (ev != null && cv.equals(ev))) {
   // 替换修改值 
                                        oldVal = ev; 
                                        if (value != null) 
                                            e.val = value; 
                                        else if (pred != null) 
                                            pred.next = e.next; 
                                        else 
                                            setTabAt(tab, i, e.next); 
                                    } 
                                    break; 
                                } 
                                pred = e; 
                                if ((e = e.next) == null) 
                                    break; 
                            } 
                        } 
                        else if (f instanceof TreeBin) {
   // 红黑树 
                            validated = true; 
                            TreeBin<K,V> t = (TreeBin<K,V>)f; 
                            TreeNode<K,V> r, p; 
                            if ((r = t.root) != null && 
                                (p = r.findTreeNode(hash, key, null)) != null) {
   // 同样替换修改 
                                V pv = p.val; 
                                if (cv == null || cv == pv || 
                                    (pv != null && cv.equals(pv))) {
    
                                    oldVal = pv; 
                                    if (value != null) 
                                        p.val = value; 
                                    else if (t.removeTreeNode(p)) 
                                        setTabAt(tab, i, untreeify(t.first)); 
                                } 
                            } 
                        } 
                        else if (f instanceof ReservationNode) 
                            throw new IllegalStateException("Recursive update"); 
                    } 
                } 
                if (validated) {
    
                    if (oldVal != null) {
    
                        if (value == null) 
                            addCount(-1L, -1);// 将节点数量-1 
                        return oldVal; 
                    } 
                    break; 
                } 
            } 
        } 
        return null; 
} 

上面replaceNode的流程跟putVal的流程基本上是一样:

  1. 找到对应的hash桶
  2. hash桶是ForwardingNode,加入帮助扩容
  3. hash桶不为空,锁住hash桶,根据链表、红黑树修改节点
  4. 增加节点数量

get

通过指定的key,获取value

    public V get(Object key) {
    
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 
        int h = spread(key.hashCode());// 计算hash值 
        if ((tab = table) != null && (n = tab.length) > 0 && 
            (e = tabAt(tab, (n - 1) & h)) != null) {
    
            if ((eh = e.hash) == h) {
   // e是hash桶首节点 
                if ((ek = e.key) == key || (ek != null && key.equals(ek))) 
                    return e.val;// 首节点是目标节点,直接返回 
            } 
            else if (eh < 0)// hash小于0,调用特殊节点的find() 
                return (p = e.find(h, key)) != null ? p.val : null; 
            while ((e = e.next) != null) {
   // 普通链表 
                if (e.hash == h && 
                    ((ek = e.key) == key || (ek != null && key.equals(ek)))) 
                    return e.val; 
            } 
        } 
        return null; 
    } 

get方法的流程也是很清晰的,整个流程都没有加互斥锁:

  1. 找到对应的hash桶;
  2. hash桶首节点是想找的目标节点,直接返回;
  3. hash桶首节点的hash小于0,说明是树的首节点或者是forwardindNode,直接调用红黑树的find(),或者forwardindNode的find();在上一篇文章有介绍过forwardindNode的find方法;
  4. hash桶首节点的hash大于0,说明是普通链表,递归查找。

扩容机制

扩容

ConcurrentHashMap支持多线程协助扩容,扩容的工作分成两部分:

  1. 生成一个新的table,即nextTable,容量为原来的2倍
  2. 将旧的table迁移到nextTable,这部分也是众多线程能分摊的工作

这里会产生一个疑问:多个线程参与迁移工作会不会相互干扰啊?答案是不会的。

任意一个节点,假设节点所在桶序号是2,扩容之后只有两种可能,一种是呆在原来的桶2,一种是迁移到序号是(2 + 旧容量)桶中。

线程从transferIndex中分配得到一个区间的hash桶迁移任务,比如说是旧的table的 [a, b) 区间,那么这个线程工作的桶就是 [a, b) 和 [ a+ 旧capacity, b+ 旧capacity),这样每个任务就独立了,很适合多线程工作。

示意图

tryPresize
 private final void tryPresize(int size) {
    
 		// 计算容量 
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : 
            tableSizeFor(size + (size >>> 1) + 1); 
        int sc; 
        while ((sc = sizeCtl) >= 0) {
    
        	// sizeCtl > 0,可能是threshold或准备扩容的容量 
            Node<K,V>[] tab = table; int n; 
            if (tab == null || (n = tab.length) == 0) {
    
            // table为空,需要进行初始化 
                n = (sc > c) ? sc : c;// 选择较大的容量 
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    
                //修改sizeCtl为-1,表示正在初始化 
                    try {
    
                        if (table == tab) {
    
                            @SuppressWarnings("unchecked") 
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 
                            table = nt; 
                            sc = n - (n >>> 2);// sc = n * 3/4; 
                        } 
                    } finally {
    
                        sizeCtl = sc;// 设置成theashold 
                    } 
                } 
            } 
            else if (c <= sc || n >= MAXIMUM_CAPACITY) 
                break; 
            else if (tab == table) {
   // 存在table,调用transfer扩容 
                int rs = resizeStamp(n); 
                if (U.compareAndSwapInt(this, SIZECTL, sc, 
                                        (rs << RESIZE_STAMP_SHIFT) + 2)) 
                    transfer(tab, null); 
            } 
        } 
    } 
transfer

线程在许多操作如put、replace、remove碰到ForwardingNode都会调用,具体来看下怎么扩容的

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    
        int n = tab.length, stride; 
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 
            stride = MIN_TRANSFER_STRIDE; // subdivide range 
        if (nextTab == null) {
    // 初始化nextTable           
            try {
    
                @SuppressWarnings("unchecked") 
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 
                nextTab = nt; 
            } catch (Throwable ex) {
         // try to cope with OOME 
                sizeCtl = Integer.MAX_VALUE; 
                return; 
            } 
            nextTable = nextTab; 
            transferIndex = n; 
           //transferIndex表示目前指向的最大的hash桶索引数, 
           // 它会每次分配出去stride 个数量的桶数给线程,慢慢的减少,直至为0,表示分配完成 
        } 
        int nextn = nextTab.length; 
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 
        boolean advance = true; 
        boolean finishing = false; // to ensure sweep before committing nextTab 
        for (int i = 0, bound = 0;;) {
    
            Node<K,V> f; int fh; 
            while (advance) {
    
            	// 分配负责的任务区间 
                int nextIndex, nextBound; 
                if (--i >= bound || finishing) 
                    advance = false; 
                else if ((nextIndex = transferIndex) <= 0) {
    
                    i = -1; 
                    advance = false; 
                } 
                else if (U.compareAndSwapInt 
                         (this, TRANSFERINDEX, nextIndex, 
                          nextBound = (nextIndex > stride ? 
                                       nextIndex - stride : 0))) {
    
                    bound = nextBound; 
                    i = nextIndex - 1; 
                    advance = false; 
                } 
            } 
            if (i < 0 || i >= n || i + n >= nextn) {
    
                int sc; 
                // 扩容任务结束,最后一个线程负责修改table,nextTable,sizeCtl 
                if (finishing) {
    
                    nextTable = null; 
                    table = nextTab; 
                    sizeCtl = (n << 1) - (n >>> 1); 
                    return; 
                } 
                // 当前线程负责的任务结束,sizeCtl - 1 
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    
                	 // resizeStamp(n) << RESIZE_STAMP_SHIFT) 获取一个校验戳,左移16位,那么低16位都位0 
                	 // sc = (高16位)校验戳 + (低16位)(线程数 + 1) 
                	 // 初始化时候 sc = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 
                	 // 所以只要在最后一个线程:sc - 1 - 1 == 校验戳 
                	 // 这里既判断是不是最后一个线程,又利用校验戳判断了是不是本次扩容任务  
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  
                        return; 
                    finishing = advance = true; 
                    i = n; // recheck before commit 
                } 
            } 
            else if ((f = tabAt(tab, i)) == null)// hash桶为null,置一个ForwardingNode 
                advance = casTabAt(tab, i, null, fwd); 
            else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明处理过 
                advance = true;  
            else {
    
                synchronized (f) {
   // 锁住hash桶 
                    if (tabAt(tab, i) == f) {
    
                        Node<K,V> ln, hn; 
                        if (fh >= 0) {
   // hash>0,链表节点 
                            int runBit = fh & n; 
                            Node<K,V> lastRun = f; 
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
    
                                int b = p.hash & n; 
                                if (b != runBit) {
    
                                    runBit = b; 
                                    lastRun = p; 
                                } 
                            } 
                            // 这部分跟hashmap迁移一样 
                             // 根据hash & 原capacity 是否等于0, 切分成两个链表,然后分别放在新表的i和i+n位置 
                             // hash&n == 0 说明 hash的值小,处在原来的位置i 
                             // hash&n > 0 说明 hash值大,需要迁移到新的位置i+n 
                            if (runBit == 0) {
    
                                ln = lastRun; 
                                hn = null; 
                            } 
                            else {
    
                                hn = lastRun; 
                                ln = null; 
                            } 
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
    
                                int ph = p.hash; K pk = p.key; V pv = p.val; 
                                if ((ph & n) == 0) 
                                    ln = new Node<K,V>(ph, pk, pv, ln); 
                                else 
                                    hn = new Node<K,V>(ph, pk, pv, hn); 
                            } 
                            setTabAt(nextTab, i, ln); 
                            setTabAt(nextTab, i + n, hn); 
                            setTabAt(tab, i, fwd); 
                            advance = true;// 当前hash桶完成 
                        } 
                        else if (f instanceof TreeBin) {
   // 红黑树,不做具体分析 
                            TreeBin<K,V> t = (TreeBin<K,V>)f; 
                            TreeNode<K,V> lo = null, loTail = null; 
                            TreeNode<K,V> hi = null, hiTail = null; 
                            int lc = 0, hc = 0; 
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
    
                                int h = e.hash; 
                                TreeNode<K,V> p = new TreeNode<K,V> 
                                    (h, e.key, e.val, null, null); 
                                if ((h & n) == 0) {
    
                                    if ((p.prev = loTail) == null) 
                                        lo = p; 
                                    else 
                                        loTail.next = p; 
                                    loTail = p; 
                                    ++lc; 
                                } 
                                else {
    
                                    if ((p.prev = hiTail) == null) 
                                        hi = p; 
                                    else 
                                        hiTail.next = p; 
                                    hiTail = p; 
                                    ++hc; 
                                } 
                            } 
                            // 低于UNTREEIFY_THRESHOLD,由树转化为链表 
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 
                                (hc != 0) ? new TreeBin<K,V>(lo) : t; 
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 
                                (lc != 0) ? new TreeBin<K,V>(hi) : t; 
                            setTabAt(nextTab, i, ln); 
                            setTabAt(nextTab, i + n, hn); 
                            setTabAt(tab, i, fwd); 
                            advance = true;// 当前hash桶完成 
                        } 
                    } 
                } 
            } 
        } 
} 
  1. 如果nextTable为null,则生成nextTable,容量增长一倍
  2. 线程根据 transferIndex索引分配一个区间的任务
  3. 迁移过程中,将旧table中的节点用ForwaringNode代替,表示已经转移到nextTable中
  4. 再根据是链表、红黑树做迁移再根据是链表、红黑树做迁移
  5. 迁移完成后,最后一个退出的线程要负责收尾

计数统计

跟节点计数相关的有三个变量:baseCount,CounterCell,cellsBusy

cellsBusy是一个只有0和1两个状态的volatile整数,当做一个自旋锁,0代表无锁,1代表加锁

总体的原则就是:先尝试更新baseCount,失败再利用CounterCell

  1. 通过CAS尝试更新baseCount ,如果更新成功则完成,如果CAS更新失败会进入下一步;
  2. 线程通过随机数ThreadLocalRandom.getProbe() & (n-1) 计算出在counterCells数组的位置,如果不为null,则CAS尝试在couterCell上直接增加数量,如果失败会进入下一步;
  3. counterCells数组会进行扩容为原来的两倍,继续随机,继续添加;
  4. 最后,table节点的数量 = baseCount + counterCells每个cell记录下来的节点数量
// check > 0表示要检查是否扩容,一般是true 
private final void addCount(long x, int check) {
    
    CounterCell[] as; 
    long b, s; 
    // counterCells 不为null,或者 baseCount修改失败,说明存在并发 
    if ((as = counterCells) != null || 
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    
        CounterCell a; long v;  int m; 
        boolean uncontended = true; 
 
		// ThreadLocalRandom.getProbe() 获取一个随机数 
		// ThreadLocalRandom.getProbe()& m 获取一个小于等于m的随机数 
        if (as == null || (m = as.length - 1) < 0 || 
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||  
			// 比较a下面的值是否为a.value,是则更新为a.value+x,相当于是为了增加CounterCell的值 
            !(uncontended = 
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
    
    		// 若因为上面的操作都失败了,则进入fulladdCount 
            fullAddCount(x, uncontended); 
            return; 
        } 
        if (check <= 1) 
            return; 
        s = sumCount(); // 计算总数 
    } 
	 
    if (check >= 0) {
    // 检查是否扩容 
        Node<K,V>[] tab, nt; int n, sc; 
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 
               (n = tab.length) < MAXIMUM_CAPACITY) {
    
            int rs = resizeStamp(n); // 根据一个基数生成扩容的戳 
            if (sc < 0) {
    // 正在扩容或者正在初始化 
			// (sc >>> RESIZE_STAMP_SHIFT) :sc右移16位获取校验戳,跟rs对比是否同一次扩容 
			//  sc == rs + 1:不理解? 
			//  (nt = nextTable) == null 是否扩容完成 
			//  transferIndex <= 0 transferIndex是从大到小分发的,小于0则给线程分发完了 
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 
                    transferIndex <= 0) 
                    break;	 
  				// 加入帮助扩容,把正在执行transfer任务的线程数加1 
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 
                    transfer(tab, nt); // 迁移 
            } 
    		// 开始新的扩容,rs +(线程数)+1 = rs+2 
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) 
                transfer(tab, null); // 迁移 
            s = sumCount(); 
        } 
    } 
private final void fullAddCount(long x, boolean wasUncontended) {
    
    int h; 
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
    // ThreadLocalRandom未初始化 
        ThreadLocalRandom.localInit();      //初始化 
        h = ThreadLocalRandom.getProbe(); 
        wasUncontended = true; 
    } 
    boolean collide = false; // 是否冲突 
    for (;;) {
    
        CounterCell[] as; CounterCell a; int n; long v; 
        if ((as = counterCells) != null && (n = as.length) > 0) {
    
            if ((a = as[(n - 1) & h]) == null) {
     
            // 从counterCells随机一个CounterCell 
                if (cellsBusy == 0) {
               // Try to attach new Cell 
                    CounterCell r = new CounterCell(x); 
                    if (cellsBusy == 0 && 
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    
 	 					// 对cellsBusy加锁 
                        boolean created = false; 
                        try {
                
						// 如果找到的位置为null,将之前创建的新Cell放入数组 
                            CounterCell[] rs; int m, j; 
                            if ((rs = counterCells) != null && 
                                (m = rs.length) > 0 && 
                                rs[j = (m - 1) & h] == null) {
    
                                rs[j] = r;  
                                created = true; 
                            } 
                        } finally {
    
                            cellsBusy = 0; 
                        } 
                        if (created) 
                            break; 
                        continue;  
                    } 
                } 
                collide = false; 
            } 
    	    // 选出的CounterCell不为null 
            else if (!wasUncontended) 
                wasUncontended = true;      
    	    // 对CounterCell尝试直接增加数值 
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) 
                break; 
			// 上面的更新都失败了,以下不是特别理解 
            else if (counterCells != as || n >= NCPU) 
                collide = false;     // 设置冲突标志位false,重新执行循环 
            else if (!collide) 
                collide = true; //  设置冲突标志,重新执行循环 
            else if (cellsBusy == 0 &&  U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    // 加自旋锁 
                try {
    
                    if (counterCells == as) {
    
                    		// 扩容counterCells 为原来的两倍 
                        CounterCell[] rs = new CounterCell[n << 1]; 
  						//迁移数据 
                        for (int i = 0; i < n; ++i) 
                            rs[i] = as[i]; 
                        counterCells = rs;  
                    } 
                } finally {
    
                    cellsBusy = 0; //解锁 
                } 
                collide = false; 
                continue; 
            } 
            h = ThreadLocalRandom.advanceProbe(h); // 重新计算随机数 
        } 
		// CounterCell数组未初始化 
        else if (cellsBusy == 0 && counterCells == as && 
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    
            boolean init = false; 
            try {
                              // Initialize table 
                if (counterCells == as) {
    
					// 初始容量为2 
                    CounterCell[] rs = new CounterCell[2]; 
                    rs[h & 1] = new CounterCell(x); 
                    counterCells = rs; 
                    init = true; 
                } 
            } finally {
    
                cellsBusy = 0;//解锁 
            } 
            if (init) 
                break; 
        } 
		// 如果cellsBusy 被占用,尝试更新baseCount 
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) 
            break;                          
    } 
} 

参考:
IT虾米网
IT虾米网
IT虾米网
IT虾米网

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/19395.html

(0)
上一篇 2021年7月19日 22:09
下一篇 2021年7月19日 22:09

相关推荐

发表回复

登录后才能评论