上一篇文章《ConcurrentHashMap 原理和源码分析(一)》 介绍了ConcurrentHashMap基础内部结构、还有原理;
本篇主要分析源代码,从几个主要的方法看ConcurrentHashMap内部是怎么实现的,以及ConcurrentHashMap的扩容机制和遍历机制。
其中上篇中介绍的一个重要的变量 sizeCtl 在这里会不断出现,为了方便理解,这里再把它拉过来下:
sizeCtl == 0 时候,默认情况
sizeCtl == -1 时候,说明table正在初始化
sizeCtl > 0 时候,说明接下来初始化要的初始化容量或者是扩容成功后threadshold的值
sizeCtl < 0 时候,说明正在扩容,高16位是数据校验位,低16位表示 n – 1个线程在参与扩容线程数
基本操作
构造函数和初始化
默认构造函数,什么事情都不做,可见ConcurrentHashMap是懒加载的模式,等到需要的时候再初始化;
public ConcurrentHashMap() {
}
再看一个比较复杂的构造函数
// loadFactor 是加载因子,达到扩容的临界值 = 容量 * loadFactor
// concurrencyLevel 表示支持多少个线程并发级别,支持多线程访问的单位就是Hash桶,
// 所以 concurrencyLevel 等于最小的桶数量(最小容量)
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// 这里跟hashMap的初始化一样,为了避免产生一次不必要的扩容
// 把initialCapacity当作临界值,重新算capacity
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap; // sizeCtl表示准备扩容的大小
}
初始化 table
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) // 说明正在扩容
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// sc = sizeCtl; // 等于要初始化的容量
// sizeCtl = -1; // CAS操作,设置为正在初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 初始化或者扩容成功后,sizeCtl变成threshold
// n - n /4 = n * 0.75,实际上没用上loadFactor
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc; // 初始化完毕,将sizeCtl设置为threshold
}
break;
}
}
return tab;
}
这里初始化操作中就使用CAS操作避免加锁的情况,在ConcurrentHashMap可以大量看到这种模式:
无限循环 + CAS
for ( ; ; ) { // 无限循环
int current = get();
int next = current + 1;
if (compareAndSet(current, next)) //CAS操作
return next;
}
putVal
增加或者修改一个节点
// onlyIfAbsent表示如果存在这个节点,是否修改
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 可见,ConcurrentHashMap不支持null的key或value
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;// 桶里面节点的数量,用来决定是否树化
for (Node<K,V>[] tab = table;;) {
// 无限循环 + CAS
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)// 若未初始化
tab = initTable();// 初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 若CAS找到的是空的hash桶,直接CAS添加节点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // 当hash桶为空,直接CAS添加,不用加锁
}
else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明table在扩容
tab = helpTransfer(tab, f);// 加入扩容
else {
// hash桶不为空
V oldVal = null;
synchronized (f) {
// 对hash桶的首节点加锁
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// hash > 0 链表节点
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 存在对应的key
oldVal = e.val;
if (!onlyIfAbsent) // 允许修改
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 不存在对应的key
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 碰到红黑树首节点
Node<K,V> p;
binCount = 2;
// 插入节点,
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// 存在这个key
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)// 超过树化数量 8
treeifyBin(tab, i);// 链表转化为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 新增一个节点数量
return null;
}
其实添加节点的流程也很清晰了:
- 找到对应的hash桶
- hash桶为空,则CAS添加节点
- hash桶是ForwardingNode,加入帮助扩容
- hash桶不为空,锁住hash桶,根据链表、红黑树插入或修改节点
- 插入完成后,树化、增加节点数量
这个操作流程在修改、删除基本一样,在对节点进行操作的时候需要通过互斥锁保证线程安全,但只需要锁住对应的hash桶。
remove
删除一个节点
//删除节点
public V remove(Object key) {
return replaceNode(key, null, null);
}
// 替换节点
// table[key].val等于期望值cv或cv等于null,更新为value
// 如果value为null,删除该节点
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null) // 找到hash桶为null
break;
else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明table在扩容
tab = helpTransfer(tab, f);// 加入帮助扩容
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
// 锁住hash桶
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
// 替换修改值
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
// 红黑树
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
// 同样替换修改
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);// 将节点数量-1
return oldVal;
}
break;
}
}
}
return null;
}
上面replaceNode的流程跟putVal的流程基本上是一样:
- 找到对应的hash桶
- hash桶是ForwardingNode,加入帮助扩容
- hash桶不为空,锁住hash桶,根据链表、红黑树修改节点
- 增加节点数量
get
通过指定的key,获取value
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());// 计算hash值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// e是hash桶首节点
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;// 首节点是目标节点,直接返回
}
else if (eh < 0)// hash小于0,调用特殊节点的find()
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 普通链表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法的流程也是很清晰的,整个流程都没有加互斥锁:
- 找到对应的hash桶;
- hash桶首节点是想找的目标节点,直接返回;
- hash桶首节点的hash小于0,说明是树的首节点或者是forwardindNode,直接调用红黑树的find(),或者forwardindNode的find();在上一篇文章有介绍过forwardindNode的find方法;
- hash桶首节点的hash大于0,说明是普通链表,递归查找。
扩容机制
ConcurrentHashMap支持多线程协助扩容,扩容的工作分成两部分:
- 生成一个新的table,即nextTable,容量为原来的2倍
- 将旧的table迁移到nextTable,这部分也是众多线程能分摊的工作
这里会产生一个疑问:多个线程参与迁移工作会不会相互干扰啊?答案是不会的。
任意一个节点,假设节点所在桶序号是2,扩容之后只有两种可能,一种是呆在原来的桶2,一种是迁移到序号是(2 + 旧容量)桶中。
线程从transferIndex中分配得到一个区间的hash桶迁移任务,比如说是旧的table的 [a, b) 区间,那么这个线程工作的桶就是 [a, b) 和 [ a+ 旧capacity, b+ 旧capacity),这样每个任务就独立了,很适合多线程工作。
tryPresize
private final void tryPresize(int size) {
// 计算容量
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
// sizeCtl > 0,可能是threshold或准备扩容的容量
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
// table为空,需要进行初始化
n = (sc > c) ? sc : c;// 选择较大的容量
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//修改sizeCtl为-1,表示正在初始化
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);// sc = n * 3/4;
}
} finally {
sizeCtl = sc;// 设置成theashold
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 存在table,调用transfer扩容
int rs = resizeStamp(n);
if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
transfer
线程在许多操作如put、replace、remove碰到ForwardingNode都会调用,具体来看下怎么扩容的
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) {
// 初始化nextTable
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
// try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
//transferIndex表示目前指向的最大的hash桶索引数,
// 它会每次分配出去stride 个数量的桶数给线程,慢慢的减少,直至为0,表示分配完成
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
// 分配负责的任务区间
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容任务结束,最后一个线程负责修改table,nextTable,sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 当前线程负责的任务结束,sizeCtl - 1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// resizeStamp(n) << RESIZE_STAMP_SHIFT) 获取一个校验戳,左移16位,那么低16位都位0
// sc = (高16位)校验戳 + (低16位)(线程数 + 1)
// 初始化时候 sc = resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2
// 所以只要在最后一个线程:sc - 1 - 1 == 校验戳
// 这里既判断是不是最后一个线程,又利用校验戳判断了是不是本次扩容任务
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)// hash桶为null,置一个ForwardingNode
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)// 碰到ForwardingNode,说明处理过
advance = true;
else {
synchronized (f) {
// 锁住hash桶
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
// hash>0,链表节点
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 这部分跟hashmap迁移一样
// 根据hash & 原capacity 是否等于0, 切分成两个链表,然后分别放在新表的i和i+n位置
// hash&n == 0 说明 hash的值小,处在原来的位置i
// hash&n > 0 说明 hash值大,需要迁移到新的位置i+n
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;// 当前hash桶完成
}
else if (f instanceof TreeBin) {
// 红黑树,不做具体分析
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 低于UNTREEIFY_THRESHOLD,由树转化为链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;// 当前hash桶完成
}
}
}
}
}
}
- 如果nextTable为null,则生成nextTable,容量增长一倍
- 线程根据 transferIndex索引分配一个区间的任务
- 迁移过程中,将旧table中的节点用ForwaringNode代替,表示已经转移到nextTable中
- 再根据是链表、红黑树做迁移再根据是链表、红黑树做迁移
- 迁移完成后,最后一个退出的线程要负责收尾
计数统计
跟节点计数相关的有三个变量:baseCount,CounterCell,cellsBusy
cellsBusy是一个只有0和1两个状态的volatile整数,当做一个自旋锁,0代表无锁,1代表加锁
总体的原则就是:先尝试更新baseCount,失败再利用CounterCell
- 通过CAS尝试更新baseCount ,如果更新成功则完成,如果CAS更新失败会进入下一步;
- 线程通过随机数ThreadLocalRandom.getProbe() & (n-1) 计算出在counterCells数组的位置,如果不为null,则CAS尝试在couterCell上直接增加数量,如果失败会进入下一步;
- counterCells数组会进行扩容为原来的两倍,继续随机,继续添加;
- 最后,table节点的数量 = baseCount + counterCells每个cell记录下来的节点数量
// check > 0表示要检查是否扩容,一般是true
private final void addCount(long x, int check) {
CounterCell[] as;
long b, s;
// counterCells 不为null,或者 baseCount修改失败,说明存在并发
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// ThreadLocalRandom.getProbe() 获取一个随机数
// ThreadLocalRandom.getProbe()& m 获取一个小于等于m的随机数
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// 比较a下面的值是否为a.value,是则更新为a.value+x,相当于是为了增加CounterCell的值
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 若因为上面的操作都失败了,则进入fulladdCount
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount(); // 计算总数
}
if (check >= 0) {
// 检查是否扩容
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); // 根据一个基数生成扩容的戳
if (sc < 0) {
// 正在扩容或者正在初始化
// (sc >>> RESIZE_STAMP_SHIFT) :sc右移16位获取校验戳,跟rs对比是否同一次扩容
// sc == rs + 1:不理解?
// (nt = nextTable) == null 是否扩容完成
// transferIndex <= 0 transferIndex是从大到小分发的,小于0则给线程分发完了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 加入帮助扩容,把正在执行transfer任务的线程数加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt); // 迁移
}
// 开始新的扩容,rs +(线程数)+1 = rs+2
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null); // 迁移
s = sumCount();
}
}
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
// ThreadLocalRandom未初始化
ThreadLocalRandom.localInit(); //初始化
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // 是否冲突
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
// 从counterCells随机一个CounterCell
if (cellsBusy == 0) {
// Try to attach new Cell
CounterCell r = new CounterCell(x);
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 对cellsBusy加锁
boolean created = false;
try {
// 如果找到的位置为null,将之前创建的新Cell放入数组
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
}
// 选出的CounterCell不为null
else if (!wasUncontended)
wasUncontended = true;
// 对CounterCell尝试直接增加数值
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// 上面的更新都失败了,以下不是特别理解
else if (counterCells != as || n >= NCPU)
collide = false; // 设置冲突标志位false,重新执行循环
else if (!collide)
collide = true; // 设置冲突标志,重新执行循环
else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 加自旋锁
try {
if (counterCells == as) {
// 扩容counterCells 为原来的两倍
CounterCell[] rs = new CounterCell[n << 1];
//迁移数据
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0; //解锁
}
collide = false;
continue;
}
h = ThreadLocalRandom.advanceProbe(h); // 重新计算随机数
}
// CounterCell数组未初始化
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
// Initialize table
if (counterCells == as) {
// 初始容量为2
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;//解锁
}
if (init)
break;
}
// 如果cellsBusy 被占用,尝试更新baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
}
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/19395.html