在并发环境下ArrayList和HashMap都是不安全的,对于ArrayList,假设多线程对同一个ArrayList一起做写入操作,有可能多个线程同时对ArrayList中同一个位置做赋值,导致最后期望数据和实际数据不一致。HashMap一样有可能出现数据不一致这个问题,且使用HashMap有可能出现更大的灾难,那就是多线程对同一个HashMap做遍历过程中可能会破坏链表的结构,出现如两个结点互相指向对方导致链表成环的问题,最后程序一直死循环无法退出。
解决的方法也有,例如用Vector代替ArrayList,用Collections.synchronizedMap()方法包装HashMap或是使用专门为并发进行了优化的ConcurrentHashMap。Collections.synchronizedMap()中使用了一个Map<K, V>接口,里面有一个final Object mutex的类实例对象来实现互斥操作,例如Map.get()方法的实现是:
public V get(Object key) {
synchronized (mutex) {
return m.get(key);
}
}
可以看到方法调用前会去获得mutex的对象锁。这样做保证了线程安全,但是锁操作保护临界资源,即同一时刻只能有一个线程能执行,其他需要获得这个锁的线程都会进行等待,对于追求高效率的多线程,这样做明显性能不太好。
不用锁就可以让多线程长期保持执行状态,不会因为等待锁而导致线程等待,但是不用锁就要想其他办法来解决可能导致的数据不一致问题。为了保证多线程并发安全性,无锁的方式使用了一种复杂的算法-比较交换算法(CAS Compare And Swap)。
抽象地表达CAS就是,这个算法需要三个参数,V,E和N。V表示要做更新的变量(例如对其做写入操作);E表示变量的预期值,即你希望读到这个变量时它的预期值是什么;N表示变量的新值,仅当变量V的值等于预期值E时,才会对变量V的值设置未N,否则就什么也不做,最后返回当前V的值。
多线程一起用CAS对临界资源的访问,各个线程执行前都会先去比较自己拿到的临界资源是否符合自己的“期望”,如果是才会执行自己的操作,如果不符合自己的期望值,例如被其他线程修改了,那么就不做操作,这样来保证数据的正确性或一致性。可以看出,这个期望值让当前线程可以发现某一变量是否被其他线程给修改过了(通过比较期望值),然后做出正确的处理。
在JDK中有一个ConcurrentLinkedQueue类来实现线程安全的队列,采用链表的存储结构,这个队列就是CAS实例之一,例如它的增加结点方法就比较复杂,在看它的源码之前,我想先简单回顾回顾出现的两样东西:volatile关键字和Unsafe类。
对于volatile修饰的变量,即告诉了Java虚拟机这个变量很有可能会被其他线程改动,虚拟机会特别“注意”保证这个变量的可见性特点。但volatile无法保证原子性,即多线程对volatile修饰的变量做复合操作时,仍可能会出现数据不一致的问题。
Unsafe类里封装了一些类似指针的操作,例如compareAndSwapInt()方法:
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
参数offset表示对象o的偏移量,expected表示期望值,x表示要设置的值。
好了,前面铺垫了那么多,现在来看看这个ConcurrentLinkedQueue队列的一些实现细节,结点:
//结点的数据结构
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
还有队列的一些操作:
//设置当前结点Node的值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//设置结点为哨兵结点
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//设置val为com的下一个结点
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.conpareAndSwapObject(this, nextOffset, cmp, val);
}
在ConcurrentLinkedQueue类也就是这个队列中有两个标识队列头结点和尾结点的字段head和tail。以前学数据结构时,无论队列还是栈,通常使用链表结构做增删操作后,tail最后都会指向最后一个结点,但在这里不一定,在ConcurrentLinkedQueue队列中,tail的更新不是实时的,是隔一个元素后再更新tail的。如图所示:
这样做的目的是为了实现CAS策略,下面会说。
看ConcurrentLinkedQueue队列的增加结点操作代码时,感叹于它的实现,充分考虑到了多线程的问题,例如tail结点在某一时刻被其他线程修改的情况。来看看这个增加结点的offer()方法:
假设从一个空的队列开始,一开始head==tail。加入第一个结点时,第36行q为null(因为p.next为null),代码进入第39行把p.next结点通过casNext()方法设置成newNode(即把新结点链接在队列最后)。第一次增加结点时,p==t,所以不会进入第42行的把tail指向末尾的新增加进队列的newNode结点,最后进入第44行程序返回。
接下来添加第二个结点,从第35行开始,p一开始还是等于t(即tail),可是此时tail并未做更新,还是等于head,即t和p都是等于head,所以p.next是队列的第一个结点,并不为空,第37到45行代码都会被跳过,进入第52行,遍历链表查看下一个结点是否为null,即寻找队列中的最后一个结点。问号前的一系列判断,是为了处理当前tail被其他线程修改的情况,假设tail被其他线程修改了,那么p!=t成立,且此时t指向的也就不是新的tail结点。遇到这种被其他线程修改了tail字段的情况,就会把tail赋值给p,用新的tail作为队列的末尾,把其当作是队列的最后一个结点。如果tail没被其他线程修改,则把p指向q结点,q为p.next,即让p往下遍历。
对越这个t != t操作是可行的,因为“!=”运算符不具有原子性,它有可能在运行过程中被中断。当判断t != (t = tail)时,系统先取得t的值,然后再执行t = tail,最后执行“!=”运算符。在系统取得t的值后,有可能其他线程突然修改了tail的值,导致执行t = tail时t获得的tail字段是新的值,所以会出现等式左边的t与右边的t不同的情况,这也就是用来解决多线程某一时刻修改了临界资源的情况。
得到队列的末尾结点后,p.next会等于null,即q等于null,在第39行会修改p.next结点为新增加的结点newNode。而此时p != t了,因为p已经被更新,而t还是旧的值,所以会执行第42行更新tail字段指向末尾结点newNode,也就是刚刚新添加进来的结点。
最后一种情况就是第46到49行,遇到哨兵结点的情况,因为哨兵结点是next指向了自己,所以假如遍历队列时遇到哨兵结点,就无法获得下一个结点,导致无法找到队列末尾,这种情况的处理过程是:判断此时tail有没有被其他线程修改,如果有,就把这个新的tail作为队列的末尾结点,即p指向新的tail,然后把新结点链接到p.next;如果tail没有被其他线程修改,那么遇到哨兵结点,你无法从哨兵结点获得下一个结点,所以只能让p指向head,即从头开始重新遍历一次队列。
对于哨兵结点的产生是这样的,假设我们使用ConcurrentLinkedQueue队列并向里面添加一个元素后,接着使用poll()方法弹出队列里的元素:
一开始让p和h都等于head,即从队列的head开始,寻找第一个首元结点(首元结点指第一个带数据值的结点)。由于head是头结点标记,其item为null,所以第一轮循环直接进入第73行,把p指向q,而q在前面offer()方法里直到,它是等于p.next,这一步操作是往下一个结点寻找。第二轮循环中,p.item为队列的第一个元素,即首元结点,代码进入第62行把p.item设置为null,因为是弹出队列操作,所以这一步是从队列中删除一个结点。
此时的p != h,因为h还是头结点head,而p是head的下一个结点,所以执行updateHead操作把p结点更新为头结点head,为什么要把p结点更新为头结点?因为p结点的next是指向队列的第二个元素结点的啊(假设队列中还有值),那么我们就可以把这个item设置成了null的结点更新为头结点(因为它的next是指向队列的第二个元素结点嘛)。同时把head通过lazySetNext()方法设置成一个next指向自己的哨兵结点,这部分操作都在updateHead()方法中,看一下updateHead()方法:
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p)) {
h.lazySetNext(h); //h结点设置成next指向自己的哨兵结点
}
}
也就是说,它的过程是这样的:
假设一开始队列中有两个元素结点,当我们执行poll()方法,即要弹出结点“1”时,p会指向结点1,然后把它的item值设置为null,接着把head字段更新为p,既让p结点成为新的head,而旧的head结点则通过lazySetNext()方法设置成哨兵结点。
这样看来好像没什么问题,即使旧的head结点设置成了哨兵结点,从新的head开始遍历似乎不会遇到这个哨兵结点啊,那么前面offer()方法中处理哨兵结点的操作是为了应对什么情况?是这样的,假设我们只往队列中增加一个结点后就立刻调用poll()方法,会是这样的情况:
由于只做了一次增加结点操作,根据offer()方法,tail字段并不会做更新,而是继续指向和head一样的头结点,这时候如果我们做了poll()操作:
可以看到,由于tail和head字段都指向旧的head结点,做了poll()操作后,虽然head字段更新了,指向了新的队列头结点,但tail字段并没有及时更新(根据offer()方法),导致下一次offer()操作向队列中增加新结点时,会遇到tail这个哨兵结点,无法取得队列的正确的最后一个结点位置的问题,这就是offer()方法中处理哨兵结点的意义。