Java-并发-Condition
摘要
本文介绍Condition,需要配合AQS使用,他也实现了一套类似wait/notify的逻辑。本文会简单分析其实现。
0x01 基本概念
Condition类其实是位于java.util.concurrent.locks的一个接口类。他的一个常用实现类是AQS的非静态内部类ConditionObject:
public class ConditionObject implements Condition, java.io.Serializable
虽说ConditionObject是public修饰,但不能直接使用,因为他是非静态内部类,必须先实例化AQS的实例。而AQS定义如下:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
很明显,他是一个抽象类,不能直接实例化。也就是说必须使用继承他的子类才能实例化,从而使用ConditionObject。
我们最常使用的是配套ReentrantLock和Condition使用:
// 创建一个可重入锁
ReentrantLock lock = new ReentrantLock(true);
// 基于此lock创建一个condition
Condition condition = lock.newCondition();
// 使线程wait在condition上
condition.await();
condition.signal();
下面简单分析下后面3步代码实现:
0x02 实现原理
称呼规约:
- ReentrantLock专门使用的等待队列下文中称为
wait_queue - Condition专门使用的等待队列下文中称为
condition_queue
2.1 condition构建-lock.newCondition
首先会执行以下代码:
public Condition newCondition() {
return sync.newCondition();
}
继续看sync.newCondition();在做什么:
final ConditionObject newCondition() {
return new ConditionObject();
}
怎么又这么短,下面看看ConditionObject在干啥:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 指向condition_queue头结点
private transient Node firstWaiter;
// 指向condition_queue末尾结点
private transient Node lastWaiter;
public ConditionObject() { }
好吧,这ConditionObject构造方法啥也没做。只不过要留意firstWaiter和lastWaiter,表明这个condition也维护了一个拥有两个指针的链表即condition_queue如下图:
2.2 condition.await
- await
public final void await() throws InterruptedException {
if (Thread.interrupted())
// 如果线程被中断,直接抛出InterruptedException
throw new InterruptedException();
// 创建conditionNode(waitStatus=-2),并以尾插法加入condition_queue
Node node = addConditionWaiter();
// 释放拥有的所有锁许可
int savedState = fullyRelease(node);
int interruptMode = 0;
// !isOnSyncQueue(node)说明该线程结点仍为CONDITION状态,需要继续阻塞等待
// 否则说明该线程结点已经被放入了wait_queue
while (!isOnSyncQueue(node)) {
// 调用我们熟悉的LockSupport.park阻塞当前线程
LockSupport.park(this);
// 当线程park被唤醒时,需要执行checkInterruptWhileWaiting判断:
// 如果是发生在唤醒前的中断,就返回-1
// 如果是发生在唤醒后的中断,就返回1
// 如果不是,该方法就返回0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
// 中断发生,跳出循环
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 以排队方式请求锁,acquireQueued方法如果在线程等待时调用了中断请求,才会返回true
// 若发生过中断,且该中断不是在唤醒前,就将中断模式设为REINTERRUPT 1
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 下一个CONDITION结点存在,就顺便从condition_queue清理掉处于CANCEL状态的结点
unlinkCancelledWaiters();
if (interruptMode != 0)
// -1就立刻抛出InterruptedException,
// 1就对当前线程发起中断操作
reportInterruptAfterWait(interruptMode);
}
这await倒是简单,就是将持有的锁许可全释放,然后阻塞等待唤醒。下面看看其中用的几个主要方法。
- addConditionWaiter
private Node addConditionWaiter() {
// 获取condition_queue尾结点
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
// 尾结点且状态不为CONDITION,从condition_queue清理掉处于CANCEL状态的结点
unlinkCancelledWaiters();
// 此时lastWaiter可能已经被更新,所以这里需要让t指向最新的lastWaiter
t = lastWaiter;
}
// 创建一个状态为CONDITION的node,指向当前线程
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// condition_queue为空,就将node作为头结点
firstWaiter = node;
else
// condition_queue不为空,就将node尾插法插入condition_queue
t.nextWaiter = node;
// node作为condition_queue新的尾节点
lastWaiter = node;
return node;
}
- unlinkCancelledWaiters
// 这个方法作用就是将非CONDITION状态的节点从condition_queue中清除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 从头结点往后遍历
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
// 这里的几步操作就是断开非CONDITION结点与其他结点的联系
t.nextWaiter = null;
if (trail == null)
// 直接让头结点指向下一个结点
firstWaiter = next;
else
// 之前的状态为CONDITION的节点nextWaiter指向当前结点的下一个结点
trail.nextWaiter = next;
if (next == null)
// 如果遍历结束了,就更新lastWaiter指向最后一个CONDITION结点
lastWaiter = trail;
}
else
// trail指向当前循环到的最后一个状态CONDITION的结点
trail = t;
t = next;
}
}
- fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 锁许可个数
int savedState = getState();
if (release(savedState)) {
// 释放锁许可完毕
failed = false;
// 返回释放前的总许可个数
return savedState;
} else {
// 否则抛出IllegalMonitorStateException,代表锁状态异常
throw new IllegalMonitorStateException();
}
} finally {
// 锁释放失败,标记当前线程为撤销状态,会在不久后被移除
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
- isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
// 当waitStatus仍是CONDITION
// 或node还没加入lock wait_queue(加入后node.prev!=null)
// 此时就返回false,说明仍为CONDITION状态,需要继续阻塞等待
return false;
if (node.next != null)
// 在wait_queue中,node已经有后继节点了,说明肯定已经加入了wait_queue
// 相当于是快速查找的一种trick,jdk里面这种trick很多
return true;
// 这里就很自然的从尾向前找,可以最快速度确认该节点是否在wait_queue中
return findNodeFromTail(node);
}
2.3 condition.signal
public final void signal() {
if (!isHeldExclusively())
// 当前线程不持有锁,直接抛异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
接着看doSignal方法:
// 该方法主要作用就是从condition_queue中头结点开始往后遍历
// 尝试将每次遍历的节点放回wait_queue
// 只要一个结点放回成功,就结束该循环
// 也就是说,只会从condition_queue中挑选一个线程结点放回wait_queue并按需唤醒。
// 当然,在遍历过程中会从condition_queue清理掉那些状态不再是CONDITION的结点
// 最终,firstWaiter指向移到wait_queue的那个节点的下一个节点(可能为null)
private void doSignal(Node first) {
do {
// 注意,每次循环让firstWaiter指向下一个结点
if ( (firstWaiter = first.nextWaiter) == null)
// 如果当前结点的下一个结点为空,
// 就把lastWaiter也置为null,代表condition_queue为空
lastWaiter = null;
// 从condition_queue中头结点开始唤醒
// 这里就先把他的nextWaiter置空
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
看来,问题的关键在transferForSignal方法:
// 当该结点指向的线程状态不是CONDITION(如CANCEL),返回false
// 否则,将结点放回wait_queue,并根据前驱结点的状态按需unpark当前线程,返回true
final boolean transferForSignal(Node node) {
// CAS方式将CONDITION_WAIT状态的node的waitStatus重置为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 失败,代表已经该Node指向的线程被撤销了(waitStatus=-1),返回false
return false;
// 将该node重新放入等待锁的wait_queue中
// 成功入队后,返回该节点的前驱结点p
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 前驱结点为撤消状态(ws=1)或是结点p的状态修改CAS(ws,SIGNAL)失败
// 就unpark唤醒当前线程,
// 然后走前面提到的AQS.acquireQueued方法里的唤醒流程
LockSupport.unpark(node.thread);
// 最后返回true
return true;
}
2.4 源码小结
代码分析完了,其实Condition思想很朴素:
-
await方法,将当前线程加入AQS.condition_queue,且会顺便清理其中不为CONDITION状态的结点 -
await方法,让当前线程释放所有的锁许可(state归0) -
await方法,将当前线程阻塞,直到该线程被放入了AQS.wait_queue -
signal方法,将从AQS.condition_queue队列的头结点开始往后遍历,从AQS.condition_queue中将该线程结点移除,并放回AQS.wait_queue,并根据前驱结点是否已经撤销或异常按需唤醒当前结点。注意,此过程只要成功移动一个节点,遍历就结束了,也就是说每次signal方法最多只能从AQS.condition_queue中移动一个结点到AQS.wait_queue。 -
signal方法,在上述遍历移动节点过程中会顺便清理掉AQS.condition_queue中那些状态不为CONDITION的结点 -
await方法,阻塞的线程因为被signal方法重新放入AQS.wait_queue而被其他前驱结点唤醒,此时有几种情况:- 意外情况。LockSupport有可能会因为意外导致线程唤醒,该情况和情况2处理相同,需要再次判断节点是否已经放入
AQS.wait_queue。 - 被
wait_queue中该结点的前驱结点执行unlock方法时唤醒。处理同情况1 - 其他线程调用
signal方法前调用中断方法唤醒,需要重设interruptMode - 其他线程调用
signal方法后调用中断方法唤醒,需要重设interruptMode
- 意外情况。LockSupport有可能会因为意外导致线程唤醒,该情况和情况2处理相同,需要再次判断节点是否已经放入
-
await方法,该节点调用acquireQueued走申请锁许可流程 -
await方法,会又一次顺便清理其中不为CONDITION状态的结点 -
await方法,按阻塞前后收到中断请求的情况按需发起中断
0x03 Condition对比wait/notify
0x04 总结
Condition特点如下:
- 必须搭配AQS.sync使用
- await过程可中断
- Condition除了使用ReentrantLock的双向链表
wait_queue,自己还维护了一个单向链表condition_queue。await