简介
java.util.concurrent包是Java 5的一个重大改进,java.util.concurrent包提供了多种线程间同步和通信的机制,比如Executors, Queues, Timing, Synchronizers和Concurrent Collections等。与synchronized关键字和Object.notify()等方法相比,这些类和方法的抽象层次都较高。Effective Java中提到,其中比较重要的同步和通信机制有Executor框架、Concurrent Collections和Synchronizers三种。
其中Synchronizers包含了五种: Semaphore信号量,CounDownLatch倒计时锁存器,CyclicBarrier循环栅栏,Phaser和Exchanger。 JCIP中提到,Exchanger可以看做一种特殊的Barrier。Effective Java 提到用的比较多的主要是Semaphore信号量和CounDownLatch倒计时锁存器。本文主要讲解我认为比较重要的Semaphore信号量、CounDownLatch计数锁存器和CyclibBarrier。每一种都按照它们的概念、jdk实现、所提供的方法和使用(traveler或者jdk, or sample code)来进行介绍。
1 Semaphore
semaphore,信号量,是众多synchronizer中的一个。在操作系统中就存在互斥量和信号量这样的概念。 semaphore跟锁机制存在一定的相似性,semaphore也是一种锁机制,所不同的是,reentrantLock是只允许一个线程获得锁,而信号量持有多个许可(permits),允许多个线程获得许可并执行。从这个意义上看,重入锁是许可只有1的信号量。它们所提供的方法也非常接近。
1.1 实现
跟ReentrantLock一样,Semaphore也是以AQS为基础来实现的。
1.1.1 构造函数:
非公平版本:
1 public Semaphore(int permits) {
2 sync = new NonfairSync(permits);
3 }
可以选择是否公平的版本:
1 public Semaphore(int permits, boolean fair) {
2 sync = fair ? new FairSync(permits) : new NonfairSync(permits);
3 }
1.1.2 其他方法
跟ReentrantLock不同的是,每种acquire方法都分为有参数的和不带参数的两个版本:
acquire() :
1 public void acquire() throws InterruptedException {
2 sync.acquireSharedInterruptibly(1);
3 }
acquire(int permits)
1 public void acquire(int permits) throws InterruptedException {
2 if (permits < 0) throw new IllegalArgumentException();
3 sync.acquireSharedInterruptibly(permits);
4 }
与此类似的还有:
acquireUninterruptibly()&acquireUninterruptibly(int)
tryAcquire()& tryAcquire(int)
tryAcquire(long,TimeUnit)& tryAcquire(int, long,TimeUnit)
release()& release(int)
1.2 使用示例:
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 import java.util.concurrent.Semaphore; 4 5 public class TIJ_semaphore { 6 public static void main(String[] args) { 7 ExecutorService exec = Executors.newCachedThreadPool(); 8 final Semaphore semp = new Semaphore(5); // 5 permits 9 10 for (int index = 0; index < 20; index++) { 11 final int NO = index; 12 Runnable run = new Runnable() { 13 public void run() { 14 try {
// if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable 15 semp.acquire();
16 System.out.println("Accessing: " + NO); 17 Thread.sleep((long) (10000); 18 semp.release(); 19 } catch (InterruptedException e) { 20 } 21 } 22 }; 23 exec.execute(run); 24 } 25 exec.shutdown(); 26 }
程序输出结果为:
1 Accessing: 0 2 Accessing: 2 3 Accessing: 3 4 Accessing: 4 5 Accessing: 1 6 (等待10s) 7 Accessing: 5 8 Accessing: 6 9 Accessing: 14 10 Accessing: 8 11 Accessing: 7 12 (等待10s) 13 Accessing: 10 14 Accessing: 9 15 Accessing: 11 16 Accessing: 15 17 Accessing: 12 18 (等待10s) 19 Accessing: 13 20 Accessing: 16 21 Accessing: 17 22 Accessing: 19 23 Accessing: 18