一、概述
ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。
读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。
ReentrantReadWriteLock支持以下功能:
1)支持公平和非公平的获取锁的方式;
2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
4)读取锁和写入锁都支持锁获取期间的中断;
5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。
线程进入读锁的前提条件:
没有其他线程的写锁,
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁
二、源码查看
父接口定义
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
ReentrantReadWriteLock 也是基于AQS实现的,在其内部也是通过一个内部类Sync实现同步器AQS,同样也是通过实现Sync实现公平锁和非公平锁。它的自定义同步器(继承AQS)需要在同步状态(一个整型变量state)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。
自定义的同步器
abstract static class Sync extends AbstractQueuedSynchronizer{}
2.1、ReentrantReadWriteLock构造方法
//ReentrantReadWriteLock private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock(){ this(false); //默认非公平锁 } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); //锁类型(公平/非公平) readerLock = new ReadLock(this); //构造读锁 writerLock = new WriteLock(this); //构造写锁 } …… public ReentrantReadWriteLock.WriteLock writeLock0{return writerLock;} public ReentrantReadWriteLock.ReadLock readLock0{return ReaderLock;}
2.1.1、ReadLock读锁实现
//ReentrantReadWriteLock$ReadLock public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync;//最后还是通过Sync内部类实现锁 } //它实现的是Lock接口,其余的实现可以和ReentrantLock作对比,获取锁、释放锁等等 public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryReadLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { throw new UnsupportedOperationException(); } public String toString() { int r = sync.getReadLockCount(); return super.toString() + "[Read locks = " + r + "]"; } }
2.1.2、WriteLock写锁实现
//ReentrantReadWriteLock$WriteLock public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; /** * Constructor for use by subclasses * * @param lock the outer lock object * @throws NullPointerException if the lock is null */ protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync;//通过Sync内部类实现锁 } //它实现的是Lock接口,其余的实现可以和ReentrantLock作对比,获取锁、释放锁等等 public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock( ) { return sync.tryWriteLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public int getHoldCount() { return sync.getWriteHoldCount(); } }
上面是对ReentrantReadWriteLock做了一个大致的介绍,可以看到在其内部有好几个内部类,实际上读写锁内有两个锁——ReadLock、WriteLock,这两个锁都是实现自Lock接口,可以和ReentrantLock对比,而这两个锁的内部实现则是通过Sync,也就是同步器AQS实现的,这也可以和ReentrantLock中的Sync对比。
2.2、状态确定方式
回顾一下AQS,其内部有两个重要的数据结构——一个是同步队列、一个则是同步状态,这个同步状态应用到读写锁中也就是读写状态,但AQS中只有一个state整型来表示同步状态,读写锁中则有读、写两个同步状态需要记录。所以,读写锁将AQS中的state整型做了一下处理,它是一个int型变量一共4个字节32位,那么可以读写状态就可以各占16位——高16位表示读,低16位表示写。
下面是《并发编程的艺术》给出的术语:如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁是将变量切分成了两个部分,高16位表示读,低16位表示写,划分方式如图所示。
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
假设当前同步状态值为S,get和set的操作如下:
(1)获取写状态:
S&0x0000FFFF:将高16位全部抹去
(2)获取读状态:
S>>>16:无符号补0,右移16位
(3)写状态加1:
S+1
(4)读状态加1:
S+(1<<16)即S + 0x00010000
在代码层的判断中,如果S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。
2.3、写锁的获取与释放
WriteLock类中的lock和unlock方法:
public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }
AQS已经将获取锁的算法骨架搭好了,只需子类实现tryAcquire(独占锁)和tryRelease。可以看到就是调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。
2.3.1、写锁的获取
protected final boolean tryAcquire(int acquires) { //当前线程 Thread current = Thread.currentThread(); //获取状态 int c = getState(); //写线程数量(即获取独占锁的重入数) int w = exclusiveCount(c); //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁 if (c != 0) { // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false; // 如果写锁状态不为0且写锁没有被当前线程持有返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //判断同一线程获取写锁是否超过最大次数(65535),支持可重入 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //更新状态 //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。 setState(c + acquires); return true; } //到这里说明此时c=0,读锁和写锁都没有被获取 //writerShouldBlock表示是否阻塞 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //设置锁为当前线程所有 setExclusiveOwnerThread(current); return true; }
其中exclusiveCount方法表示占有写锁的线程数量,源码如下:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
说明:直接将状态state和(2^16 - 1)做与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示。
从源代码可以看出,获取写锁的步骤如下:
(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
方法流程图如下:
2.3.2、写锁的释放,tryRelease方法:
protected final boolean tryRelease(int releases) { //若锁的持有者不是当前线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //写锁的新线程数 int nextc = getState() - releases; //如果独占模式重入数为0了,说明独占模式被释放 boolean free = exclusiveCount(nextc) == 0; if (free) //若写锁的新线程数为0,则将锁的持有者设置为null setExclusiveOwnerThread(null); //设置写锁的新线程数 //不管独占模式是否被释放,更新独占重入数 setState(nextc); return free; }
写锁的释放过程还是相对而言比较简单的:首先查看当前线程是否为写锁的持有者,如果不是抛出异常。然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。
说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。其方法流程图如下。
2.4、读锁的获取与释放
类似于写锁,读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。
2.4.1、读锁的获取,看下tryAcquireShared方法
protected final int tryAcquireShared(int unused) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取状态 int c = getState(); //如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 读锁数量 int r = sharedCount(c); /* * readerShouldBlock():读锁是否需要等待(公平锁原则) * r < MAX_COUNT:持有线程小于最大数(65535) * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态 */ // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中 if (r == 0) { // 读锁数量为0 // 设置第一个读线程 firstReader = current; // 读线程占用的资源数为1 firstReaderHoldCount = 1; } else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入 // 占用资源数加1 firstReaderHoldCount++; } else { // 读锁数量不为0并且不为当前线程 // 获取计数器 HoldCounter rh = cachedHoldCounter; // 计数器为空或者计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程对应的计数器 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 计数为0 //加入到readHolds中 readHolds.set(rh); //计数+1 rh.count++; } return 1; } return fullTryAcquireShared(current); }
其中sharedCount方法表示占有读锁的线程数量,源码如下:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
说明:直接将state右移16位,就可以得到读锁的线程数量,因为state的高16位表示读锁,对应的第十六位表示写锁数量。
读锁获取锁的过程比写锁稍微复杂些,首先判断写锁是否为0并且当前线程不占有独占锁,直接返回;否则,判断读线程是否需要被阻塞并且读锁数量是否小于最大值并且比较设置状态成功,若当前没有读锁,则设置第一个读线程firstReader和firstReaderHoldCount;若当前线程线程为第一个读线程,则增加firstReaderHoldCount;否则,将设置当前线程对应的HoldCounter对象的值。流程图如下。
注意:更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(23行至43行代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。
fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { // 无限循环 // 获取状态 int c = getState(); if (exclusiveCount(c) != 0) { // 写线程数量不为0 if (getExclusiveOwnerThread() != current) // 不为当前线程 return -1; } else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // 当前线程为第一个读线程 // assert firstReaderHoldCount > 0; } else { // 当前线程不为第一个读线程 if (rh == null) { // 计数器不为空 // rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的tid不为当前正在运行的线程的tid rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功 if (sharedCount(c) == 0) { // 读线程数量为0 // 设置第一个读线程 firstReader = current; // firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }