这是Java并发包最后一个集合框架的数据结构,其复杂程度也较以往任何数据结构复杂的多,顾名思义ConcurrentHashMap是线程安全版本的HashMap,总所周知HashMap是非线程安全的,若直接用于多线程并发环境将会出现很多问题,比如数据丢失,甚至某些操作陷入死循环导致CPU利用率100%等情况。除了ConcurrentHashMap能够保证线程安全之外,还可以通过两种方法获得线程安全的Map结构的实例,要么使用Collections.synchronizedMap(map)返回使用synchronized包装过的线程安全的map,要么直接使用Hashtable代替。这两种方法对于线程安全的支持本质上并没有太大区别,前者仅仅是在每一个synchronized方法内部直接使用传入的非线程安全的map实例本身,而后者hashtable也是在每一个有线程安全问题的方法上加synchronized锁,并采用和hashMap相同的实现原理重新实现。

关于JDK8以前的HashMap与ConcurrentHashMap

在JDK8以前的HashMap是通过数组 + 链表的数据结构实现的,即它首先维护一个数组table,数组中的每一个元素都是一个包含键值对以及一个next指针的Entry对象,通过对Key对象的hashCode做一系列运算的结果再与数组的长度取模得出一个映射到数组table的下标(即索引,或者称之为“槽”),从而将键值对映射到数组的不同槽位,然而通常具有相同hashCode值的不同Key很大程度上将会映射到数组的同一个槽位,这可以称之为“碰撞”,这时候为了能够存储这种键值,HashMap采用了以链表的方式来组织具有相同hash值的不同Key(具体来说,当插入发生碰撞时,新节点会重新占据数组的对应槽位,该槽位原来的Entry节点被挤下去成为新节点的后继节点,这就是所谓的“头插法”,即在链表的头部插入),这样一来,具有相同hash值的不同Key虽然对应到了数组中的同一个槽位,但是却位于链表上的不同节点上,下面是一个JDK8以前的HashMap的内部结构示意图:

Java同步数据结构之ConcurrentHashMap

JDK8以前的HashMap的这种设计和实现都很简单,但是当具体相同Hash值的Key较多的时候,链表的长度将会很长,导致查询效率极其低下,毕竟链表的查询只能从头部一个一个的往后遍历比较。 

HashMap内部有一个值为0.75的默认加载因子loadFactor,该加载因子也可通过构造函数传入指定的值,其作用就是当数组table的使用率超过75%时,对数组长度进行扩容,还有一个变量threshold则表示下一次触发扩容的临界数组长度,即threshold = table.length * loadFactor,而。每一次扩容之后数组的长度是原来的2倍。数组是一个创建后就长度不可变更的数据结构,要对数组进行扩容,只有创建新的数组,然后将旧数组中的元素一个个拷贝过去,扩容是一个比较耗时的过程,也是当HashMap运用到多线程并发环境下线程不安全的主要诱因,扩容过程中依然采用头插法组织链表元素,所以扩容之后链表中的节点顺序将会发生反转,并且由于将Key影射到不同数组槽位的时候需要与数组长度取模运算,当数组长度发生变化之后具有相同hash值的Key也有可能会被映射到不同的槽位,当多个线程同时进行扩容操作时,CPU时间片的分配与HashMap的扩容机制一结合,就产生了数据丢失甚至构成环形链表的可能。环形链就会造成某些操作陷入死循环导致CPU利用率100%等情况,关于HashMap线程不安全的详细分析过程可用参考JDK1.7和JDK1.8中HashMap为什么是线程不安全的?

ConcurrentHashMap

JDK8之前的ConcurrentHashMap是采用分段锁的方式实现了HashMap的线程安全版本,顾名思义数组table中的某一部分使用同一个锁保证线程安全,它内部定义了一个Segment数组,Segment继承了ReentrantLock,所以一个Segment就是一把锁,每一个Segment内部又持有一个table数组,这样就相当于将HashMap种的table数组拆分成若干个分段数组,每一个Segment管理table数组的一个区间,每一个table数组还是按照HashMap的实现方式实现即数组+链表,新节点的插入还是按头插法进入。这种方式是一种粗粒度的并发控制方案,当两个操作位于不同的两个段时可以不受线程安全的影响,但是位于同一个段的不同数组槽位的更新操作依然会受到并发控制的互斥访问限制,所以吞吐量并没有提高太多,但是任何读操作不存在竞争,即是读写分离的。下面的一个ConcurrentHashMap的内部结构示意图:

