1. 简介

CyclicBarrier ,一个同步辅助类,在 AP I中是这么介绍的:

它允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 Barrier 在释放等待线程后可以重用,所以称它为循环( Cyclic ) 的 屏障( Barrier ) 。

通俗点讲就是:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

2. 实现分析

java.util.concurrent.CyclicBarrier 的结构如下:

Java多线程(九):J.U.C 之CyclicBarrier/CountDownLatch/Semaphore

通过上图,我们可以看到 CyclicBarrier 的内部是使用重入锁 ReentrantLock 和 Condition 。

它有两个构造函数:

  • CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。

  • CyclicBarrier(int parties, Runnable barrierAction) :创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

  • 代码如下:

     

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    

     

    • parties 变量,表示拦截线程的数量。
    • count 变量,表示拦截线程的剩余需要数量。
    • barrierAction 变量,为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行 barrierAction ,用于处理更加复杂的业务场景。
    • generation 变量,表示 CyclicBarrier 的更新换代。详细解析,见 「2.4 Generation」 。

2.1 await

在 CyclicBarrier 中最重要的方法莫过于 #await() 方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。代码如下:

或者说,每个线程调用 #await() 方法,告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。当所有线程都到达了屏障,结束阻塞,所有线程可继续执行后续逻辑。

 

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);//不超时等待
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

 

  • 内部调用 #dowait(boolean timed, long nanos) 方法,执行阻塞等待( timed=true )。详细解析,见 「2.3 dowait」 。

  • 理论来说,不会出现 TimeoutException 异常,所以在发生时,直接抛出 Error 错误。

  • 返回值为进入屏障时,剩余拦截线程的剩余需要数量。英文注释如下:

    the arrival index of the current thread, where index {@code getParties() - 1} indicates the first to arrive and zero indicates the last to arrive

2.2 await

#await(long timeout, TimeUnit unit) 方法,在 #await() 的基础上,增加了等待超时的特性。代码如下:

 

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

 

  • 内部调用 #dowait(boolean timed, long nanos) 方法,执行阻塞等待( timed=true )。详细解析,见 「2.3 dowait」 。

2.3 dowait

#dowait(boolean timed, long nanos) 方法,代码如下:

 

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    //获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //分代
        final Generation g = generation;

        //当前generation“已损坏”,抛出BrokenBarrierException异常
        //抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie
        if (g.broken)
            //当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常
            throw new BrokenBarrierException();

        //如果线程中断,终止CyclicBarrier
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        //进来一个线程 count - 1
        int index = --count;
        //count == 0 表示所有线程均已到位,触发Runnable任务
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                //触发任务
                if (command != null)
                    command.run();
                ranAction = true;
                //唤醒所有等待线程,并更新generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction) // 未执行,说明 barrierCommand 执行报错,或者线程打断等等情况。
                    breakBarrier();
            }
        }

        for (;;) {
            try {
                //如果不是超时等待,则调用Condition.await()方法等待
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    //超时等待,调用Condition.awaitNanos()方法等待
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            //generation已经更新,返回index
            if (g != generation)
                return index;

            //“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        //释放锁
        lock.unlock();
    }
}

 

如果该线程不是到达的最后一个线程,则他会一直处于等待状态,除非发生以下情况:

  1. 最后一个线程到达,即 index == 0 。
  2. 超出了指定时间(超时等待)。
  3. 其他的某个线程中断当前线程。
  4. 其他的某个线程中断另一个等待的线程。
  5. 其他的某个线程在等待 barrier 超时。
  6. 其他的某个线程在此 barrier 调用 #reset() 方法。#reset() 方法,用于将屏障重置为初始状态。

在 #dowait(boolean timed, long nanos) 方法的源代码中,我们总是可以看到抛出 BrokenBarrierException 异常,那么什么时候抛出异常呢?例如:

  • 如果一个线程处于等待状态时,如果其他线程调用 #reset() 方法。详细解析,见 「2.7 reset」 。
  • 调用的 barrier 原本就是被损坏的,则抛出 BrokenBarrierException 异常。
  • 任何线程在等待时被中断了,则其他所有线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。详细解析,见 「2.6 breakBarrier」 。

2.4 Generation

Generation 是 CyclicBarrier 内部静态类,描述了 CyclicBarrier 的更新换代。在CyclicBarrier中,同一批线程属于同一代。当有 parties 个线程全部到达 barrier 时,generation 就会被更新换代。其中 broken 属性,标识该当前 CyclicBarrier 是否已经处于中断状态。代码如下:

 

private static class Generation {
    boolean broken = false;
}

 

  • 默认 barrier 是没有损坏的。

2.5 breakBarrier

当 barrier 损坏了,或者有一个线程中断了,则通过 #breakBarrier() 方法,来终止所有的线程。代码如下:

 

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

 

  • 在 breakBarrier() 方法中,中除了将 broken设置为 true ,还会调用 #signalAll() 方法,将在 CyclicBarrier 处于等待状态的线程全部唤醒。

2.6 nextGeneration

当所有线程都已经到达 barrier 处(index == 0),则会通过 nextGeneration() 方法,进行更新换代操作。在这个步骤中,做了三件事:

  1. 唤醒所有线程。
  2. 重置 count 。
  3. 重置 generation 。

代码如下:

 

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

 

2.7 reset

#reset() 方法,重置 barrier 到初始化状态。代码如下:

 

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

 

  • 通过组合 #breakBarrier() 和 #nextGeneration() 方法来实现。

2.8 getNumberWaiting

#getNumberWaiting() 方法,获得等待的线程数。代码如下:

 

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

 

2.9 isBroken

#isBroken() 方法,判断 CyclicBarrier 是否处于中断。代码如下:

 

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

 

3. 应用场景

CyclicBarrier 适用于多线程结果合并的操作,用于多线程计算数据,最后合并计算结果的应用场景。比如,我们需要统计多个 Excel 中的数据,然后等到一个总结果。我们可以通过多线程处理每一个 Excel ,执行完成后得到相应的结果,最后通过 barrierAction 来计算这些线程的计算结果,得到所有Excel的总和。

4. 应用示例

比如我们开会只有等所有的人到齐了才会开会,如下:

 

public class CyclicBarrierTest {

    private static CyclicBarrier cyclicBarrier;

    static class CyclicBarrierThread extends Thread{
        public void run() {
            System.out.println(Thread.currentThread().getName() + "到了");
            //等待
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
        cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("人到齐了,开会吧....");
            }
        });

        for(int i = 0 ; i < 5 ; i++){
            new CyclicBarrierThread().start();
        }
    }
    
}

 

运行结果:

Java多线程(九):J.U.C 之CyclicBarrier/CountDownLatch/Semaphore

 

1. 简介

在上篇博客中,我们介绍了 Java 四大并发工具之一的 CyclicBarrier ,今天要介绍的CountDownLatch 与 CyclicBarrier 有点儿相似

CyclicBarrier 所描述的是“允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务",而 CountDownLatch 所描述的是“在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待”。在API中是这样描述的:

用给定的计数初始化 CountDownLatch。由于调用了 #countDown() 方法,所以在当前计数到达零之前,#await() 方法会一直受阻塞。之后,会释放所有等待的线程,#await() 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier 。

Java多线程(九):J.U.C 之CyclicBarrier/CountDownLatch/Semaphore

CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候,需要带入该计数器值,该值就表示了线程的数量。

  • 每当一个线程完成自己的任务后,计数器的值就会减 1 。
  • 当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。

虽然,CountDownLatch 与 CyclicBarrier 有那么点相似,但是他们还是存在一些区别的:

  1. CountDownLatch 的作用是允许 1 或 N 个线程等待其他线程完成执行;而 CyclicBarrier 则是允许 N 个线程相互等待。
  2. CountDownLatch 的计数器无法被重置;CyclicBarrier 的计数器可以被重置后使用,因此它被称为是循环的 barrier 。

2. 实现分析

java.util.concurrent.CountDownLatch 结构如下图:

Java多线程(九):J.U.C 之CyclicBarrier/CountDownLatch/Semaphore

通过上面的结构图我们可以看到,CountDownLatch 内部依赖 Sync 实现,而 Sync 继承 AQS 。

CountDownLatch 仅提供了一个构造方法,代码如下:

 

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

 

  • 构造一个用给定计数初始化的 CountDownLatch 。

2.1 Sync

sync 变量,为 CountDownLatch 的一个内部类 Sync ,其定义如下:

 

 private static final class Sync extends AbstractQueuedSynchronizer {
        
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        // 获取同步状态
        int getCount() {
            return getState();
        }

        // 获取同步状态
        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // 释放同步状态
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    
}

 

  • 通过这个内部类 Sync 实现类,我们可以清楚地看到, CountDownLatch 是采用共享锁来实现的。
  • #tryAcquireShared(int acquires) 和 #tryReleaseShared(int releases) 方法,结合下文一起理解。

2.2 await

CountDownLatch 提供 #await() 方法,来使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断,定义如下:

 

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

 

  • 该方法内部使用 AQS 的 #acquireSharedInterruptibly(int arg) 方法,代码如下:

     

    // AQS.java
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    

     

    • 在内部类 Sync 中重写了 #tryAcquireShared(int arg)方法,代码如下:

       

      // Sync.java
      @Override
      protected int tryAcquireShared(int acquires) {
          return (getState() == 0) ? 1 : -1;
      }
      

       

      • getState() 方法,获取同步状态,其值等于计数器的值。从这里我们可以看到,如果计数器值不等于 0,则会调用 #doAcquireSharedInterruptibly(int arg) 方法。该方法为一个自旋方法会尝试一直去获取同步状态,代码如下:

         

        // AQS.java
        private void doAcquireSharedInterruptibly(int arg)
                throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        /**
                         * 对于CountDownLatch而言,如果计数器值不等于0,那么r 会一直小于0
                         */
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //等待
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        

         

        • x

2.3 await

CountDownLatch 提供 #await(long timeout, TimeUnit unit) 方法,来使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断,或者等待超时,定义如下:

 

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

 

  • 调用 AQS 的 tryAcquireSharedNanos(int acquires, long nanosTimeout) 方法,逻辑和 「2.2 await」 类似。

2.4 countDown

CountDownLatch 提供 #countDown() 方法,递减锁存器的计数。如果计数到达零,则唤醒所有等待的线程。

 

public void countDown() {
    sync.releaseShared(1);
}

 

  • 内部调用 AQS 的 #releaseShared(int arg) 方法,来释放共享锁同步状态:

     

    // AQS.java
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

     

    • #tryReleaseShared(int arg) 方法,被 CountDownLatch 的内部类 Sync 重写,代码如下:

       

      // Sync.java
      @Overrride
      protected boolean tryReleaseShared(int releases) {
          for (;;) {
              //获取锁状态
              int c = getState();
              //c == 0 直接返回,释放锁成功
              if (c == 0)
                  return false;
              //计算新“锁计数器”
              int nextc = c-1;
              //更新锁状态(计数器)
              if (compareAndSetState(c, nextc))
                  return nextc == 0;
          }
      }
      

       

      • x

2.5 getCount

 

public long getCount() {
    return sync.getCount();
}

 

3. 总结

CountDownLatch 内部通过共享锁实现

  • 在创建 CountDownLatch 实例时,需要传递一个int型的参数:count,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。
  • 当某个线程调用 #await() 方法,程序首先判断 count 的值是否为 0 ,如果不为 0 的话,则会一直等待直到为 0 为止。
  • 当其他线程调用 #countDown() 方法时,则执行释放共享锁状态,使 count 值 - 1。
  • 当在创建 CountDownLatch 时初始化的 count 参数,必须要有 count 线程调用#countDown() 方法,才会使计数器 count 等于 0 ,锁才会释放,前面等待的线程才会继续运行。
  • 注意 CountDownLatch 不能回滚重置

4. 应用示例

示例仍然使用开会案例。老板进入会议室等待 5 个人全部到达会议室才会开会。所以这里有两种线程:老板等待开会线程、员工到达会议室线程:

 

public class CountDownLatchTest {

    private static CountDownLatch countDownLatch = new CountDownLatch(5);

    /**
     * Boss线程,等待员工到达开会
     */
    static class BossThread extends Thread{
        @Override
        public void run() {
            System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
            try {
                //Boss等待
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("所有人都已经到齐了,开会吧...");
        }
    }

    // 员工到达会议室线程
    static class EmpleoyeeThread  extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ",到达会议室....");
            //员工到达会议室 count - 1
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args){
        //Boss线程启动
        new BossThread().start();

        for(int i = 0 ; i < countDownLatch.getCount() ; i++){
            new EmpleoyeeThread().start();
        }
    }
}

 

运行结果:

Java多线程(九):J.U.C 之CyclicBarrier/CountDownLatch/Semaphore

 

 

 

1. 简介

信号量 Semaphore 是一个控制访问多个共享资源的计数器,和 CountDownLatch 一样,其本质上是一个“共享锁”。

Semaphore,在 API 是这么介绍的:

一个计数信号量。从概念上讲,信号量维护了一个许可集。

  • 如有必要,在许可可用前会阻塞每一个 acquire,然后再获取该许可。
  • 每个 release 添加一个许可,从而可能释放一个正在阻塞的获取者。

但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

下面我们就一个停车场的简单例子来阐述 Semaphore :

  • 为了简单起见我们假设停车场仅有 5 个停车位。一开始停车场没有车辆所有车位全部空着,然后先后到来三辆车,停车场车位够,安排进去停车,然后又来三辆,这个时候由于只有两个停车位,所有只能停两辆,其余一辆必须在外面候着,直到停车场有空车位。当然,以后每来一辆都需要在外面候着。当停车场有车开出去,里面有空位了,则安排一辆车进去(至于是哪辆,要看选择的机制是公平还是非公平)。
  • 从程序角度看,停车场就相当于信号量 Semaphore ,其中许可数为 5 ,车辆就相对线程。当来一辆车时,许可数就会减 1 。当停车场没有车位了(许可数 == 0 ),其他来的车辆需要在外面等候着。如果有一辆车开出停车场,许可数 + 1,然后放进来一辆车。
  • 信号量 Semaphore 是一个非负整数( >=1 )。当一个线程想要访问某个共享资源时,它必须要先获取 Semaphore。当 Semaphore > 0 时,获取该资源并使 Semaphore – 1 。如果S emaphore 值 = 0,则表示全部的共享资源已经被其他线程全部占用,线程必须要等待其他线程释放资源。当线程释放资源时,Semaphore 则 +1 。

2. 实现分析

java.util.concurrent.Semaphore 结构如下图:

Java多线程(九):J.U.C 之CyclicBarrier/CountDownLatch/Semaphore

从上图可以看出,Semaphore 内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类 Sync ,其中 Sync 继承 AQS(再一次阐述 AQS 的重要性)。

Semaphore 提供了两个构造函数:

  1. Semaphore(int permits) :创建具有给定的许可数和非公平的公平设置的 Semaphore 。
  2. Semaphore(int permits, boolean fair) :创建具有给定的许可数和给定的公平设置的 Semaphore 。

实现如下:

 

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

 

  • Semaphore 默认选择非公平锁
  • 当信号量 Semaphore = 1 时,它可以当作互斥锁使用。其中 0、1 就相当于它的状态:1)当 =1 时表示,其他线程可以获取;2)当 =0 时,排他,即其他线程必须要等待。
  • ???? Semaphore 的代码实现结构,和 ReentrantLock 类似。

2.1 信号量获取

Semaphore 提供了 #acquire() 方法,来获取一个许可。

 

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

 

  • 内部调用 AQS 的 #acquireSharedInterruptibly(int arg) 方法,该方法以共享模式获取同步状态。代码如下:

     

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    

     

    • 在 #acquireSharedInterruptibly(int arg) 方法中,会调用 #tryAcquireShared(int arg) 方法。而 #tryAcquireShared(int arg) 方法,由子类来实现。对于 Semaphore 而言,如果我们选择非公平模式,则调用 NonfairSync 的#tryAcquireShared(int arg) 方法,否则调用 FairSync 的 #tryAcquireShared(int arg) 方法。若 #tryAcquireShared(int arg) 方法返回 < 0 时,则会阻塞等待,从而实现 Semaphore 信号量不足时的阻塞,代码如下:

       

      // AQS.java
      private void doAcquireSharedInterruptibly(int arg)
              throws InterruptedException {
          final Node node = addWaiter(Node.SHARED);
          boolean failed = true;
          try {
              for (;;) {
                  final Node p = node.predecessor();
                  if (p == head) {
                      int r = tryAcquireShared(arg);
                      if (r >= 0) {
                          setHeadAndPropagate(node, r);
                          p.next = null; // help GC
                          failed = false;
                          return;
                      }
                  }
                  /**
                   * 对于 Semaphore 而言,如果 tryAcquireShared 返回小于 0 时,则会阻塞等待。
                   */
                  if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt())
                      throw new InterruptedException();
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      

       

      • 老艿艿:另外,这也是为什么 Semaphore 在使用 AQS 时,state 代表的是,剩余可获取的许可数,而不是已经使用的许可数。我们假设 state 代表的是已经使用的许可数,那么 #tryAcquireShared(int arg) 返回的结果 = 原始许可数 - state ,这个操作在并发情况下,会存在线程不安全的问题。所以,state 代表的是,剩余可获取的许可数,而不是已经使用的许可数
    • 公平情况的 FairSync 的方法实现,代码如下:

       

      // FairSync.java
      @Override
      protected int tryAcquireShared(int acquires) {
          for (;;) {
              //判断该线程是否位于CLH队列的列头,从而实现公平锁
              if (hasQueuedPredecessors())
                  return -1;
              //获取当前的信号量许可
              int available = getState();
      
              //设置“获得acquires个信号量许可之后,剩余的信号量许可数”
              int remaining = available - acquires;
      
              //CAS设置信号量
              if (remaining < 0 ||
                      compareAndSetState(available, remaining))
                  return remaining;
          }
      }
      

       

      • 通过 #hasQueuedPredecessors() 方法,判断该线程是否位于 CLH 队列的列头,从而实现公平锁。
    • 非公平情况的 NonfairSync 的方法实现,代码如下:

       

      // NonfairSync.java
      protected int tryAcquireShared(int acquires) {
          return nonfairTryAcquireShared(acquires);
      }
      
      // Sync.java
      final int nonfairTryAcquireShared(int acquires) {
          for (;;) {
              int available = getState();
              int remaining = available - acquires;
              if (remaining < 0 ||
                  compareAndSetState(available, remaining))
                  return remaining;
          }
      }
      

       

      • 对于非公平而言,因为它不需要判断当前线程是否位于 CLH 同步队列列头,所以相对而言会简单些。

2.2 信号量释放

获取了许可,当用完之后就需要释放,Semaphore 提供 #release() 方法,来释放许可。代码如下:

 

public void release() {
    sync.releaseShared(1);
}

 

  • 内部调用 AQS 的 #releaseShared(int arg) 方法,释放同步状态。

     

    // AQS.java
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

     

    • releaseShared(int arg) 方法,会调用 Semaphore 内部类 Sync 的 #tryReleaseShared(int arg) 方法,释放同步状态。

       

      // Sync.java
      protected final boolean tryReleaseShared(int releases) {
          for (;;) {
              int current = getState();
              //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
              int next = current + releases;
              if (next < current) // overflow
                  throw new Error("Maximum permit count exceeded");
              //设置可获取的信号许可数为next
              if (compareAndSetState(current, next))
                  return true;
          }
      }
      

       

2.3 其他方法

本文有部分方法并未解析,因为比较简单,胖友可以自己研究。

Semaphore :

  • #acquireUninterruptibly()
  • #tryAcquire()
  • #tryAcquire(long timeout, TimeUnit unit)
  • #acquire(int permits)
  • #acquireUninterruptibly(int permits)
  • #tryAcquire(int permits)
  • #tryAcquire(int permits, long timeout, TimeUnit unit)
  • #availablePermits()
  • #drainPermits()
  • #reducePermits(int reduction)
  • #isFair()
  • #hasQueuedThreads()
  • #getQueueLength()
  • #getQueuedThreads()

Sync :

  • #reducePermits(int reductions)
  • #drainPermits()

3. 应用示例

我们已停车为示例:

 

public class SemaphoreTest {

    static class Parking {
    
        //信号量
        private Semaphore semaphore;

        Parking(int count) {
            semaphore = new Semaphore(count);
        }

        public void park() {
            try {
                //获取信号量
                semaphore.acquire();
                long time = (long) (Math.random() * 10);
                System.out.println(Thread.currentThread().getName() + "进入停车场,停车" + time + "秒..." );
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName() + "开出停车场...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }


    static class Car extends Thread {
        Parking parking ;

        Car(Parking parking){
            this.parking = parking;
        }

        @Override
        public void run() {
            parking.park();     //进入停车场
        }
    }

    public static void main(String[] args){
        Parking parking = new Parking(3);

        for(int i = 0 ; i < 5 ; i++){
            new Car(parking).start();
        }
    }
}

 

运行结果如下:

Java多线程(九):J.U.C 之CyclicBarrier/CountDownLatch/Semaphore

相关文章: