一、ConcurrentHashMap出现的原因
我们之前学过HashMap,也知道HashMap不是线程安全的,在多线程环境下,HashMap的put方法有可能引起死循环。于是HashTable这个类出现,它在大量的方法前都加了内置锁Synchronized,这就保证了它的线程安全性,但是这种方法太极端,导致效率低下。当一个线程访问了HashTable的同步方法时,其它线程就只能等待该线程释放锁。在这种情况,针对多线程的情况,ConcurrentHashMap应运而生。
二、ConcurrentHashMap的数据结构
ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问
(图片来自:http://www.cnblogs.com/dolphin0520/p/3920373.html)
和HashMap不同之处是,HashMap是使用一个数组来连接各个Entry链,而ConcurrentHashMap则是使用了Segment数组(继承ReentrantLock)来链接各个HashEntry数组,然后各个HashEntry数组中连接各自的HashEntry链。(这个要很注意)每一个Segment都有一个锁,所以这可以达到并发得访问各个Segment的Entry。
三、ConcurrentHashMap的定义
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
ConcurrentHashMap继承自AbstractMap,实现了ConcurrentMap接口,使得它具有Map的属性,同时又有多线程相关的属性
ConcurrentHashMap的成员变量:
//初始的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //初始的加载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //初始的并发等级(下面会叙述作用) static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //最小的segment数量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大的segment数量 static final int MAX_SEGMENTS = 1 << 16; // static final int RETRIES_BEFORE_LOCK = 2;
四、ConcurrentHashMap的构造函数
//通过指定的容量,加载因子和并发等级创建一个新的ConcurrentHashMap public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //对容量,加载因子和并发等级做限制 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //限制并发等级不可以大于最大等级 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 下面即通过并发等级来确定Segment的大小 //sshift用来记录向左按位移动的次数 int sshift = 0; //ssize用来记录Segment数组的大小 int ssize = 1; //Segment的大小为大于等于concurrencyLevel的第一个2的n次方的数 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; //segmentMask的值等于ssize - 1(这个值很重要) this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //c记录每个Segment上要放置多少个元素 int c = initialCapacity / ssize; //假如有余数,则Segment数量加1 if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; //创建第一个Segment,并放入Segment[]数组中,作为第一个Segment Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
对于容量和加载因子:我在HashMap那篇文章已经讲解得很清楚: Java容器(四):HashMap(Java 7)的实现原理
并发等级(concurrencyLevel):用来确定Segment的数量,Segment的个数为大于等于concurrencyLevel的 第一个 2的n次方的数,例如当concurrencyLevel为12,13,14,15,16时,此时的Segment的数量为16
segmentMask:这个为什么要为Segment数组的长度 -1, 这个也在HashMap中讲过了,主要是为了让低位为1,这样在做&运算确定Segment的索引时能够更加分散
五、ConcurrentHashMap的put操作
public V put(K key, V value) { Segment<K,V> s; //ConcurrentHashMap的key和value都不能为null if (value == null) throw new NullPointerException(); //这里对key求hash值,并确定应该放到segment数组的索引位置 int hash = hash(key); //j为索引位置,思路和HashMap的思路一样,这里不再多说 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); //这里很关键,找到了对应的Segment,则把元素放到Segment中去 return s.put(key, hash, value, false); }
得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j。例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子:
0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最后四位的值。这个值确定Segment的索引。
其实大体和HashMap相似
下面看看具体如何插入到Segment中的
final V put(K key, int hash, V value, boolean onlyIfAbsent) { //这里是并发的关键,每一个Segment进行put时,都会加锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { //tab是当前segment所连接的HashEntry数组 HashEntry<K,V>[] tab = table; //确定key的hash值所在HashEntry数组的索引位置 int index = (tab.length - 1) & hash; //取得要放入的HashEntry链的链头 HashEntry<K,V> first = entryAt(tab, index); //遍历当前HashEntry链 for (HashEntry<K,V> e = first;;) { //如果链头不为null if (e != null) { K k; //如果在该链中找到相同的key,则用新值替换旧值,并退出循环 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } //如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else e = e.next; } else {//如果没有找到key相同的,则把当前Entry插入到链头 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); //此时数量+1 int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) //如果超出了限制,要进行扩容 rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { //最后释放锁 unlock(); } return oldValue; }
我们来重新理一理思路:
1. 首先对key进行第1次hash,通过hash值确定segment的位置
2. 然后在segment内进行操作,获取锁
3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
4. 然后对当前索引的HashEntry链进行遍历,如果有重复的key,则替换;如果没有重复的,则插入到链头
5. 关闭锁
可见,在整个put过程中,进行了2次hash操作,才最终确定key的位置。
五、ConcurrentHashMap的remove操作
public V remove(Object key) { //求key的hash int hash = hash(key); //求得hash对应的Segment Segment<K,V> s = segmentForHash(hash); //在segment内进行删除 return s == null ? null : s.remove(key, hash, null); }
final V remove(Object key, int hash, Object value) { //获取锁 if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { //tab是当前segment所连接的HashEntry数组 HashEntry<K,V>[] tab = table; //确定key的hash值所在HashEntry数组的索引位置 int index = (tab.length - 1) & hash; //取得要放入的HashEntry链的链头 HashEntry<K,V> e = entryAt(tab, index); //pred用来记录待删除节点的前一个节点 HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; //当找到了待删除及节点的位置 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { //如果待删除节点的前节点为null,即待删除节点时链头节点,此时把该位置指向第2个结点就行了 if (pred == null) setEntryAt(tab, index, next); //如果有前节点,则待删除节点的前节点的next指向待删除节点的的下一个节点,删除成功 else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { //最后关闭锁 unlock(); } return oldValue; }
我们来理一理思路:
1. 首先对key进行第1次hash,通过hash值确定segment的位置
2. 然后在segment内进行操作,获取锁
3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
4. 用一个HashEntry引用来记录待删除节点的前一个节点,然后删除待删除节点
5. 关闭锁
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/7789.html