一、HashTable
hashTable是一个线程安全的容器,是线程安全版本的HashMap。但它的底层是和HashMap一样的,只是在方法上都加上了synchronized关键字。
这样子有什么后果呢:
- 效率及低,意味着每个线程在执行HashTable的方法的时候,或者说操纵HashTable的时候,都要锁住整个对象。也就是让并行并发的访问,变成了串行。
- 复合操作会有线程安全问题。因为它是每个方法都加锁了,这意味着在执行单个方法像put,contains方法的时候,是可以保证原子性的,但如果是执行一个复合操作的时候,就不保证了。
if(!table.contains("key")) { map.put("key", object); }
类似于这样的方法,当线程1在执行if里面的判断的时候,线程1会获得table实例的所,其他线程无法访问table的其他同步方法。但当线程1判断完if后,锁会放掉,这个时候如果线程2进来,获得table实例的锁,然后put了一个”key“进来,然后再放锁;那么线程1再执行put方法就不对了。(它本来是以为没有这个key再put的)
二、concurrentHashMap1.7
并发思路
concurrenthashMap是采用一个叫做分段所的机制。
它可以看作是一个二重hashMap,首先concurrentHashMap是一个segment数组,每个segment都是一个继承了ReentrantLock的类,这样就可以方便地在各个segment里面加锁所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。
哦哦还要注意,这个最外面的Segment[]数组,是不可以扩容的!
然后进到Segment内部,会发现,每个Segment可以看作一个HashMap。也就是在一个Segment里面,有个HashEntry[]数组,然后这个数组是一个个桶,桶里面是单向链表。
(图片来自:http://www.importnew.com/28263.html)
构造函数
然后我们通过构造函数进入,顺便了解ConcurrentHashMap中重要的field吧。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4 // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // initialCapacity 是设置整个 map 初始的大小, // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小 // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上, // 插入一个元素不至于扩容,插入第二个的时候才会扩容 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 创建 Segment 数组, // 并创建数组的第一个元素 segment[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 往数组写入 segment[0] UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
initialCapacity和以前一样,指的是这个ConcurrenthashMap的初始容量,或者说是理解成初始桶的数量。但我们这个hashmap是有两重表的嘛,所以在实际操作的时候会把这个值分配给各个Segment,也就相当于间接指定了每个Segment中应该有几个桶。
loadFactor和一般的hashTable一样,负载因子,size/capacity。但上面说了Segment数组是不可以扩容的,所以这个也是给Segment里面的数组用的。
concurrencyLevel:concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
segmentShift:
这个值=32 - shift,shift是你>=你传进来的concurrentLevel的一个2次幂数的左移位数。而二次幂的数字,都是10000这样的嘛,所以shift就是10000中0的个数。
所以field segmentShift我觉得可以理解成000000100000(32位数字),然后是前面的0加上1的位数就是segmentShift吧。
SegmetnMask:
掩码嘛,就二次幂处理后的concurrentLevel的长度 - 1,得到的就类似0111111这样咯,所以等等用来做与操作用的。
然后最后那个Unsafe的putOrderObject一个不安全的直接操纵内存的方法,应该是因为这样会快点吧。这个order应该是防止指令重排序的意思。
要了解Unsafe可以看这篇文章:https://www.cnblogs.com/throwable/p/9139947.html
如果我们就当是用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:
Segment 数组长度为 16,不可以扩容
Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码,这两个值马上就会用到
put方法
然后来看重要的put方法。
先看put的主流程:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 1. 计算 key 的 hash 值 int hash = hash(key); // 2. 根据 hash 值找到 Segment 数组中的位置 j // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位, // 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标 int j = (hash >>> segmentShift) & segmentMask; // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null, // ensureSegment(j) 对 segment[j] 进行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); // 3. 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
这里的主流程是在第一层表格的操作。就根据key的hash值,找到Segment[]数组的桶序号,然后先初始化这个segment[j](构造器中只初始化了segment[0]),然后进入这个segmetn[j],交给这个segment[j](局部HashMap)继续执行put操作。
求j的时候,hash值移了segmentShift后,刚好只剩后面四位(默认情况的话),刚好等于segmentMask15(4位)的位数,然后再相与就得到一个序号咯。
然后就通过s.put(key, hash, value, false);进入Segment内部的那个局部Hashmap的put方法。、
先看看这个初始化segment[j]的方法。
ensureSegment(j):
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 这里看到为什么之前要初始化 segment[0] 了, // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k] // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了 Segment<K,V> proto = ss[0]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 内部的数组 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次检查一遍该槽是否被其他线程初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。
这里就用构造其中已经初始化好了的segment[0](也可能已经有元素了)的数据来构造segment[j]咯,然后再用自旋的CAS操作来更新segment数组中的j桶,更新成功或者是有别的线程更新成功都会跳出循环。
再来看segment里面的局部HashMap的put方法。
Segment里面的hashMap的put方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 在往该 segment 写入前,需要先获取该 segment 的独占锁 // 先看主流程,后面还会具体介绍这部分内容 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // 这个是 segment 内部的数组 HashEntry<K,V>[] tab = table; // 再利用 hash 值,求应该放置的数组下标 int index = (tab.length - 1) & hash; // first 是数组该位置处的链表的表头 HashEntry<K,V> first = entryAt(tab, index); // 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { // 覆盖旧值 e.value = value; ++modCount; } break; } // 继续顺着链表走 e = e.next; } else { // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。 // 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 如果超过了该 segment 的阈值,这个 segment 需要扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 扩容后面也会具体分析 else // 没有达到阈值,将 node 放到数组 tab 的 index 位置, // 其实就是将新的节点设置成原链表的表头 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解锁 unlock(); } return oldValue; }