一、HashTable

hashTable是一个线程安全的容器,是线程安全版本的HashMap。但它的底层是和HashMap一样的,只是在方法上都加上了synchronized关键字。

这样子有什么后果呢:

  1. 效率及低,意味着每个线程在执行HashTable的方法的时候,或者说操纵HashTable的时候,都要锁住整个对象。也就是让并行并发的访问,变成了串行。
  2. 复合操作会有线程安全问题。因为它是每个方法都加锁了,这意味着在执行单个方法像put,contains方法的时候,是可以保证原子性的,但如果是执行一个复合操作的时候,就不保证了。
if(!table.contains("key")) {
    map.put("key", object);
}

类似于这样的方法,当线程1在执行if里面的判断的时候,线程1会获得table实例的所,其他线程无法访问table的其他同步方法。但当线程1判断完if后,锁会放掉,这个时候如果线程2进来,获得table实例的锁,然后put了一个”key“进来,然后再放锁;那么线程1再执行put方法就不对了。(它本来是以为没有这个key再put的)

 

 

二、concurrentHashMap1.7

并发思路

concurrenthashMap是采用一个叫做分段所的机制。

它可以看作是一个二重hashMap,首先concurrentHashMap是一个segment数组,每个segment都是一个继承了ReentrantLock的类,这样就可以方便地在各个segment里面加锁所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

哦哦还要注意,这个最外面的Segment[]数组,是不可以扩容的!

然后进到Segment内部,会发现,每个Segment可以看作一个HashMap。也就是在一个Segment里面,有个HashEntry[]数组,然后这个数组是一个个桶,桶里面是单向链表。

(图片来自:http://www.importnew.com/28263.html)

简单总结ConcurrentHashMap

 

 

 

构造函数

然后我们通过构造函数进入,顺便了解ConcurrentHashMap中重要的field吧。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4
    // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
 
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
 
    // initialCapacity 是设置整个 map 初始的大小,
    // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
    // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
    // 插入一个元素不至于扩容,插入第二个的时候才会扩容
    int cap = MIN_SEGMENT_TABLE_CAPACITY; 
    while (cap < c)
        cap <<= 1;
 
    // 创建 Segment 数组,
    // 并创建数组的第一个元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往数组写入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

 

 

initialCapacity和以前一样,指的是这个ConcurrenthashMap的初始容量,或者说是理解成初始桶的数量。但我们这个hashmap是有两重表的嘛,所以在实际操作的时候会把这个值分配给各个Segment,也就相当于间接指定了每个Segment中应该有几个桶。

 

loadFactor和一般的hashTable一样,负载因子,size/capacity。但上面说了Segment数组是不可以扩容的,所以这个也是给Segment里面的数组用的。

 

concurrencyLevel:concurrencyLevel:并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。

 

segmentShift:

这个值=32 - shift,shift是你>=你传进来的concurrentLevel的一个2次幂数的左移位数。而二次幂的数字,都是10000这样的嘛,所以shift就是10000中0的个数。

所以field segmentShift我觉得可以理解成000000100000(32位数字),然后是前面的0加上1的位数就是segmentShift吧。

 

SegmetnMask:

掩码嘛,就二次幂处理后的concurrentLevel的长度 - 1,得到的就类似0111111这样咯,所以等等用来做与操作用的。

 

然后最后那个Unsafe的putOrderObject一个不安全的直接操纵内存的方法,应该是因为这样会快点吧。这个order应该是防止指令重排序的意思。

要了解Unsafe可以看这篇文章:https://www.cnblogs.com/throwable/p/9139947.html

 

 

如果我们就当是用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:

Segment 数组长度为 16,不可以扩容
Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码,这两个值马上就会用到

 

 

 

put方法

然后来看重要的put方法。

先看put的主流程:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 计算 key 的 hash 值
    int hash = hash(key);
    // 2. 根据 hash 值找到 Segment 数组中的位置 j
    //    hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
    //    然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
    // ensureSegment(j) 对 segment[j] 进行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

 

这里的主流程是在第一层表格的操作。就根据key的hash值,找到Segment[]数组的桶序号,然后先初始化这个segment[j](构造器中只初始化了segment[0]),然后进入这个segmetn[j],交给这个segment[j](局部HashMap)继续执行put操作。

求j的时候,hash值移了segmentShift后,刚好只剩后面四位(默认情况的话),刚好等于segmentMask15(4位)的位数,然后再相与就得到一个序号咯。

 

然后就通过s.put(key, hash, value, false);进入Segment内部的那个局部Hashmap的put方法。、

 

先看看这个初始化segment[j]的方法。

 

ensureSegment(j):

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 这里看到为什么之前要初始化 segment[0] 了,
        // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
        // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了
        Segment<K,V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
 
        // 初始化 segment[k] 内部的数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // 再次检查一遍该槽是否被其他线程初始化了。
 
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

 

这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。

这里就用构造其中已经初始化好了的segment[0](也可能已经有元素了)的数据来构造segment[j]咯,然后再用自旋的CAS操作来更新segment数组中的j桶,更新成功或者是有别的线程更新成功都会跳出循环。

 

 

再来看segment里面的局部HashMap的put方法。

Segment里面的hashMap的put方法:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往该 segment 写入前,需要先获取该 segment 的独占锁
    //    先看主流程,后面还会具体介绍这部分内容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 这个是 segment 内部的数组
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求应该放置的数组下标
        int index = (tab.length - 1) & hash;
        // first 是数组该位置处的链表的表头
        HashEntry<K,V> first = entryAt(tab, index);
 
        // 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖旧值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 继续顺着链表走
                e = e.next;
            }
            else {
                // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
                // 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 如果超过了该 segment 的阈值,这个 segment 需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 扩容后面也会具体分析
                else
                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                    // 其实就是将新的节点设置成原链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}
View Code

相关文章:

  • 2021-06-22
  • 2021-11-25
  • 2022-02-16
  • 2022-01-19
  • 2021-12-29
  • 2021-06-29
猜你喜欢
  • 2021-11-27
  • 2021-06-25
  • 2021-08-09
相关资源
相似解决方案