前言
本文主要参考自《Java 并发编程的艺术》第五章内容,结合源码对书中内容进行分析补充。
I. Lock接口
在 Lock 接口出现之前,Java程序是靠 synchronized 关键字实现锁功能的,而 Java SE 5 之后,并发包中新增了 Lock 接口(以及相关实现类)用来实现锁功能,它提供了与 synchronized 关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。使用 synchronized 关键字是隐式地获取锁,但是它将锁的获取和释放固化了,必须先获取再释放。当然,这种方式简化了同步的管理,可是扩展性没有显式的锁获取和释放来的好。比如我们需要先获取A锁,再获取B锁,然后释放A锁,再释放B锁,synchronized 就难以实现了。
Lock的使用
Lock的使用很简单,下面是一个简单的示例:
// 创建锁
Lock lock = new ReentrantLock();
// 获取锁
lock.lock();
try {
// do sth.
} finally {
// 最终确保释放锁
lock.unlock();
}
示例中使用了 ReentrantLock —— 重入锁,它是 Lock 的一个实现类,之后的文章我们会研究这个实现类的源码。通过 Lock 接口的 lock() 和 unlock() 方法能够进行获取锁和释放锁,在 finally 块中释放锁,目的是保证在获取到锁之后,最终能够被释放。不要将获取锁的过程写在 try 块中,因为如果在获取锁时如果发生了异常,异常抛出的同时,由于必须执行 finally 块,导致锁没有获取成功反而无故释放。如果没有写在 try 块中,那么获取锁直接发生异常代码直接结束。
Lock接口定义
首先,我们直接查看 Lock 的定义。
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
*/
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
public interface Lock {
/**
* 阻塞获取锁,调用该方法当前线程获取锁,当锁获得后,从该方法返回。
* 如果获取不到锁,则当前线程将被禁用以进行线程调度,并且在获取锁之前处于休眠状态。
* Lock的实现需要能够检测到锁的错误使用,例如可能导致死锁的调用、抛出未经检查的异常等。
* 实现方法必须记录下这些异常情况。
*/
void lock();
/**
* 可响应中断的获取锁,与lock()方法不同在于可以在当前线程获取锁的时候被中断,并抛出中断异常。
* 如果能够马上获取到锁则方法立刻返回。
*/
void lockInterruptibly() throws InterruptedException;
/**
* 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取到锁则方法返回true,获取不到返回flase。
*
* 经典的用法如下:
* Lock lock = ...;
* if (lock.tryLock()) {
* try {
* // manipulate protected state
* } finally {
* lock.unlock();
* }
* } else {
* // perform alternative actions
* }
*
* 这种写法保证获取到锁肯定被释放,没有获取到锁也不用释放。
*/
boolean tryLock();
/**
* 当前线程超时获取锁,以下三种情况会从方法返回:
* 1.当前线程已经获取锁;
* 2.当前线程在超时时间内被中断
* 3.超时时间结束,返回false
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁.
*/
void unlock();
/**
* 创建一个等待通知组件对象,该组件对象和当前的锁实例是绑定的。当前线程只有获取到锁,才能调用
* 该组件的await()方法,而调用后,当前线程会释放锁。
*/
Condition newCondition();
}
相比较 synchronized 关键字,Lock 接口还提供了非阻塞性获取锁、可中断获取锁以及超时获取锁的功能。最后的 new Condition() 方法其实和等待通知模型有关,wait()、notify()、notifyAll() 的功能则是由这个 Condition 相关类来实现的,这个后面的源码分析中我们也会研究。
我们一般使用 Java API 的锁,都用 Lock 的一些实现类,这些实现类基本都是通过聚合了一个同步器 (AbstractQueuedSynchronizer) 的子类 (Sync) 实例来完成线程访问控制的。
II. 队列同步器AQS
队列同步器 AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架。主要原理就是利用 volatile 的内存语义实现的,它使用了一个 volatile 类型的 int 成员变量 state 表示同步状态,通过内置的 FIFO 队列完成获取资源的所有线程的排队工作。
/**
* The synchronization state.
*/
private volatile int state;
/**
* 获取当前同步状态
* 该操作具有读volatile变量的内存语义
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* 设置当前同步状态
* 该操作具有写volatile变量的内存语义
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* CAS设置当前状态,该方法能够保证状态设置的原子性
* 该操作具有读和写volatile变量的内存语义
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的对 state 成员变量进行获取/设置的三个方法了,方法在上面代码片中。AQS子类推荐被定义为自定义 Lock 接口的实现类中的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了一些同步状态获取和释放的方法来供自定义锁使用。同步器AQS既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件 (同步组件也就是各种API锁等),比如重入锁 ReentrantLock、重入读写锁 ReentrantReadWriteLock 和 CountDownLatcher 等。
同步器AQS与锁的关系可以描述为:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节,比如用户想使用锁仅需要调用第一部分 Lock 接口中定义的方法即可;而同步器面向的是 Lock 接口方法的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
AQS的模板模式
AQS的设计是基于模板设计模式的,也就是说有一些模板方法需要我们去自己完善,需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
① 重写AQS中的方法
重写AQS的方法时,需要使用上文提及的同步器提供的3个方法来访问或修改同步状态。AQS中定义的需重写方法包括如下五种。
/**
* 尝试以独占式的方式获取同步状态,实现该方法需要查询当前状态并
* 判断同步状态是否符合预期,然后再进行CAS操作设置同步状态
*
* 该方法被想要获取同步状态的线程调用,如果获取失败,线程会被加入同步队列,
* 直到其他线程释放信号唤醒。该方法常常被用于实现Lock接口的tryLock()方法
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 独占式的释放同步状态,该方法被想要释放同步状态的线程调用
* 在同步队列等待的线程则将有机会获取到同步状态
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 尝试共享式的获取同步状态
*
* 该方法被想要获取同步状态的线程调用,如果获取失败则需要在等待同步队列中等待
* 方法返回值大于等于0则表示获取成功,小于0表示失败
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 尝试共享式的释放同步状态,该方法被想要释放同步状态的线程调用
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 当前同步器是否在独占模式下被当前线程占用,一般该方法表示是否被当前线程独占
*
* 此方法仅在ConditionObject方法内部调用,因此如果不使用Condition,则无需重写该方法。
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
② AQS的模板方法
同步器提供的模板方法基本上分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的所有等待线程集合。
| 模板方法名称 | 模板方法作用 |
|---|---|
| void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg) 方法。 |
| void acquireInterruptibly(int arg) | 与 acquire(int arg) 相同,但是该方法响应中断,当前线程如果未获取到同步状态则进入同步队列中;如果当前线程被中断,则该方法会抛出 InterruptedException 并返回。 |
| boolean tryAcquireNanos(int arg, long nanosTimeout) | 在 acquireInterruptibly(int arg) 基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么就会返回 false。如果获取到就返回 true。 |
| void acquireShared(int arg) | 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。 |
| void acquireSharedInterruptibly(int arg) | 与 acquireShared(int arg) 相同,该方法响应中断。 |
| boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 在 acquireSharedInterruptibly(int arg) 基础上增加了超时限制。 |
| boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。 |
| boolean releaseShared(int arg) | 共享式的释放同步状态。 |
| Collection<Thread> getQueuedThreads() | 获取所有等待在同步队列上的线程集合。 |
关于这些方法的源码如下:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquireShared}, returning on success. Otherwise, the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted or the timeout elapses.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/**
* 返回同步队列中所有等待的线程集合
* 通过从尾至头方式遍历同步队列的节点,获取每个节点中保存的Thread
*/
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
利用AQS实现自定义锁
自定义同步组件/自定义锁,需要使用同步器AQS提供的模板方法来实现自己的同步语义。我们通过一个自定义独占锁的示例来演示一下如何自定义一个同步组件。独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
// AQS的子类,重写方法
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 当同步状态为 0 才尝试获取锁,CAS设置为 1
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 当同步状态为 1 才释放锁,
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 是否处于独占状态
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* 返回一个 ConditionObject对象,每个condition实例都包含了一个等待队列
* @return
*/
private Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
// 实现Lock接口,利用AQS的模板方法
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
上述示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex 中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在 tryAcquire(int acquires) 方法中,如果经过CAS设置同步状态为1成功,则代表获取了同步状态,而在 tryRelease(int releases) 方法中只是将同步状态重置为0。使用 Mutex 时并不会直接和内部同步器的实现打交道,因为 Mutex 实现了 Lock 接口,我们只需要调用接口方法即可。在实现 Lock 接口的方法时,需要利用AQS的模板方法。通过模板设计模式,大大降低了自定义同步组件的设计难度。
III. AQS源码分析
分析AQS源码我们着重从所有的模板方法入手,主要包括了同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等。
同步队列
AQS依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程和其等待状态等信息构造成为一个节点(Node)加入到同步队列中,同时会阻塞当前线程。当同步状态释放时,就会把同步队列中的第一个节点中包含的线程唤醒,让其再次尝试获取同步状态。
AQS中包含两个成员变量,一个是头结点还有一个尾节点,分别指向同步队列的头和尾。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
static final class Node {
····
}
}
同步队列中的节点 (Node) 用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。
static final class Node {
/** 表示节点是共享模式,一个单例的SHARED节点 */
static final Node SHARED = new Node();
/** 表示节点是独占模式 */
static final Node EXCLUSIVE = null;
/** waitStatus值为CANCELLED 表示线程被取消 */
static final int CANCELLED = 1;
/** waitStatus值为SIGNAL 表示当前节点唤醒后继节点 */
static final int SIGNAL = -1;
/** waitStatus值为CONDITION 表示线程正在等待condition */
static final int CONDITION = -2;
/** waitStatus值为PROPAGATE 表示无条件的将共享状态传播给下一个节点 */
static final int PROPAGATE = -3;
/**
* 等待状态,一共有如下5种情况:
* SIGNAL: 后继节点的线程处于等待(WAITING)状态,而当前节点的线程如果
* 释放了同步状态或者被取消,将会通知后继节点(notify),使得
* 后继节点可以运行
* CANCELLED: 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中
* 取消等待,节点进入该状态将不会发生变化
* CONDITION: 节点现在在等待队列中(同步队列与等待队列不是一个队列,后文会讲)
* 节点中的线程等待在Condition上,当其他线程对Condition调用了signal()
* 方法后,该节点会从等待队列中转移到同步队列中,加入到对同步状态的获取中
* PROPAGATE: 下一次共享式同步状态获取将会无条件的被传播下去
* 0: 初始状态
*/
volatile int waitStatus;
/**
* 当前节点连接的先驱节点,用于检查waitStatus状态。当当前节点入队的时候
* 赋值先驱节点,出队的时候置为null为了GC。
*/
volatile Node prev;
/**
* 当前节点连接的后继节点
* 如果一个节点状态设置为取消,next将会指向自己
*/
volatile Node next;
/**
* 保存线程对象,在构造函数中初始化赋值
*/
volatile Thread thread;
/**
* 等待队列(Condition Queue)的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量
* 也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段。
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
节点是构成同步队列以及等待队列的基础,同步器AQS拥有首节点 (head) 和尾节点 (tail),没有成功获取同步状态的线程将会包装成为节点加入该同步队列的尾部。
之前我们已经研究过,AQS同步器中包含了两个 Node 类型的成员变量,一个是指向头结点的引用 head,另一个是指向尾节点的引用 tail。同步队列的入队与出队基本与双向 FIFO 队列的出入队操作非常相似。首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中。
独占式同步状态获取与释放
① 获取
通常我们调用 Lock 接口的 lock() 方法获取锁,lock() 方法一般借助 AQS 的 acquire(int arg) 方法获取同步状态,该方法不会响应中断,也就是说由于线程获取同步状态失败后会进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。我们查看AQS的 acquire(int arg) 方法实现。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
方法的英文注释还是较为清晰的,阻塞独占式的获取同步状态。if 判断中先执行 tryAcquire(arg) 方法,如果此时获取同步状态成功,则方法返回 true,由于判断条件加了逻辑非,所以 if 判断直接不满足,进而 acquire(int arg) 会退出。如果 tryAcquire(arg) 方法获取同步状态失败,那么程序逻辑下一步会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
先通过 addWaiter(Node mode) 方法构造同步节点 (独占式 Node.EXCLUSIVE 同一时刻只能有一个线程成功获取同步状态),并将返回的新节点CAS自旋操作加入到同步队列。通过CAS自旋操作能够保证在多线程场景下确保节点能够被正确添加,不会发生混乱。
/**
* 为当前线程创建指定模式mode的Node节点并入队
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 将当前线程构造指定模式的节点
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试在队列尾部添加一个新节点
Node pred = tail;
if (pred != null) {
// 如果原来的尾节点存在,或者说同步队列不为空
node.prev = pred;
// CAS设置新的尾节点引用
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 运行到这,说明上面快速尝试插入尾节点失败了,所以再利用enq(node)进行插入
enq(node);
return node;
}
/**
* CAS设置尾部节点引用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* 将node插入队列中
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// CAS自旋入队操作,能保证整个队列的顺序正确
for (;;) {
Node t = tail; // 这里的tail是旧的尾节点,因为上面CAS设置尾节点肯定失败或者未执行
if (t == null) {
// 如果队列为空,需要选初始化整个队列,做法是创建一个节点设置为head和tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 运行到这说明队列已经初始化完成,进行当前线程节点入队操作
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
// 入队成功则返回
return t;
}
}
}
}
将当前线程的节点正确加入到同步队列后,调用 acquireQueued(final Node node, int arg) 方法,每个节点都会进入一个自旋过程,每个节点/每个线程都在自省地观察,当条件满足,获取到了同步状态,就可以从自旋中退出,否则会依旧停留在自旋过程中,并且会阻塞当前线程。相当于在排队拿号(中间没其它事干可以休息),直到拿到号后再返回。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
// 标记acquire是否成功,能否返回
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
// 又是一个“自旋”
for (;;) {
final Node p = node.predecessor(); // 获取前驱节点
// 如果前驱节点就是头结点才进行尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功,则更新新的头节点。这里不需要用CAS设置头结点是因为能获取同步状态的线程只有一个
setHead(node);
p.next = null; // 设置前驱的后继为空,来帮助GC
failed = false; // 设置为成功标记
return interrupted; // 返回是否被中断
}
// 如果不该自己获取或者获取失败,就尝试进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
interrupted = true;
}
} finally {
if (failed)
// 如果最终acquire失败了则取消当前节点并T出同步队列
cancelAcquire(node);
}
}
/**
* 设置队列的头结点引用
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* 检查是否能进入等待状态,必须前驱节点同步状态为Node.SIGNAL才能进入等待状态,不是的话需进行设置前驱节点同步状态
* 返回线程是否需要阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获取前驱的状态
if (ws == Node.SIGNAL)
// 如果前驱节点已经被设置成释放了Node.SIGNAL,也就是当前驱结点同步状态释放或者被取消,
// 将会通知自己,那自己就可以进入等待状态了
return true;
if (ws > 0) {
// 如果前驱节点是被取消CANCELLED状态,那就一直往前找,
// 直到找到最近一个正常等待的状态,并排在它的后边。
// 这相当于一个被取消线程出队操作!
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 此时同步状态为0或者PROPAGATE,需要CAS设置成SIGNAL,让前驱OK了通知自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 让线程进入等待状态并查看是否被中断
*/
private final boolean parkAndCheckInterrupt() {
// 调用LockSupport的park()使线程进入waiting等待状态
LockSupport.park(this);
// 如果被唤醒,查看自己是不是被中断的
return Thread.interrupted();
}
/**
* 取消正在进行的acquire操作
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 清空节点保存的线程
node.thread = null;
// 往前找,跳过所有CANCELLED状态的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 新的前驱节点的后继
Node predNext = pred.next;
// 设置当前节点同步状态为Node.CANCELLED
node.waitStatus = Node.CANCELLED;
// 分两种情况进行重组同步队列,一种是当前节点是尾节点,另一种是当前节点是队列的中间节点
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // 自己接自己来帮助GC
}
}
代码的注释非常详细,基本解释了整个过程,读者可以结合书仔细阅读。节点加入到同步队列中后,进行不断的循环尝试获取同步状态(执行 tryAquire(int args)),但是只有自己是排在队伍的第二个位置才会去尝试获取,后面的节点则处于等待状态。为何如此设计呢?
- 头结点是成功获取到同步状态的节点,而头节点的线程释放了同步状态后,将唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头结点;
- 维护同步队列的FIFO原则。对于队伍中当前非头结点,如果其前驱节点如果中途出队(被中断等),当前节点会从等待状态返回,随后检查自己是不是第二个节点决定是否尝试获取同步状态。
当前线程从 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 中返回为 true,说明被中断,会调用 selfInterrupt() 方法来中断自己。如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。
/**
* 中断当前线程
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
② 释放
锁释放调用 Lock 的 unlock() 接口方法,实现 unlock() 方法常用调用AQS的 release(int args) 模板方法,从而释放同步状态。该方法在释放了同步状态后,会唤醒其后继节点,后继节点重新尝试获取同步状态。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒同步队列里的下一个节点线程
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 调用LockSupport的unpark()唤醒同步队列中最前边的那个未取消线程
*/
private void unparkSuccessor(Node node) {
// 获取当前节点的同步状态
int ws = node.waitStatus;
// 置零当前线程所在的结点同步状态,允许失败。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// s为第二个节点,可能需要寻找到
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 利用LockSupport的unpark()方法唤醒线程
LockSupport.unpark(s.thread);
}
当 s 节点被唤醒后,又再次进入 acquireQueued(final Node node, int arg) 方法的自旋中。
至于 LockSupport 中的方法后面会单独进行讨论。
共享式同步状态获取与释放
① 获取
共享式获取与独占式获取主要区别在于同一时刻能否有多个线程同时获取到同步状态,或者说能多个线程获取锁。比如文件读写线程,如果一个读线程已经获取锁,那么其他读线程也可以继续获取锁,但是写线程将被阻塞;如果一个写线程已经获取锁,那么它必须是独占的,其他任何读线程和写线程都必须阻塞。
通过调用AQS的 acquireShared(int arg) 方法可以共享式的获取同步状态,该方法实现如下:
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
// 先尝试一次,没成功则调用doAcquireShared(arg)
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 不响应中断模式下的共享式获取
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 首先,创建一个共享模式的节点,并通过CAS自旋将节点加入到同步队列尾部
final Node node = addWaiter(Node.SHARED);
// 是否获取成功标记
boolean failed = true;
try {
// 等待过程中是否被中断的标记
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头结点并向后传播,再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 进入等待状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到整个流程和独占式的代码非常相似,主要区别在于将当前线程包装成节点的 mode 参数是 Node.SHARED,还有一个区别是设置头结点时,利用的 setHeadAndPropagate(node, r) 方法,而不是简单的 setHead()。
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 存储下旧的head节点
setHead(node); // 设置新的head节点,也因为单线程不需要CAS设置
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // isShared()就是由构造节点时mode参数决定的
doReleaseShared(); // 唤醒后继
}
}
此方法在 setHead() 的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。doReleaseShared() 主要是唤醒后继节点,因为是共享的,下一部分会详细研究。
在 acquireShared(int arg) 方法中,同步器调用 tryAcquireShared(arg) 方法尝试获取同步状态,其返回值大于等于0时表示获取成功。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(arg) 方法返回值大于等于0,这在 doAcquireShared(int arg) 中得以体现。失败则进入等待状态,直到被 unpark()/interrupt() 并成功获取到资源才返回。整个等待过程也是忽略中断的。
② 释放
与独占式一样,共享式获取也需要释放同步状态,通过调用 releaseShared() 方法释放同步状态。如果成功释放且允许唤醒等待线程,它会唤醒同步队列里的其他线程来获取资源。
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试释放资源
doReleaseShared(); //唤醒后继结点
return true;
}
return false;
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
整个流程主要就是释放资源,唤醒后继。释放资源是 tryReleaseShared(arg) 实现的,唤醒后继是 doReleaseShared() 实现的。在共享式获取同步状态时,setHeadAndPropagate() 方法最后调用了 doReleaseShared() 方法来唤醒后续节点来尝试获取资源。和独占式的主要区别在于 tryReleaseShared(arg) 方法必须确保同步状态(或者资源数)线程安全释放,因为释放操作会同时来自多个线程,所以用CAS自旋来保证。
独占式响应中断获取同步状态
在Java5之前,当一个线程获取不到锁而被阻塞在 synchronized 之外时,对该线程进行中断操作时,该线程的中断标记位会被修改,但线程依旧被阻塞在 synchronized 上,等待着获取锁。这个过程就是独占式不响应中断获取锁的感觉嘛!在Java5中,同步器提供了能够在等待获取同步状态的时候被中断立刻返回的方法。
/**
* 独占式响应中断获取同步状态,方法只有获取成功或中断时返回
* 方法可以用来实现 Lock接口的lockInterruptibly方法
*
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看出和独占式不响应中断的区别在于,一个是设置了一个中断标记,等到获取到同步状态后再处理中断;另一个则是一旦发生中断,立刻 throw new InterruptedException() 导致方法返回。
独占式超时获取同步状态
超时获取同步状态过程可以被看成响应中断获取同步状态的“增强版”,通过调用同步器的 tryAcquireNanos(int arg, long nanosTimeout) 模板方法来进行超时获取同步状态,该方法在支持响应中断的基础上,增加了超时获取的特性。
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 到期时间
final long deadline = System.nanoTime() + nanosTimeout;
// 添加到队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 更新剩余时间
nanosTimeout = deadline - System.nanoTime();
// 时间到了就返回false
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
// 响应中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似。但是在同步状态获取失败的处理上有所不同,如果当前线程获取同步状态失败,则判断是否超时 (nanosTimeout小于等于0表示已经超时),如果没有超时,重新计算超时间隔 nanosTimeout,然后使当前线程等待 nanosTimeout 纳秒 (当已到设置的超时时间,该线程会从 LockSupport.parkNanos(Object blocker, long nanos) 方法返回)。
如果 nanosTimeout 小于等于 spinForTimeoutThreshold (1000纳秒) 时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让 nanosTimeout 的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。
独占式超时获取同步状态 doAcquireNanos(int arg, long nanosTimeout) 和独占式获取同步状态 acquire(int args) 在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。acquire(int args) 在未获取到同步状态时,将会使当前线程一直处于等待状态,而 doAcquireNanos(int arg, long nanosTimeout) 会使当前线程等待 nanosTimeout 纳秒,如果当前线程在 nanosTimeout 纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。
共享式响应中断获取同步状态
有了之前独占式的分析,可以发现共享式的响应中断方式实现如出一辙。
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 立即抛出异常,方法返回
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享式超时获取同步状态
相信经过之前的分析,共享式超时获取同步状态的实现应该没什么问题了。
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
* failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquireShared}, returning on success. Otherwise, the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted or the timeout elapses.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
/**
* Acquires in shared timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
IV. 自定义同步组件
设计一个同步工具,该工具同一个时刻,只允许两个线程同时访问,超过两个线程访问将会被阻塞。
首先,确定访问模式。该工具能够同一时刻支持多个线程同时访问,这明显是共享式访问。这需要同步器提供的 acquireShared(int args) 方法和 Shared 相关的方法,同时要求重写 tryAcquireShared(int args) 方法和 tryReleaseShared(int args) 方法,这样才能保证同步器的共享式同步状态的获取与释放方法得以执行。
其次,定义资源数量。工具在同一时刻允许两个线程同时访问,表明同步资源数为2,这样可以设置初始状态 status 为2,当一个线程进行获取,status 减一,status 加一,合法状态为0、1、2。
自定义同步组件—TwinsLock
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("初始化资源需要大于0");
}
// 设置初始化状态
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
// 自旋CAS
for (;;) {
int currentState = getState();
int newState = currentState - arg;
// 如果newState < 0没有资源,则直接短路返回负数表示获取失败
if (newState < 0 || compareAndSetState(currentState, newState)) {
return newState;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
// 自旋CAS
for (;;) {
int currentState = getState();
int newState = currentState + arg;
if (compareAndSetState(currentState, newState)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return null;
}
}
测试TwinsLock
public class TwinLockTest {
public static void main(String[] args) {
final Lock lock = new TwinsLock();
class Worker extends Thread {
@Override
public void run() {
for (;;) {
lock.lock();
System.out.println(Thread.currentThread().getName() + "获取锁");
try {
TwinLockTest.sleep();
} finally {
System.out.println(Thread.currentThread().getName() + "释放锁");
lock.unlock();
TwinLockTest.sleep();
}
}
}
}
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
for (int i = 0; i < 20; i++) {
TwinLockTest.sleep();
}
}
private static void sleep() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试结果
可以看到,线程总是成对的获取与释放锁。
参考文章
- Java并发之AQS详解
- AbstractQueuedSynchronizer同步队列与Condition等待队列协同机制
- 《Java并发编程的艺术》第五章