Java同步数据结构之ConcurrentHashMap

Segment数组的每一个Segment元素都对应一个table数组,同时也共享同一把互斥锁,table数组中的每一个元素都是一个HashEntry对象,HashEntry持有键值对,hash值以及指向链表结构的下一个节点next指针。

JDK8的HashMap

从JDK8开始,HashMap和ConcurrentHashMap的实现都做了大的调整,针对HashMap主要围绕解决长链表下查询缓慢的情况进行了改进,其主要变化就是将长链表换成了红黑树(一种平衡二叉树),因此JDK8的HashMap采用了数组+短链表+红黑树的数据结构实现,在链表的长度超过8个节点的时候,将会将链表通过旋转的方式直接转换成红黑树(称之为树化),红黑树的引入在查询效率上至少提升了2倍以上。以下是其内部结构示意图:

Java同步数据结构之ConcurrentHashMap

Java8HashMap的table数组元素是由一个个Node或TreeNode节点组成,对于红黑树对应的数组槽位中始终存储其根节点,对于链表结构,每一次新元素都在链表尾部插入,即“尾插法”;对于红黑树,每一次新插入节点可能都会引起红黑树的旋转从而导致结构变化,但其根节点始终存储在table数组的槽位中。

除了内部数据结构的变化,HashMap其它特性例如加载因子,扩容等都与JDK8以前的版本差不多,但JDK8的HashMap对以前版本扩容可能造成环形链的问题进行了修复,因此当再次用于多线程并发环境,JDK8的HashMap将不会导致CPU%100的情况,但依然可能存在数据覆盖的问题出现,因此依然不是线程安全的。多线程环境下依然需要使用ConcurrentHashMap,它的JDK8版本下也做了大的调整。

JDK8的ConcurrentHashMap

分析它才是本文的最终目的,首先JDK8的ConcurrentHashMap内部数据结构基本与JDK8的HashMap一致,也是基于数组 + 短链表 + 红黑树的方式设计实现的。因为JDK8以前的分段锁思想是一种粗粒度的线程安全实现,而JDK8的ConcurrentHashMap则将分段锁的概念细划到单个的数组槽位上,即一个table数组槽位一个锁,因此只有更新操作具有相同hash值得线程之间才会存在竞争,任何读取操作依然不涉及竞争问题,仍然是读写分离的。JDK8抛弃分段锁不但节省了不必要的空间消耗,而且用回了传统的synchronized关键字的重量级锁,毕竟现在的JDK对其优化已经比较好了。

ConcurrentHashMap中table数组在存储红黑树的根节点时,使用了一个TreeBin的类封装TreeNode,因此不再像JDK8那样直接在数组的槽位中存放红黑树的根节点,而是一个携带根节点的TreeBin实例。另外,该类为了保证与以前的版本兼容,保留了拥有加载因子loadFactor和concurrencyLevel参数的构造函数,以及用于兼容序列化的Segment类,继承AbstractMap抽象类也是对兼容性的支持,除此之外并无其它目的。在构造函数传入的加载因子仅仅只是用于针对初始化给定容量的内部数组从而满足可以在放入给定数量的元素之前不触发扩容操作。其后,内部的加载因子还是默认的0.75,因为扩容操作是一个开销很大的过程,因此若能够在创建Map实例的时候确定大概需要的空间,将减少甚至消除扩容造成的开销。

ConcurrentHashMap在JDK8中对扩容操作进行了精妙的设计实现,任何读写线程在发现需要扩容或正在扩容时都会奉献雷锋精神,加入到辅助扩容的行列中,毕竟人多力量大,从而缩短扩容过程的时间开销,而且其内部的巧妙设计会在扩容过程中尽可能少的拷贝节点,根据Java Doc的描述,当table数组长度扩张一倍,只有大约六分之一的元素需要克隆。对于ConcurrentHashMap的理解我感觉比较有难度,特别是红黑树的转换脑袋感觉都不够用了,只能做粗略的分析。

 ConcurrentHashMap对size()方法也进行了精心的设计,它采用了类似高并发统计工具LongAdder的原理,使用baseCount + CounterCell数组的形式解决高并发更新同一个变量的线程争用问题,对产生竞争的线程将计数分散到哈希数组中的不同单元格中,而不需要在调用size方法时才遍历统计。关于LongAdder的原理可用查阅高并发原子累加器Striped64及其实现类LongAdder&LongAccumulator

ConcurrentHashMap中定义了一些特殊的节点,例如链表节点(hash值 > 0 ),红黑树节点(hash值为-1),转移节点ForwardingNodes(hash值为-1)标记该数组槽位已经被迁移到扩容后的table数组中,舜态节点ReservationNodes(hash值为-3)标记是一种用于lumdba表达式计算的临时节点,记住这些定义对于源码理解事半功倍。

通过构造函数,可见不仅没有持有加载引子,也没有持有threshold扩容阈值了,多了一个sizeCtl成员变量,该变量不同的值代表不同的含义: 默认值0表示数组table还未初始化,-1表示正在初始化table数组,-(1+n)表示有n个线程正在辅助一起扩容,> 0表示初始化数组完成,并且表示下一次触发扩容的数组占用阈值,例如现在数组长度是128,则sizeCtl 就是96,刚好是0.75的加载因子。sizeCtl还保存有扩容标记,确保调整大小不会重复执行。

ConcurrentHashMap扩容支持最多MAX_RESIZERS个线程并行进行以缩短时间,并且每一个扩容线程都按一个步长(默认是MIN_TRANSFER_STRIDE,即16)从数组末尾往头部分配一段还没被扩容的多个槽位,而不是一个线程一个槽位, transferIndex就用于指示现在扩容线程已经占据从数组末尾往头部的第几个槽位了,后来加入的扩容线程只能从该位置往前分配一段未完成扩容的槽位进行。

部分源码分析

Java同步数据结构之ConcurrentHashMap
  1  ------------------------常量---------------------------
  2  
  3  //最大的表容量。
  4  private static final int MAXIMUM_CAPACITY = 1 << 30;
  5  
  6  //默认初始表容量。
  7  private static final int DEFAULT_CAPACITY = 16;
  8  
  9  //最大的数组大小(非2次幂)。被toArray和toArray(T[] a)方法需要。
 10  static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 11  
 12  //默认并发级别。未使用,仅仅用于与该类的以前版本兼容而定义。
 13  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 14  
 15  //负载因子,貌似也没使用,在构造函数中指定此值只影响初始表容量。
 16  private static final float LOAD_FACTOR = 0.75f;
 17  
 18  //链表转换成红黑树(树化)的size阈值
 19  static final int TREEIFY_THRESHOLD = 8;
 20  
 21  //红黑色转换成链表(反树化)的size阈值,用在调整大小的时候。
 22  static final int UNTREEIFY_THRESHOLD = 6;
 23  
 24  //最小的表容量,该值至少是 4倍 TREEIFY_THRESHOLD以避免调整大小和树化阈值的冲突
 25  static final int MIN_TREEIFY_CAPACITY = 64;
 26  
 27  //扩容线程的最小跨度,即每一个线程至少分配16个槽位进行扩容
 28  private static final int MIN_TRANSFER_STRIDE = 16;
 29  
 30  //用于在sizeCtl中生成扩容标记的比特位数
 31  private static int RESIZE_STAMP_BITS = 16;
 32 
 33  //用于辅助调整大小的最大线程数
 34  private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
 35  
 36  //在sizeCtl中标记位的偏移
 37  private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
 38  
 39  /*
 40  * Encodings for Node hash fields.节点哈希字段的编码
 41  */
 42  static final int MOVED     = -1; // 转发节点的哈希值
 43  static final int TREEBIN   = -2; // 红黑树的根节点的哈希值
 44  static final int RESERVED  = -3; // 临时保留节点的哈希
 45  static final int HASH_BITS = 0x7fffffff; // 普通链表节点哈希的可用位
 46  
 47  /* ---------------- 节点 -------------- */
 48 
 49  //键值对的条目,只用于遍历时自读操作,存在hash值为负值和键值为null的特殊节点。
 50  static class Node<K,V> implements Map.Entry<K,V> {
 51         final int hash;  //哈希值
 52         final K key;     //
 53         volatile V val;    //
 54         volatile Node<K,V> next; //下一个节点指针
 55  
 56  .....
 57  }
 58  
 59  //红黑树节点
 60  static final class TreeNode<K,V> extends Node<K,V> {
 61         TreeNode<K,V> parent;  // red-black tree links
 62         TreeNode<K,V> left;
 63         TreeNode<K,V> right;
 64         TreeNode<K,V> prev;    // needed to unlink next upon deletion
 65         boolean red;
 66  
 67   .....
 68  }
 69  
 70  //存放在tale数组槽位种的红黑树根节点包装类
 71  static final class TreeBin<K,V> extends Node<K,V> {
 72         TreeNode<K,V> root;
 73         volatile TreeNode<K,V> first;
 74         volatile Thread waiter;
 75         volatile int lockState;
 76         // values for lockState
 77         static final int WRITER = 1; // set while holding write lock
 78         static final int WAITER = 2; // set when waiting for write lock
 79         static final int READER = 4; // increment value for setting read lock
 80  
 81    .....
 82  }
 83  
 84  //标记已经被扩容转移的槽位,持有新数组的引用,可通过它的find方法让get操作可用从该节点过渡到新数组中去搜索
 85  static final class ForwardingNode<K,V> extends Node<K,V> {
 86         final Node<K,V>[] nextTable;
 87         ForwardingNode(Node<K,V>[] tab) {
 88             super(MOVED, null, null, null);
 89             this.nextTable = tab;
 90         }
 91         
 92       .....
 93  }    
 94  /* ---------------- 静态工具 -------------- */
 95  
 96  //通过哈希值h计算对应的数组索引
 97  static final int spread(int h) {
 98     return (h ^ (h >>> 16)) & HASH_BITS;
 99  }
100  //若x实现了Comparable接口则返回其类对象,否则返回null
101  static Class<?> comparableClassFor(Object x) 
102  
103  //若x与kc类型匹配,返回k.compareTo(x) ,否则返回0
104  static int compareComparables(Class<?> kc, Object k, Object x) {
105  
106  /* ---------------- table元素访问 -------------- */
107 
108  //volatile读取指定数组索引位置的元素
109  static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
110     return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
111  }
112  
113  //CAS更新指定数组索引位置的元素
114  static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
115                                     Node<K,V> c, Node<K,V> v) {
116     return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
117  }
118  
119  //volatile设置指定数组索引位置的元素
120  static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
121     U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
122  }
123  
124  
125  /* ---------------- 字段 -------------- */
126  
127  //table数组,在第一次插入元素时才初始化,大小总是2的幂。由迭代器直接访问。
128  transient volatile Node<K,V>[] table;
129  
130  //调整大小时才不为空的临时新数组。
131  private transient volatile Node<K,V>[] nextTable;
132  
133  //基础计数器,主要用于没有竞争时,也可用用作初始化表产生竞争的回退,通过CAS更新
134  private transient volatile long baseCount;
135  
136  //用于表初始化和调整大小的控制,当为负值表正在初始化或者调整大小:-1 表示正在初始化,-(1 + 辅助调整大小的线程数)表示正在调整大小。
137  //当table等于null时该值等于用于初始创建表时的初始大小,默认情况为0.初始化完成之后,保留下一个元素的count值,用于调整大小。
138  private transient volatile int sizeCtl;
139  
140  //指示扩容时已经处理到的槽位位置(从数组末尾往前处理,所以最开始transferIndex等于数组的长度)
141  private transient volatile int transferIndex;
142  
143  //自旋锁,用于计数元素个数对CounterCells槽位的锁定或扩容
144  private transient volatile int cellsBusy;
145  
146  //用于计数元素个数的单元格计数器,非空时大小为2的幂
147  private transient volatile CounterCell[] counterCells;
View Code

相关文章:

  • 2022-12-23
  • 2022-01-14
  • 2021-12-26
  • 2022-12-23
  • 2022-12-23
  • 2021-11-10
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2021-12-12
  • 2021-08-14
  • 2022-01-11
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案