AQS是J.U.C的核心
AQS(AbstractQueuedSynchronizer)队列同步器,AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待和唤醒等底层操作。
同步队列中的节点用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。
总结:在获取同步状态时,同步器维护这一个同步队列,并持有对头节点和尾节点的引用。获取状态失败的线程会被包装成节点加入到尾节点后面称为新的尾节点,在进入同步队列后开始自旋,停止自旋的条件就是前驱节点为头节点并且成功获取到同步状态。在释放同步状态时,同步器调用tryRelease方法释放同步状态,然后唤醒头节点的后继节点。
共享式同步状态获取
设计原理
-
使用Node实现FIFO队列
-
维护了一个volatile int state(代表共享资源)
-
使用方法是继承,基于模板方法
-
子类通过继承同步器并实现它的抽象方法来管理同步状态
-
可以实现排它锁和共享锁的模式(独占、共享)
具体实现的思路
1.首先 AQS内部维护了一个CLH队列,多线程争用资源被阻塞时会进入此队列。同时AQS管理一个关于共享资源状态信息的单一整数volatile int state,该整数可以表现任何状态,同时配合Unsafe工具对其原子性的操作来实现对当前锁的状态进行修改。。比如, Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)
2.线程尝试获取锁,如果获取失败,则将等待信息等包装成一个Node结点,加入到同步队列Sync queue里
3.不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒
4.当持有锁的线程释放锁的时候,会唤醒队列中的后继线程
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch),独占式或者共享式获取同步状态state。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
AQS同步组件
- CountDownLatch
- Semaphore
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
独占锁:ReentrantLock
共享锁:CountDownLatch, CyclicBarrier, Semaphore
共享和独占:ReentrantReadWriteLock
CountDownLatch
同步阻塞类,可以完成阻塞线程的功能
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
使用场景
1.程序执行需要等待某个条件完成后,才能进行后面的操作。比如父任务等待所有子任务都完成的时候,在继续往下进行
实例1:基本用法
@Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { // 为防止出现异常,放在finally更保险一些 countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }