目录
2.AbstractQueuedSynchronizer在ReenTrantLock实现可重入锁
3.通过AbstractQueuedSynchronizer创建简单的同步锁
1.基本概念
AbstractQueuedSynchronizer
AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。
简单来说,作用就是维护锁的当前状态和线程等待列表,维护的关键则是对字段state以及双端双向队列的使用
ReenTrantLock
ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁,即当前线程获取该锁再次获取不会被阻塞。
2.AbstractQueuedSynchronizer在ReenTrantLock实现可重入锁
2.1 基本特性
2.1.1 重入
// 可重入测试
public class ReentrantTest {
public static void main(String[] args) {
// main --> firstAction --> secondAction --> lastAction
SyncAction.doAction(ReentrantTest::firstAction);
}
private static void firstAction() {
SyncAction.doAction(ReentrantTest::secondAction);
}
private static void secondAction() {
SyncAction.doAction(ReentrantTest::lastAction);
}
private static void lastAction() {
System.out.println("Hello World, last Action.");
}
}
// 同步方法
public class SyncAction {
private static ReentrantLock lock = new ReentrantLock();
// 自定义锁
// private static CustomLock lock = new CustomLock();
/**
* 功能描述:同步方法
*
* @param runnable
* @author lvchengyi
* @date 10:43 下午 2020/9/1
*/
public static void doAction(Runnable runnable) {
lock.lock();
try {
runnable.run();
} catch (Exception e) {
System.out.println(e);
} finally {
lock.unlock();
}
}
}
|
在同一个线程中,代码中三次lock操作,都进入了锁,体现锁的重入性。
2.1.2 同步
// 同步测试
public class SyncTest {
public static void main(String[] args) {
// main --> firstAction 终止,死锁发生
// main线程拥有ReentrantLock的锁,firstAction线程等待main线程的锁释放
// main线程等待firstAction方法执行完毕,也在等待,因此发生死锁
ReentrantTest.syncAction(SyncTest::firstAction);
}
public static void firstAction() {
System.out.println(Thread.currentThread().getName());
CompletableFuture.runAsync(() -> ReentrantTest.syncAction(SyncTest::secondAction)).join();
}
public static void secondAction() {
System.out.println(Thread.currentThread().getName());
CompletableFuture.runAsync(() -> ReentrantTest.syncAction(SyncTest::lastAction)).join();
}
private static void lastAction() {
System.out.println(Thread.currentThread().getName());
System.out.println("Hello World, last Action.");
}
}
|
在不同线程中,第二次lock发生了阻塞,拿不到锁,体现锁的同步性。
2.1.3 实际场景
public class NormalLockTest {
private static int sum = 0;
public static void main(String[] args) {
List<CompletableFuture<Void>> completableFutureList = new ArrayList<>();
// 开启10个线程都做递增方法
for (int i = 0; i < 10; i++) {
// 加锁后进行递增
// CompletableFuture<Void> future = CompletableFuture.runAsync(() -> SyncAction.doAction(NormalLockTest::addSum));
// 未加锁进行递增
CompletableFuture<Void> future = CompletableFuture.runAsync(NormalLockTest::addSum);
completableFutureList.add(future);
}
// 等待10个线程执行完毕
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();
System.out.println(sum);
}
private static void addSum() {
// 给sum变量加1000
for (int i = 0; i < 1000; i++) {
sum++;
}
}
}
|
在并发场景下,假设有T1、T2、T3竞争同一个ReentrantLock锁
若T1拿到锁后,T1(lock),T2(park),T3(park)
Waited Queue → Head → T2 next → T3
T1(unlock) → T2.unpark
T2(free),T3(park) 可能会存在竞争锁
Waited Queue → Head(T2) → T3
假设T2(lock),T3(park)
......
具体如何实现见下文
2.2 AQS重要方法与ReentrantLock的关联
| 方法名 | 描述 |
|---|---|
| protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
| protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
ReentrantLock是独占锁,所以实现了tryAcquire和tryRelease。
ReentrantLock使用state字段实现同步和可重入:
-
State初始化的时候为0,表示没有任何线程持有锁。
-
当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
-
解锁也是对这个字段-1,一直到0,此线程对锁释放。
2.3 ReentrantLock和AQS交互过程
2.3.1 交互图
2.3.2 流程图-加锁
2.3.3 解析
加锁:
-
通过ReentrantLock的加锁方法Lock进行加锁操作。
-
会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
-
AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
-
tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
解锁:
-
通过ReentrantLock的解锁方法Unlock进行解锁。
-
Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
-
Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
-
释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
2.4 AQS的框架作用如何体现
这里以ReentrantLock的非公平锁为例进行分析
2.4.1 加锁
在非公平锁中,有一段这样的代码:
static final class NonfairSync extends Sync {
...
final void lock() {
// 如果当前state为0,则拿到锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 尝试获取锁
acquire(1);
}
...
}
|
看一下AQS的Acquire是怎么写的:
public final void acquire(int arg) {
// 尝试获取锁,如果获取锁失败,则将当前线程信息放入到内部队列中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 手动中断线程
selfInterrupt();
}
|
再看一下tryAcquire方法:
// AQS的tryAcquire,具体由下层实现,实现主要是对AQS中state变量以及OwnerThread一些操作
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
|
可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。
2.4.2 解锁
在ReentrantLock中,解锁的代码:
public class ReentrantLock implements Lock, java.io.Serializable {
...
public void unlock() {
sync.release(1);
}
...
}
|
非公平锁直接调AQS的release(int arg)方法
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 判断头部节点是否处于被挂起状态
if (h != null && h.waitStatus != 0)
// 唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}
|
再看一下tryRelease方法:
// AQS的tryRelease,具体由下层实现,实现主要是对AQS中state变量以及OwnerThread一些操作
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
|
2.4.3 总结
由上可得,继承AQS实现同步锁,只需要在下层实现方法中维护好锁的状态,锁的双端双向队列、唤醒线程等逻辑AQS都已经在框架中实现了。
3.通过AbstractQueuedSynchronizer创建简单的同步锁
public class CustomLock implements Lock {
private Sync sync = new Sync();
@Override
public void lock() {
System.out.println(Thread.currentThread().getName() + "===尝试获取锁===");
sync.acquire(1);
System.out.println(Thread.currentThread().getName() + "===已经获取锁===");
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() + "===尝试释放锁===");
sync.release(1);
System.out.println(Thread.currentThread().getName() + "====已经释放锁===");
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
Condition newCondition() {
return new ConditionObject();
}
}
}
|
4.AQS其他的应用场景
除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
| 同步工具 | 同步工具与AQS的关联 |
|---|---|
| ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理 |
| Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryReleaseShared会增加计数,acquireShared会减少计数 |
| CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过 |
| ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数 |
| ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease) |