1.阻塞队列
阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素
阻塞队列提供了四种处理方法:
| 方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 检查方法 | element() | peek() | 不可用 | 不可用 |
异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
-
返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
-
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
-
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
成员方法
| 队列 | 有界性 | 锁 | 数据结构 |
|---|---|---|---|
| ArrayBlockingQueue | bounded(有界) | 加锁 | arrayList |
| LinkedBlockingQueue | optionally-bounded | 加锁 | linkedList |
| PriorityBlockingQueue | unbounded | 加锁 | heap |
| DelayQueue | unbounded | 加锁 | heap |
| SynchronousQueue | bounded | 加锁 | 无 |
| LinkedTransferQueue | unbounded | 加锁 | heap |
| LinkedBlockingDeque | unbounded | 无锁 | heap |
下面分别简单介绍一下:
-
ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】
-
LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
-
PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
-
DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
-
SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
-
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
-
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
ArrayBlockingQueue
package demo.queue;
import java.util.concurrent.ArrayBlockingQueue;
public class BlockingQueue1 {
public static void main(String[] args) {
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//给定初始容量
// add/remove 抛出异常
arrayBlockingQueue.add("a");
arrayBlockingQueue.add("b");
arrayBlockingQueue.add("c");
//容量为3 当我们加入第四个值时则会抛出异常
arrayBlockingQueue.add("d");
}
}
输出:
package demo.queue;
import java.util.concurrent.ArrayBlockingQueue;
public class BlockingQueue1 {
public static void main(String[] args) {
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//给定初始容量
// add/remove 抛出异常
arrayBlockingQueue.add("a");
arrayBlockingQueue.add("b");
arrayBlockingQueue.add("c");
//容量为3 当我们加入第四个值时则会抛出异常
//arrayBlockingQueue.add("d");
//remove
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.remove());
//当队列没有元素时抛出异常
System.out.println(arrayBlockingQueue.remove());
}
}
输出:
add方法和offer方法最终调用的是enqueue(E x)方法 看源码可知
SynchronousQueue
SynchronousQueue , 只有一个容量!
每一个put操作,就需要有一个 take操作!
package demo.queue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueue2 {
public static void main(String[] args) {
SynchronousQueue<String> arrayBlockingQueue = new SynchronousQueue<>();
// A 存
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "put a");
arrayBlockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + "put b");
arrayBlockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + "put c");
arrayBlockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A:").start();
// B 取
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B:").start();
}
}
输出: