AQS是J.U.C的核心

AQS(AbstractQueuedSynchronizer)队列同步器,AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。

J.U.C之AQS

同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待和唤醒等底层操作。

同步队列中的节点用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。

同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。
如果一个线程没有获得同步状态,那么包装它的节点将被加入到队尾,显然这个过程应该是线程安全的。因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递一个它认为的尾节点和当前节点,只有设置成功,当前节点才被加入队尾。这个过程如下所示
J.U.C之AQS
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,这一过程如下:
 
J.U.C之AQS
独占式同步状态获取
节点进入同步队列后,就进入了自旋的过程,每个节点都在自省的观察,头结点出队列时,自己的前驱节点是否是头结点,如果是,尝试获取同步状态。可以看见节点和节点之间在循环检查的过程中基本不相互通信,而是简单的判断自己的前驱节点是否是头结点,这样就使得节点的释放符合FIFO。
 
J.U.C之AQS

J.U.C之AQS

总结:在获取同步状态时,同步器维护这一个同步队列,并持有对头节点和尾节点的引用。获取状态失败的线程会被包装成节点加入到尾节点后面称为新的尾节点,在进入同步队列后开始自旋,停止自旋的条件就是前驱节点为头节点并且成功获取到同步状态。在释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取

 共享式获取与独占式获取的区别就是同一时刻是否可以多个线程同时获取到同步状态。

设计原理

  • 使用Node实现FIFO队列

  • 维护了一个volatile int state(代表共享资源)

  • 使用方法是继承,基于模板方法

  • 子类通过继承同步器并实现它的抽象方法来管理同步状态

  • 可以实现排它锁和共享锁的模式(独占、共享)

J.U.C之AQS

具体实现的思路

1.首先 AQS内部维护了一个CLH队列,多线程争用资源被阻塞时会进入此队列。同时AQS管理一个关于共享资源状态信息的单一整数volatile int state,该整数可以表现任何状态,同时配合Unsafe工具对其原子性的操作来实现对当前锁的状态进行修改。。比如, Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)

2.线程尝试获取锁,如果获取失败,则将等待信息等包装成一个Node结点,加入到同步队列Sync queue里

3.不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒

4.当持有锁的线程释放锁的时候,会唤醒队列中的后继线程

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch),独占式或者共享式获取同步状态state。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

AQS同步组件

  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

独占锁:ReentrantLock

共享锁:CountDownLatch, CyclicBarrier, Semaphore

共享和独占:ReentrantReadWriteLock

CountDownLatch

同步阻塞类,可以完成阻塞线程的功能

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

 J.U.C之AQS

使用场景

1.程序执行需要等待某个条件完成后,才能进行后面的操作。比如父任务等待所有子任务都完成的时候,在继续往下进行

实例1:基本用法

@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    // 为防止出现异常,放在finally更保险一些
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
View Code

相关文章: