前面已经讲解了AQS源码的独享模式,今天来讲一下AQS的共享模式

 

下面以CountDownLatch去讲解AQS的共享模式

 

首先讲下什么是CountDownLatch,CountDownLatch所描述的是”在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待“。在API中是这么说的:

用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

先看CountDownLatch的例子

 

public static void main(String[] args) {
    final CountDownLatch latch = new CountDownLatch(2);
  
    new Thread(){
        public void run() {
            try {
                System.out.println("线程1执行");
                Thread.sleep(5000);
                latch.countDown();
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }.start();
  
    new Thread(){
        public void run() {
            try {
                System.out.println("线程2执行");
                Thread.sleep(3000);
                latch.countDown();
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }.start();
  
    new Thread(){
        public void run() {
            try {
                System.out.println("线程3阻塞");
                latch.await();
                System.out.println("线程3继续执行");
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }.start();
  
    try {
        Thread.sleep(1000);
        System.out.println("主线程线程阻塞");
        latch.await();
    catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("主线程继续执行");
  
}

 

线程3 和主线程会加入到队列中

 

AQS源码解读 二

 node1会判断前序节点是否是头结点,如果是前序节点是头节点 但是计数器不为0 则阻塞自己 并将waitstatus状态改为-1 即SIGNAL

 node2 会判断当前节点是否为头结点,前序节点不是头结点 直接阻塞自己 并将waitstatus状态改为-1

 

如果计数器为零,就会把node1给唤醒,唤醒后 node1将自己的节点设置为头结点 并将节点waitstatus状态设置为 -3 PROPAGATE

然后继续执行for循环 这时候node2的前序节点是头结点,然后继续将节点node2设置为头结点,并将节点waitstatus状态设置为-3 即PROPAGATE

 

 

 

接着看CountDownLatch的源码

public class CountDownLatch {
    /**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
 private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
 
        Sync(int count) {
            setState(count);
        }
 
        int getCount() {
            return getState();
        }
 
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
 
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
 for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
 
    private final Sync sync;
 
//构造一个用给定计数初始化的 CountDownLatch
 public CountDownLatch(int count) {
        if (count < 0throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
}
 
 
 
 
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
 
 
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
 
 
public void countDown() {
    sync.releaseShared(1);
}
}
 

 

可以看出CountDownLatch内部依赖Sync实现,

Sync继承AQS。CountDownLatch仅提供了一个构造方法:

CountDownLatch(int count) : 构造一个用给定计数初始化的 CountDownLatch 设置count

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
}
Sync(int count) {
    setState(count);
}

设置state是count

 

看countDown方法

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//如果此线程是被等待线程里最后一个被释放的线程 就去通知同步等待队列里的节点
        doReleaseShared();
        return true;
    }
    return false;
}

 

再看tryReleaseShared方法 

protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
 for (;;) {
            int c = getState();//获取计数器的值
            if (c == 0)
                return false;
            int nextc = c-1;//每个被等待的线程执行完计数器减1
            if (compareAndSetState(c, nextc))//设置计数器的新值
                return nextc == 0;//如果计数器为0 返回true
        }
    }
}

 

 

再看doReleaseShared方法

private void doReleaseShared() {
    /*
 * Ensure that a release propagates, even if there are other
 * in-progress acquires/releases. This proceeds in the usual
 * way of trying to unparkSuccessor of head if it needs
 * signal. But if it does not, status is set to PROPAGATE to
 * ensure that upon release, propagation continues.
 * Additionally, we must loop in case a new node is added
 * while we are doing this. Also, unlike other uses of
 * unparkSuccessor, we need to know if CAS to reset status
 * fails, if so rechecking.
 */
 for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;//如果头结点是-1 (可以看下面wait方法有讲解,已经把头结点设置为-1了 所以会走
//f (ws == Node.SIGNAL) 这一步
 
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//把头结点再设置为0 不成功自旋操作,直到设置成功
                    continue;        i    // loop to recheck cases
 unparkSuccessor(h);//唤醒节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
 }
        if (h == head)                   // loop if head changed
 break;
    }
}

 

 
再看unparkSuccessor方法
private void unparkSuccessor(Node node) {
    /*
 * If status is negative (i.e., possibly needing signal) try
 * to clear in anticipation of signalling. It is OK if this
 * fails or if status is changed by waiting thread.
 */
 int ws = node.waitStatus;//因为头结点已经设置为0了,所以ws<0不满足
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
 
    /*
 * Thread to unpark is held in successor, which is normally
 * just the next node. But if cancelled or apparently null,
 * traverse backwards from tail to find the actual
 * non-cancelled successor.
 */
 Node s = node.next;
    if (s == null || s.waitStatus > 0) {//这一步也不满足,可以看下面wait方法里有讲解 头结点的后续节点的status都是-1
//所以这一步不满足 直接走LockSupport.unpark(s.thread);唤醒头结点的下一个节点
        s = null;//如果waitstatus>0说明 节点取消了 就找下一个waitstatus是-1的节点 并唤醒
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

 

再看wait方法

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//尝试获取锁,获取失败就执行下面的方法
        doAcquireSharedInterruptibly(arg);
}

 

 

看tryAcquireShared方法

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

 

如果state是0,说明被等待的线程全都执行完了 。return -1说明没有执行完 

再看doAcquireSharedInterruptibly方法

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);//如果队列是空的,就新建一个头节点,头节点指向尾节点,
   //然后再新建一个节点放在头节点后面 如果队列不为空,就在尾节点后面新建一个节点。节点是shared类型的
//队列节点的waitStatus默认是0 因为上篇AQS源码一种有讲解,就不讲那么多了
    boolean failed = true;
    try {
        for (;;) {//开启自旋
            final Node p = node.predecessor();
            if (p == head) {//如果新建节点的前序节点是头节点,而且state的值为0 就走到setHeadAndPropagate方法
 
                int r = tryAcquireShared(arg);
                if (r >= 0) {//如果被等待的线程执行完了
 
                    setHeadAndPropagate(node, r);//把当前节点设置为头节点,而且唤醒后续挂起的节点
                    p.next = null// help GC
 failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//如果当前节点的前序节点不是头节点或者计数器不等于0,就阻塞当前节点
                throw new InterruptedException();
        }
    finally {
        if (failed)
            cancelAcquire(node);
    }
}

 

再看shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//队列里的节点默认都是0 所以都会走 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
//把队列里的每个节点的前序节点设置为-1
 
    if (ws == Node.SIGNAL)
        /*
 * This node has already set status asking a release
 * to signal it, so it can safely park.
 */
 return true;
    if (ws > 0) {
        /*
 * Predecessor was cancelled. Skip over predecessors and
 * indicate retry.
 */
 do {
            node.prev = pred = pred.prev;
        while (pred.waitStatus > 0);
        pred.next = node;
    else {
        /*
 * waitStatus must be 0 or PROPAGATE. Indicate that we
 * need a signal, but don't park yet. Caller will need to
 * retry to make sure it cannot acquire before parking.
 */
 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

 

再看setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
 setHead(node);//把当前节点设置为头节点
    /*
 * Try to signal next queued node if:
 * Propagation was indicated by caller,
 * or was recorded (as h.waitStatus either before
 * or after setHead) by a previous operation
 * (note: this uses sign-check of waitStatus because
 * PROPAGATE status may transition to SIGNAL.)
 * and
 * The next node is waiting in shared mode,
 * or we don't know, because it appears null
 *
 * The conservatism in both of these checks may cause
 * unnecessary wake-ups, but only when there are multiple
 * racing acquires/releases, so most need signals now or soon
 * anyway.
 */
 if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;//并获取头节点的下一个节点
        if (s == null || s.isShared())//如果头节点的下一个节点为空 或者不为空但是是共享节点执行doReleaseShared方法
 
            doReleaseShared();
    }
}

 

看doReleaseShared方法

private void doReleaseShared() {
    /*
 * Ensure that a release propagates, even if there are other
 * in-progress acquires/releases. This proceeds in the usual
 * way of trying to unparkSuccessor of head if it needs
 * signal. But if it does not, status is set to PROPAGATE to
 * ensure that upon release, propagation continues.
 * Additionally, we must loop in case a new node is added
 * while we are doing this. Also, unlike other uses of
 * unparkSuccessor, we need to know if CAS to reset status
 * fails, if so rechecking.
 */
 for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
 unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//前面已经讲过 头结点的waitstatus值变为了0
//所以会走这步 把head节点的waitstaus改成-3 即PROPAGATE
 
                continue;                // loop on failed C
 }
        if (h == head)                   // loop if head changed 如果头结点变化就接着执行for循环
 break;
    }
}


如下是我的微信技术公众号,大家多多关注哈,你们能看我才有动力写下去,毕竟读源码不如约会好玩AQS源码解读 二 

AQS源码解读 二


相关文章: