二,关于锁的几个概念
三,ReentrantLock类图
四,几个重要的类
五,公平锁获取
5.1 lock
5.2 acquire
5.3 tryAcquire
5.3.1 hasQueuedPredecessors
5.3.2 compareAndSetState
5.3.3 setExclusiveOwnerThread
5.3.4 getExclusiveOwnerThread
5.4 addWaiter
5.5 acquireQueued
5.5.1 shouldParkAfterFailedAcquire
5.5.2 parkAndCheckInterrupt
5.6 selfInterrupt
Lock
Lock实现提供了比使用synchronized方法和synchronized语句块扩展性更好的锁操作,他们允许更灵活地构建,可以有相当不同的属性,并且可以支持多个相关的Condition对象。锁是一个控制被多个线程共享的资源访问的工具,一般来说,锁对共享资源提供排它地访问,一个时间内只能一个线程可以获取锁并且要求获取锁的线程是所有对共享资源的访问线种中的第一个。然而,一些锁可以允许对共享的资源并发访问,比如ReadWriteLock的读锁
关于锁的几个概念
锁重入
就是一个线程获取锁之后,这个线程还可以再次获取相同锁。
公平锁
获取锁的过程采用类似排队的机制,排在前面的线程先获取锁。
非公平锁
获取锁的过程,不排队,只要没有线程持有锁,就可以获取锁。
ReentrantLock类图
几个重要的类
从上图我们看到,
Lock是一个接口,他定义了关于锁的基本操作方法。
ReentrantLock实现了Lock接口,它是一个可重入的独占锁,与synchronized方法和语句块具有相同的行为和语义。
Sync是一个ReentrantLock中的抽象类。它和ReentrantLock是组合关系。
Sync继承于AbstractQueuedSynchronizer。
AbstractQueuedSynchronizer是一个抽象类。它提供了很多关于锁的基本操作。
Sync有两个子类。是NonfairSync和FairSync。分别表示非公平锁和公平锁的实现。
我们下面将会主要看实现的lock方法。
我们以ReentrantLock作为入口来看锁获取的过程。它常用的用法如下。
class X { private final ReentrantLock lock = new ReentrantLock(); public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }}
先看看ReentrantLock获取锁的过程
public void lock() { sync.lock(); }
它直接调用的sync对象的lock()方法。
Sync类提供了公平锁和非公平锁的公共操作。它的lock方法是一个抽象方法。具体实现在公平锁
FairSync和非公平锁实现NonfairSync中实现。首先先看公平锁版本的实现
final void lock() { acquire(1); }
- acquire
acquire(1)这方法是在在AbstractQueuedSynchronizer中实现
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
它大致做了如下几个事情。
1 tryAcquire() 就是试图获取锁,如果成功就结束
2 addWaiter 新建表示当前线程的一个节点。加入队列当中
3 acquireQueued 如果试图获取失败,就加入队列中等待
4 selfInterrupt(),自己给自己一个中断。
这个方法在AbstractQueuedSynchronizer中是一个抽象的方法,具体实现就是子类中,这里就是在
ReentrantLock的FairSync类里面。
protected final boolean tryAcquire(int acquires) {
1.获取当前线程 final Thread current = Thread.currentThread();
2.当前state值 int c = getState(); if (c == 0) { //如果c等于0,表示当前没有线程占有锁,可以直接获取锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //如果队列中前面没有其它等待的线程,就把状态state值设置为acquires的值,这里是1. setExclusiveOwnerThread(current); //把当前锁的持有线程设置为当前线程 return true; } } else if (current == getExclusiveOwnerThread()) { //如果c不是0,说明已经有线程持有这个锁,就看持有锁的线程是不是当前线程,如果是就把state的值加1。然后更新它的值 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final boolean hasQueuedPredecessors() { Node t = tail; // 队列的尾 Node h = head; // 队列的头 Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
这里主要的判断逻辑,就是如果头和尾不相等,说明这个队列就不为空,并且队列的头的下一下节点不是当前线程,就说明队列中有前继节点。
但是我第一次看这个代码的时候,有个疑问,为什么头节点的next为null的时候,也说明有前继节点呢?
- compareAndSetState
这个一个CAS操作,就是原子地设置state的值,如果期望值是0,就设置state的值为传入的值。
- setExclusiveOwnerThread
这个方法是设置当前锁的持有线程为当前线程,这个方法是在AbstractOwnableSynchronizer抽象类里定义的,它是一个表示这一个锁是由那个线程独占持有的辅助类。
AbstractQueuedSynchronizer类继承了这个类。
- getExclusiveOwnerThread
如果试图获取锁不成功,就进行下一步,首先就是把这个线程加入到队列中,
addWaiter
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //首先创建一个表示当前线程的节点 // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { //首先如果队列的尾节点不为null,就直接把新加节点加入到尾节点后面,把新加入节点设置为尾节点, node.prev = pred;//把新加入的节点的前继节点指向尾节点 if (compareAndSetTail(pred, node)) {//设置新加入的节点为尾节点 pred.next = node;//之前的尾结点的后继节点指向新加入的节点 return node;//返回新加入的节点 } } enq(node); 如果尾结为空,就调用这个方法新增节点 return node; }
这个方法的入参是Node.EXCLUSIVE。表示这个节点正在以排它模式等待,也就是说锁是排它锁。
注意上面的是先设置一个节点的prev节点,然后设置节点的next节点。所以在上面判断前面是否有等待的节点时,如果next为null时也认为是有等待的节点,可能判断的时候,正好有一个节点正在入队。还没设置
prev的next节点的值。我是这么理解的,
private Node enq(final Node node) { for (;;) {//自旋,直到成功把节点加入到队列中 Node t = tail; if (t == null) { //如果尾节点为null,说明这个队列还没有初始化 if (compareAndSetHead(new Node())) //new一个节点,并设置为这个节点为头节点,注意头节点是一个占位节点,它是一个空节点。 tail = head; } else {//如果头节点不为空 node.prev = t;//把当前节点的前继设置为尾节点 if (compareAndSetTail(t, node)) {//把当前节点设置为尾节点 t.next = node;//把原来尾节点的后继设置为当前节点 return t; } } } }
当先节点加入到队列中,就调用下面的方法,它是
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false;//是否中断过标示,默认是没有 for (;;) {//自旋,直到这个线程获取到锁 final Node p = node.predecessor(); //当前节点的前继节点 if (p == head && tryAcquire(arg)) {//如果前继节点是头节点,就再次尝试获取锁 setHead(node);//如果获取锁成功,就把当前线程的节点设置为头节点 p.next = null; // 把原来的头节点的next节点置为null。为了更快的垃圾回收掉。 failed = false; return interrupted;//返回是否中断过的标示 }
//如果获取失败,就判断是否需要阻塞当前线程,直到下一个成功获取锁的线程的唤醒。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前继结点的waitStatus值 if (ws == Node.SIGNAL)//如果值是Node.SIGNAL,表示前一个节点被释放后,就会唤醒这个线程,所以这个线程可以阻塞,所以返回true。 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) {//如果值大于0,说明是被取消了。然后就往前找,直到找到一个不是被取消的前继节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
- parkAndCheckInterrupt
如果需要阻塞 就调用这个方法阻塞当前线程。并返回当前线程是否被中断过。如果需要阻塞并且被中断过。就设置中断状态为true。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
- selfInterrupt
如果尝试获取失败,在队列中等待获取锁时被中断过。就调用这个方法给自己一个中断
static void selfInterrupt() { Thread.currentThread().interrupt(); }
二 非公平锁获取
为什么是非公平呢,是因为获取锁的时候。不会去排队,如果没有线程拥有锁。就可以获取锁成功。如果不成功。他其实还是会和公平锁一样来获取锁。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
1首先调用 compareAndSetState来设置状态的值。如果成功。就说明获取到锁,并设置当前线程为拥有锁的线程。
否则就就和公平锁的获取方式一样了。
三 锁释放
Lock接口里定义了unlock方法。
ReentrantLock 的unlock方法,他调用sync的release方法。它是在AbstractQueuedSynchronizer类里实现。
public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }