前面已经讲解了AQS源码的独享模式,今天来讲一下AQS的共享模式
下面以CountDownLatch去讲解AQS的共享模式
首先讲下什么是CountDownLatch,CountDownLatch所描述的是”在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待“。在API中是这么说的:
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
先看CountDownLatch的例子
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);
new Thread(){
public void run() {
try {
System.out.println("线程1执行");
Thread.sleep(5000);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("线程2执行");
Thread.sleep(3000);
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("线程3阻塞");
latch.await();
System.out.println("线程3继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try {
Thread.sleep(1000);
System.out.println("主线程线程阻塞");
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程继续执行");
} |
线程3 和主线程会加入到队列中
node1会判断前序节点是否是头结点,如果是前序节点是头节点 但是计数器不为0 则阻塞自己 并将waitstatus状态改为-1 即SIGNAL
node2 会判断当前节点是否为头结点,前序节点不是头结点 直接阻塞自己 并将waitstatus状态改为-1
如果计数器为零,就会把node1给唤醒,唤醒后 node1将自己的节点设置为头结点 并将节点waitstatus状态设置为 -3 PROPAGATE
然后继续执行for循环 这时候node2的前序节点是头结点,然后继续将节点node2设置为头结点,并将节点waitstatus状态设置为-3 即PROPAGATE
接着看CountDownLatch的源码
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
//构造一个用给定计数初始化的 CountDownLatch public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
}public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}public void countDown() {
sync.releaseShared(1);
}} |
可以看出CountDownLatch内部依赖Sync实现,
Sync继承AQS。CountDownLatch仅提供了一个构造方法:
CountDownLatch(int count) : 构造一个用给定计数初始化的 CountDownLatch 设置count
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } }
Sync(int count) {
setState(count);
}
设置state是count
看countDown方法
public void countDown() {
sync.releaseShared(1);
}public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//如果此线程是被等待线程里最后一个被释放的线程 就去通知同步等待队列里的节点
doReleaseShared();
return true;
}
return false;
} |
再看tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();//获取计数器的值
if (c == 0)
return false;
int nextc = c-1;//每个被等待的线程执行完计数器减1
if (compareAndSetState(c, nextc))//设置计数器的新值
return nextc == 0;//如果计数器为0 返回true
}
}
} |
再看doReleaseShared方法
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;//如果头结点是-1 (可以看下面wait方法有讲解,已经把头结点设置为-1了 所以会走
//f (ws == Node.SIGNAL) 这一步 if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//把头结点再设置为0 不成功自旋操作,直到设置成功
continue; i // loop to recheck cases
unparkSuccessor(h);//唤醒节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
} |
再看unparkSuccessor方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//因为头结点已经设置为0了,所以ws<0不满足
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {//这一步也不满足,可以看下面wait方法里有讲解 头结点的后续节点的status都是-1
//所以这一步不满足 直接走LockSupport.unpark(s.thread);唤醒头结点的下一个节点 s = null;//如果waitstatus>0说明 节点取消了 就找下一个waitstatus是-1的节点 并唤醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
} |
再看wait方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//尝试获取锁,获取失败就执行下面的方法
doAcquireSharedInterruptibly(arg);
} |
看tryAcquireShared方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
} |
如果state是0,说明被等待的线程全都执行完了 。return -1说明没有执行完
再看doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//如果队列是空的,就新建一个头节点,头节点指向尾节点,
//然后再新建一个节点放在头节点后面 如果队列不为空,就在尾节点后面新建一个节点。节点是shared类型的
//队列节点的waitStatus默认是0 因为上篇AQS源码一种有讲解,就不讲那么多了 boolean failed = true;
try {
for (;;) {//开启自旋
final Node p = node.predecessor();
if (p == head) {//如果新建节点的前序节点是头节点,而且state的值为0 就走到setHeadAndPropagate方法
int r = tryAcquireShared(arg);
if (r >= 0) {//如果被等待的线程执行完了
setHeadAndPropagate(node, r);//把当前节点设置为头节点,而且唤醒后续挂起的节点
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//如果当前节点的前序节点不是头节点或者计数器不等于0,就阻塞当前节点
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
} |
再看shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//队列里的节点默认都是0 所以都会走 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
//把队列里的每个节点的前序节点设置为-1 if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
} |
再看setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//把当前节点设置为头节点
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//并获取头节点的下一个节点
if (s == null || s.isShared())//如果头节点的下一个节点为空 或者不为空但是是共享节点执行doReleaseShared方法
doReleaseShared();
}
} |
看doReleaseShared方法
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//前面已经讲过 头结点的waitstatus值变为了0
//所以会走这步 把head节点的waitstaus改成-3 即PROPAGATE continue; // loop on failed C
}
if (h == head) // loop if head changed 如果头结点变化就接着执行for循环
break;
}
} |
如下是我的微信技术公众号,大家多多关注哈,你们能看我才有动力写下去,毕竟读源码不如约会好玩