1. 简介
CyclicBarrier ,一个同步辅助类,在 AP I中是这么介绍的:
它允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 Barrier 在释放等待线程后可以重用,所以称它为循环( Cyclic ) 的 屏障( Barrier ) 。
通俗点讲就是:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
2. 实现分析
java.util.concurrent.CyclicBarrier 的结构如下:
通过上图,我们可以看到 CyclicBarrier 的内部是使用重入锁 ReentrantLock 和 Condition 。
它有两个构造函数:
-
CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。 -
CyclicBarrier(int parties, Runnable barrierAction):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。 -
代码如下:
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }-
parties变量,表示拦截线程的总数量。 -
count变量,表示拦截线程的剩余需要数量。 -
barrierAction变量,为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行barrierAction,用于处理更加复杂的业务场景。 -
generation变量,表示 CyclicBarrier 的更新换代。详细解析,见 「2.4 Generation」 。
-
2.1 await
在 CyclicBarrier 中最重要的方法莫过于 #await() 方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。代码如下:
或者说,每个线程调用
#await()方法,告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。当所有线程都到达了屏障,结束阻塞,所有线程可继续执行后续逻辑。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);//不超时等待
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
|
-
内部调用
#dowait(boolean timed, long nanos)方法,执行阻塞等待(timed=true)。详细解析,见 「2.3 dowait」 。 -
理论来说,不会出现 TimeoutException 异常,所以在发生时,直接抛出 Error 错误。
-
返回值为进入屏障时,剩余拦截线程的剩余需要数量。英文注释如下:
the arrival index of the current thread, where index {@code getParties() - 1} indicates the first to arrive and zero indicates the last to arrive
2.2 await
#await(long timeout, TimeUnit unit) 方法,在 #await() 的基础上,增加了等待超时的特性。代码如下:
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
|
- 内部调用
#dowait(boolean timed, long nanos)方法,执行阻塞等待(timed=true)。详细解析,见 「2.3 dowait」 。
2.3 dowait
#dowait(boolean timed, long nanos) 方法,代码如下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//分代
final Generation g = generation;
//当前generation“已损坏”,抛出BrokenBarrierException异常
//抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie
if (g.broken)
//当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常
throw new BrokenBarrierException();
//如果线程中断,终止CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//进来一个线程 count - 1
int index = --count;
//count == 0 表示所有线程均已到位,触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//触发任务
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation
nextGeneration();
return 0;
} finally {
if (!ranAction) // 未执行,说明 barrierCommand 执行报错,或者线程打断等等情况。
breakBarrier();
}
}
for (;;) {
try {
//如果不是超时等待,则调用Condition.await()方法等待
if (!timed)
trip.await();
else if (nanos > 0L)
//超时等待,调用Condition.awaitNanos()方法等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//generation已经更新,返回index
if (g != generation)
return index;
//“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}
|
如果该线程不是到达的最后一个线程,则他会一直处于等待状态,除非发生以下情况:
- 最后一个线程到达,即
index == 0。 - 超出了指定时间(超时等待)。
- 其他的某个线程中断当前线程。
- 其他的某个线程中断另一个等待的线程。
- 其他的某个线程在等待 barrier 超时。
- 其他的某个线程在此 barrier 调用
#reset()方法。#reset()方法,用于将屏障重置为初始状态。
在 #dowait(boolean timed, long nanos) 方法的源代码中,我们总是可以看到抛出 BrokenBarrierException 异常,那么什么时候抛出异常呢?例如:
- 如果一个线程处于等待状态时,如果其他线程调用
#reset()方法。详细解析,见 「2.7 reset」 。 - 调用的 barrier 原本就是被损坏的,则抛出 BrokenBarrierException 异常。
- 任何线程在等待时被中断了,则其他所有线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。详细解析,见 「2.6 breakBarrier」 。
2.4 Generation
Generation 是 CyclicBarrier 内部静态类,描述了 CyclicBarrier 的更新换代。在CyclicBarrier中,同一批线程属于同一代。当有 parties 个线程全部到达 barrier 时,generation 就会被更新换代。其中 broken 属性,标识该当前 CyclicBarrier 是否已经处于中断状态。代码如下:
private static class Generation {
boolean broken = false;
}
|
- 默认 barrier 是没有损坏的。
2.5 breakBarrier
当 barrier 损坏了,或者有一个线程中断了,则通过 #breakBarrier() 方法,来终止所有的线程。代码如下:
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
|
- 在
breakBarrier()方法中,中除了将broken设置为 true ,还会调用#signalAll()方法,将在 CyclicBarrier 处于等待状态的线程全部唤醒。
2.6 nextGeneration
当所有线程都已经到达 barrier 处(index == 0),则会通过 nextGeneration() 方法,进行更新换代操作。在这个步骤中,做了三件事:
- 唤醒所有线程。
- 重置
count。 - 重置
generation。
代码如下:
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
|
2.7 reset
#reset() 方法,重置 barrier 到初始化状态。代码如下:
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
|
- 通过组合
#breakBarrier()和#nextGeneration()方法来实现。
2.8 getNumberWaiting
#getNumberWaiting() 方法,获得等待的线程数。代码如下:
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
|
2.9 isBroken
#isBroken() 方法,判断 CyclicBarrier 是否处于中断。代码如下:
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
|
3. 应用场景
CyclicBarrier 适用于多线程结果合并的操作,用于多线程计算数据,最后合并计算结果的应用场景。比如,我们需要统计多个 Excel 中的数据,然后等到一个总结果。我们可以通过多线程处理每一个 Excel ,执行完成后得到相应的结果,最后通过 barrierAction 来计算这些线程的计算结果,得到所有Excel的总和。
4. 应用示例
比如我们开会只有等所有的人到齐了才会开会,如下:
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier;
static class CyclicBarrierThread extends Thread{
public void run() {
System.out.println(Thread.currentThread().getName() + "到了");
//等待
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args){
cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("人到齐了,开会吧....");
}
});
for(int i = 0 ; i < 5 ; i++){
new CyclicBarrierThread().start();
}
}
}
|
运行结果:
1. 简介
在上篇博客中,我们介绍了 Java 四大并发工具之一的 CyclicBarrier ,今天要介绍的CountDownLatch 与 CyclicBarrier 有点儿相似。
CyclicBarrier 所描述的是“允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务",而 CountDownLatch 所描述的是“在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待”。在API中是这样描述的:
用给定的计数初始化 CountDownLatch。由于调用了
#countDown()方法,所以在当前计数到达零之前,#await()方法会一直受阻塞。之后,会释放所有等待的线程,#await()的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier 。
CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候,需要带入该计数器值,该值就表示了线程的数量。
- 每当一个线程完成自己的任务后,计数器的值就会减 1 。
- 当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。
虽然,CountDownLatch 与 CyclicBarrier 有那么点相似,但是他们还是存在一些区别的:
- CountDownLatch 的作用是允许 1 或 N 个线程等待其他线程完成执行;而 CyclicBarrier 则是允许 N 个线程相互等待。
- CountDownLatch 的计数器无法被重置;CyclicBarrier 的计数器可以被重置后使用,因此它被称为是循环的 barrier 。
2. 实现分析
java.util.concurrent.CountDownLatch 结构如下图:
通过上面的结构图我们可以看到,CountDownLatch 内部依赖 Sync 实现,而 Sync 继承 AQS 。
CountDownLatch 仅提供了一个构造方法,代码如下:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
|
- 构造一个用给定计数初始化的 CountDownLatch 。
2.1 Sync
sync 变量,为 CountDownLatch 的一个内部类 Sync ,其定义如下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
// 获取同步状态
int getCount() {
return getState();
}
// 获取同步状态
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 释放同步状态
@Override
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}
|
- 通过这个内部类 Sync 实现类,我们可以清楚地看到, CountDownLatch 是采用共享锁来实现的。
-
#tryAcquireShared(int acquires)和#tryReleaseShared(int releases)方法,结合下文一起理解。
2.2 await
CountDownLatch 提供 #await() 方法,来使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断,定义如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
|
-
该方法内部使用 AQS 的
#acquireSharedInterruptibly(int arg)方法,代码如下:// AQS.java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }-
在内部类 Sync 中重写了
#tryAcquireShared(int arg)方法,代码如下:// Sync.java @Override protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }-
getState()方法,获取同步状态,其值等于计数器的值。从这里我们可以看到,如果计数器值不等于 0,则会调用#doAcquireSharedInterruptibly(int arg)方法。该方法为一个自旋方法会尝试一直去获取同步状态,代码如下:// AQS.java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { /** * 对于CountDownLatch而言,如果计数器值不等于0,那么r 会一直小于0 */ int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }- x
-
-
2.3 await
CountDownLatch 提供 #await(long timeout, TimeUnit unit) 方法,来使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断,或者等待超时,定义如下:
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
|
- 调用 AQS 的
tryAcquireSharedNanos(int acquires, long nanosTimeout)方法,逻辑和 「2.2 await」 类似。
2.4 countDown
CountDownLatch 提供 #countDown() 方法,递减锁存器的计数。如果计数到达零,则唤醒所有等待的线程。
public void countDown() {
sync.releaseShared(1);
}
|
-
内部调用 AQS 的
#releaseShared(int arg)方法,来释放共享锁同步状态:// AQS.java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }-
#tryReleaseShared(int arg)方法,被 CountDownLatch 的内部类 Sync 重写,代码如下:// Sync.java @Overrride protected boolean tryReleaseShared(int releases) { for (;;) { //获取锁状态 int c = getState(); //c == 0 直接返回,释放锁成功 if (c == 0) return false; //计算新“锁计数器” int nextc = c-1; //更新锁状态(计数器) if (compareAndSetState(c, nextc)) return nextc == 0; } }- x
-
2.5 getCount
public long getCount() {
return sync.getCount();
}
|
3. 总结
CountDownLatch 内部通过共享锁实现。
- 在创建 CountDownLatch 实例时,需要传递一个int型的参数:
count,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。 - 当某个线程调用
#await()方法,程序首先判断count的值是否为 0 ,如果不为 0 的话,则会一直等待直到为 0 为止。 - 当其他线程调用
#countDown()方法时,则执行释放共享锁状态,使count值 - 1。 - 当在创建 CountDownLatch 时初始化的
count参数,必须要有count线程调用#countDown()方法,才会使计数器count等于 0 ,锁才会释放,前面等待的线程才会继续运行。 - 注意 CountDownLatch 不能回滚重置。
4. 应用示例
示例仍然使用开会案例。老板进入会议室等待 5 个人全部到达会议室才会开会。所以这里有两种线程:老板等待开会线程、员工到达会议室线程:
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(5);
/**
* Boss线程,等待员工到达开会
*/
static class BossThread extends Thread{
@Override
public void run() {
System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
try {
//Boss等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有人都已经到齐了,开会吧...");
}
}
// 员工到达会议室线程
static class EmpleoyeeThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ",到达会议室....");
//员工到达会议室 count - 1
countDownLatch.countDown();
}
}
public static void main(String[] args){
//Boss线程启动
new BossThread().start();
for(int i = 0 ; i < countDownLatch.getCount() ; i++){
new EmpleoyeeThread().start();
}
}
}
|
运行结果:
1. 简介
信号量 Semaphore 是一个控制访问多个共享资源的计数器,和 CountDownLatch 一样,其本质上是一个“共享锁”。
Semaphore,在 API 是这么介绍的:
一个计数信号量。从概念上讲,信号量维护了一个许可集。
- 如有必要,在许可可用前会阻塞每一个 acquire,然后再获取该许可。
- 每个 release 添加一个许可,从而可能释放一个正在阻塞的获取者。
但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
下面我们就一个停车场的简单例子来阐述 Semaphore :
- 为了简单起见我们假设停车场仅有 5 个停车位。一开始停车场没有车辆所有车位全部空着,然后先后到来三辆车,停车场车位够,安排进去停车,然后又来三辆,这个时候由于只有两个停车位,所有只能停两辆,其余一辆必须在外面候着,直到停车场有空车位。当然,以后每来一辆都需要在外面候着。当停车场有车开出去,里面有空位了,则安排一辆车进去(至于是哪辆,要看选择的机制是公平还是非公平)。
- 从程序角度看,停车场就相当于信号量 Semaphore ,其中许可数为 5 ,车辆就相对线程。当来一辆车时,许可数就会减 1 。当停车场没有车位了(许可数 == 0 ),其他来的车辆需要在外面等候着。如果有一辆车开出停车场,许可数 + 1,然后放进来一辆车。
- 信号量 Semaphore 是一个非负整数(
>=1)。当一个线程想要访问某个共享资源时,它必须要先获取 Semaphore。当 Semaphore > 0 时,获取该资源并使 Semaphore – 1 。如果S emaphore 值 = 0,则表示全部的共享资源已经被其他线程全部占用,线程必须要等待其他线程释放资源。当线程释放资源时,Semaphore 则 +1 。
2. 实现分析
java.util.concurrent.Semaphore 结构如下图:
从上图可以看出,Semaphore 内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类 Sync ,其中 Sync 继承 AQS(再一次阐述 AQS 的重要性)。
Semaphore 提供了两个构造函数:
-
Semaphore(int permits):创建具有给定的许可数和非公平的公平设置的 Semaphore 。 -
Semaphore(int permits, boolean fair):创建具有给定的许可数和给定的公平设置的 Semaphore 。
实现如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
|
- Semaphore 默认选择非公平锁。
- 当信号量 Semaphore = 1 时,它可以当作互斥锁使用。其中 0、1 就相当于它的状态:1)当
=1时表示,其他线程可以获取;2)当=0时,排他,即其他线程必须要等待。 - ???? Semaphore 的代码实现结构,和 ReentrantLock 类似。
2.1 信号量获取
Semaphore 提供了 #acquire() 方法,来获取一个许可。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
|
-
内部调用 AQS 的
#acquireSharedInterruptibly(int arg)方法,该方法以共享模式获取同步状态。代码如下:public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }-
在
#acquireSharedInterruptibly(int arg)方法中,会调用#tryAcquireShared(int arg)方法。而#tryAcquireShared(int arg)方法,由子类来实现。对于 Semaphore 而言,如果我们选择非公平模式,则调用 NonfairSync 的#tryAcquireShared(int arg)方法,否则调用 FairSync 的#tryAcquireShared(int arg)方法。若#tryAcquireShared(int arg)方法返回< 0时,则会阻塞等待,从而实现 Semaphore 信号量不足时的阻塞,代码如下:// AQS.java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } /** * 对于 Semaphore 而言,如果 tryAcquireShared 返回小于 0 时,则会阻塞等待。 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }- 老艿艿:另外,这也是为什么 Semaphore 在使用 AQS 时,
state代表的是,剩余可获取的许可数,而不是已经使用的许可数。我们假设state代表的是已经使用的许可数,那么#tryAcquireShared(int arg)返回的结果= 原始许可数 - state,这个操作在并发情况下,会存在线程不安全的问题。所以,state代表的是,剩余可获取的许可数,而不是已经使用的许可数。
- 老艿艿:另外,这也是为什么 Semaphore 在使用 AQS 时,
-
公平情况的 FairSync 的方法实现,代码如下:
// FairSync.java @Override protected int tryAcquireShared(int acquires) { for (;;) { //判断该线程是否位于CLH队列的列头,从而实现公平锁 if (hasQueuedPredecessors()) return -1; //获取当前的信号量许可 int available = getState(); //设置“获得acquires个信号量许可之后,剩余的信号量许可数” int remaining = available - acquires; //CAS设置信号量 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }- 通过
#hasQueuedPredecessors()方法,判断该线程是否位于 CLH 队列的列头,从而实现公平锁。
- 通过
-
非公平情况的 NonfairSync 的方法实现,代码如下:
// NonfairSync.java protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // Sync.java final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }- 对于非公平而言,因为它不需要判断当前线程是否位于 CLH 同步队列列头,所以相对而言会简单些。
-
2.2 信号量释放
获取了许可,当用完之后就需要释放,Semaphore 提供 #release() 方法,来释放许可。代码如下:
public void release() {
sync.releaseShared(1);
}
|
-
内部调用 AQS 的
#releaseShared(int arg)方法,释放同步状态。// AQS.java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }-
releaseShared(int arg)方法,会调用 Semaphore 内部类 Sync 的#tryReleaseShared(int arg)方法,释放同步状态。// Sync.java protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //设置可获取的信号许可数为next if (compareAndSetState(current, next)) return true; } }- 如该方法返回 true 时,代表释放同步状态成功,从而在
#releaseShared(int args)方法中,调用#doReleaseShared()方法,可唤醒阻塞等待 Semaphore 的许可的线程。 - 对于信号量的获取释放详细过程,请参考如下博客:
- 如该方法返回 true 时,代表释放同步状态成功,从而在
-
2.3 其他方法
本文有部分方法并未解析,因为比较简单,胖友可以自己研究。
Semaphore :
#acquireUninterruptibly()#tryAcquire()#tryAcquire(long timeout, TimeUnit unit)#acquire(int permits)#acquireUninterruptibly(int permits)#tryAcquire(int permits)#tryAcquire(int permits, long timeout, TimeUnit unit)#availablePermits()#drainPermits()#reducePermits(int reduction)#isFair()#hasQueuedThreads()#getQueueLength()#getQueuedThreads()
Sync :
#reducePermits(int reductions)#drainPermits()
3. 应用示例
我们已停车为示例:
public class SemaphoreTest {
static class Parking {
//信号量
private Semaphore semaphore;
Parking(int count) {
semaphore = new Semaphore(count);
}
public void park() {
try {
//获取信号量
semaphore.acquire();
long time = (long) (Math.random() * 10);
System.out.println(Thread.currentThread().getName() + "进入停车场,停车" + time + "秒..." );
Thread.sleep(time);
System.out.println(Thread.currentThread().getName() + "开出停车场...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
static class Car extends Thread {
Parking parking ;
Car(Parking parking){
this.parking = parking;
}
@Override
public void run() {
parking.park(); //进入停车场
}
}
public static void main(String[] args){
Parking parking = new Parking(3);
for(int i = 0 ; i < 5 ; i++){
new Car(parking).start();
}
}
}
|
运行结果如下: