简介

    java.util.concurrent包是Java 5的一个重大改进,java.util.concurrent包提供了多种线程间同步和通信的机制,比如Executors, Queues, Timing, Synchronizers和Concurrent Collections等。与synchronized关键字和Object.notify()等方法相比,这些类和方法的抽象层次都较高。Effective Java中提到,其中比较重要的同步和通信机制有Executor框架、Concurrent Collections和Synchronizers三种。  

    其中Synchronizers包含了五种: Semaphore信号量,CounDownLatch倒计时锁存器,CyclicBarrier循环栅栏,Phaser和Exchanger。 JCIP中提到,Exchanger可以看做一种特殊的Barrier。Effective Java 提到用的比较多的主要是Semaphore信号量和CounDownLatch倒计时锁存器。本文主要讲解我认为比较重要的Semaphore信号量、CounDownLatch计数锁存器和CyclibBarrier。每一种都按照它们的概念、jdk实现、所提供的方法和使用(traveler或者jdk, or sample code)来进行介绍。


1 Semaphore

semaphore,信号量,是众多synchronizer中的一个。在操作系统中就存在互斥量和信号量这样的概念。 semaphore跟锁机制存在一定的相似性,semaphore也是一种锁机制,所不同的是,reentrantLock是只允许一个线程获得锁,而信号量持有多个许可(permits),允许多个线程获得许可并执行。从这个意义上看,重入锁是许可只有1的信号量。它们所提供的方法也非常接近。

 

1.1 实现

跟ReentrantLock一样,Semaphore也是以AQS为基础来实现的。

1.1.1 构造函数:

非公平版本:

1     public Semaphore(int permits) {
2         sync = new NonfairSync(permits);
3     }

 

可以选择是否公平的版本:

1     public Semaphore(int permits, boolean fair) {
2         sync = fair ? new FairSync(permits) : new NonfairSync(permits);
3     }

 

1.1.2 其他方法

跟ReentrantLock不同的是,每种acquire方法都分为有参数的和不带参数的两个版本:
acquire() :

1     public void acquire() throws InterruptedException {
2         sync.acquireSharedInterruptibly(1);
3     }

 

acquire(int permits)

1     public void acquire(int permits) throws InterruptedException {
2         if (permits < 0) throw new IllegalArgumentException();
3         sync.acquireSharedInterruptibly(permits);
4     }

 

与此类似的还有:

acquireUninterruptibly()&acquireUninterruptibly(int)

tryAcquire()& tryAcquire(int) 

tryAcquire(long,TimeUnit)& tryAcquire(int, long,TimeUnit)

release()& release(int)

java并发之(4):Semaphore信号量、CounDownLatch计数锁存器和CyclicBarrier循环栅栏

1.2 使用示例:

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 import java.util.concurrent.Semaphore;
 4 
 5 public class TIJ_semaphore {
 6     public static void main(String[] args) {
 7         ExecutorService exec = Executors.newCachedThreadPool();
 8         final Semaphore semp = new Semaphore(5); // 5 permits
 9 
10         for (int index = 0; index < 20; index++) {
11             final int NO = index;
12             Runnable run = new Runnable() {
13                 public void run() {
14                     try {
// if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable
15 semp.acquire();
16 System.out.println("Accessing: " + NO); 17 Thread.sleep((long) (10000); 18 semp.release(); 19 } catch (InterruptedException e) { 20 } 21 } 22 }; 23 exec.execute(run); 24 } 25 exec.shutdown(); 26 }

 程序输出结果为:

 1 Accessing: 0
 2 Accessing: 2
 3 Accessing: 3
 4 Accessing: 4
 5 Accessing: 1
 6 (等待10s)
 7 Accessing: 5
 8 Accessing: 6
 9 Accessing: 14
10 Accessing: 8
11 Accessing: 7
12 (等待10s)
13 Accessing: 10
14 Accessing: 9
15 Accessing: 11
16 Accessing: 15
17 Accessing: 12
18 (等待10s)
19 Accessing: 13
20 Accessing: 16
21 Accessing: 17
22 Accessing: 19
23 Accessing: 18
View Code

相关文章:

  • 2020-01-19
  • 2021-08-01
  • 2021-07-31
  • 2021-09-15
  • 2022-01-01
  • 2021-07-16
猜你喜欢
  • 2021-05-30
  • 2022-02-11
  • 2022-12-23
  • 2021-10-08
  • 2021-10-09
  • 2021-08-06
  • 2021-06-22
相关资源
相似解决方案