1、不安全:大家都知道HashMap不是线程安全的,在多线程环境下,对HashMap进行put操作会导致死循环。是因为多线程会导致Entry链表形成环形数据结构,这样Entry的next节点将永远不为空,就会产生死循环获取Entry。具体内容见HashMap随笔。
2、不高效:Collections.synchronizedMap(hashMap)和HashTable的线程安全原理都是对方法进行同步,所有操作竞争同一把锁,性能比较低。
如何构造一个线程安全且高效的HashMap?ConcurrentHashMap登场。
锁分段技术
ConcurrentHashMap将数据分为很多段(Segment),Segment继承了ReentrantLock,每个段都是一把锁。每个Segment都包含一个HashEntry数组,HashEntry数组存放键值对数据。当一个线程要访问Entry数组时,需要获取所在Segment锁,保证在同一个Segment的操作是线程安全的,但其他Segment的数据的访问不受影响,可以实现并发的访问不同的Segment。同一个段中才存在竞争关系,不同的段之间没有竞争关系。
ConcurrentHashMap源码分析
源码分析基于jdk1.7,不同版本实现有所不同。
类图
初始化
segmentShift和segmentMask的作用是定位Segment索引。以默认值为例,concurrencyLevel为16,需要移位4次(sshift为4),segmentShift就等于28,segmentMask等于15。
concurrencyLevel是指并发级别,即Segment数组的大小。concurrencyLevel值得设定应该根据并发线程数决定。如果并发级别设置的太小,同一个Segment的元素数量过多,会引起锁竞争的加重;如果太大,原本属于同一个Segment的元素会被分配到不同的Segment,会引起Cpu缓存命中率下降,进而导致程序性能下降。
1 //initialCapacity:初始容量,默认16。 2 //loadFactor:负载因子,默认0.75。当元素个数大于loadFactor*最大容量时需要扩容(rehash) 3 //concurrencyLevel:并发级别,默认16。确定Segment的个数,Segment的个数为大于等于concurrencyLevel的第一个2^n。 4 public ConcurrentHashMap(int initialCapacity, 5 float loadFactor, int concurrencyLevel) { 6 //判断参数是否合法 7 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 8 throw new IllegalArgumentException(); 9 //Segment最大个数MAX_SEGMENTS = 1 << 16,即65536; 10 if (concurrencyLevel > MAX_SEGMENTS) 11 concurrencyLevel = MAX_SEGMENTS; 12 13 // Find power-of-two sizes best matching arguments 14 int sshift = 0; 15 int ssize = 1; 16 //使用循环找到大于等于concurrencyLevel的第一个2^n。ssize就表示Segment的个数。 17 while (ssize < concurrencyLevel) { 18 ++sshift; //记录移位的次数, 19 ssize <<= 1;//左移1位 20 } 21 this.segmentShift = 32 - sshift; //用于定位hash运算的位数,之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的 22 this.segmentMask = ssize - 1; //hash运算的掩码,ssize为2^n,所以segmentMask每一位都为1。目的是之后可以通过key的hash值与这个值做&运算确定Segment的索引。 23 //最大容量MAXIMUM_CAPACITY = 1 << 30; 24 if (initialCapacity > MAXIMUM_CAPACITY) 25 initialCapacity = MAXIMUM_CAPACITY; 26 //计算每个Segment所需的大小,向上取整 27 int c = initialCapacity / ssize; 28 if (c * ssize < initialCapacity) 29 ++c; 30 int cap = MIN_SEGMENT_TABLE_CAPACITY;//每个Segment最小容量MIN_SEGMENT_TABLE_CAPACITY = 2; 31 //cap表示每个Segment的容量,也是大于等于c的2^n。 32 while (cap < c) 33 cap <<= 1; 34 //创建一个Segment实例,作为Segment数组ss的第一个元素 35 // create segments and segments[0] 36 Segment<K,V> s0 = 37 new Segment<K,V>(loadFactor, (int)(cap * loadFactor), 38 (HashEntry<K,V>[])new HashEntry[cap]); 39 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; 40 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] 41 this.segments = ss; 42 }