背景:生产者消费者的问题真的是绕不开,面试时候很可能让手写此代码,需要深入总结下。
实质上,很多后台服务程序并发控制的基本原理都可以归纳为生产者/消费者模式,而这是恰恰是在本科操作系统课堂上老师反复讲解,而我们却视而不见不以为然的。在博文《一种面向作业流(工作流)的轻量级可复用的异步流水开发框架的设计与实现》中将介绍一种生产者/消费者模式的具体应用。
生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。
解决生产者/消费者问题的方法可分为两类:
(1)采用某种机制保护生产者和消费者之间的同步;
(2)在生产者和消费者之间建立一个管道。
第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。
因此本文只介绍同步机制实现的生产者/消费者问题。
同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java语言在多线程编程上实现了完全对象化,提供了对同步机制的良好支持。
在Java中一共有四种方法支持同步,其中前三个是同步方法,一个是管道方法。
(1)wait() / notify()方法
(2)await() / signal()方法
(3)BlockingQueue阻塞队列方法
(4)PipedInputStream / PipedOutputStream
本文只介绍最常用的前三种,第四种暂不做讨论,有兴趣的读者可以自己去网上找答案。
wait()和notify()方法的实现
wait() / nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。
它们都属于 Object 的一部分,而不属于 Thread。
只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。
使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。
1 /** 2 * Project Name:basic 3 * File Name:ProducerAndConsumerWaitNotifyAll.java 4 * Package Name:com.forwork.com.basic.thread0411 5 * Date:2019年4月11日上午6:45:33 6 * Copyright (c) 2019, 深圳金融电子结算中心 All Rights Reserved. 7 * 8 */ 9 10 package com.forwork.com.basic.thread0411; 11 12 /** 13 * ClassName:ProducerAndConsumerWaitNotifyAll <br/> 14 * Function: TODO <br/> 15 * Date: 2019年4月11日 上午6:45:33 <br/> 16 * @author Administrator 17 * @version 1.0 18 * @since JDK 1.7 19 * @see 20 */ 21 public class ProducerAndConsumerWaitNotifyAll { 22 23 private static int count = 0; 24 private static int FULL = 3; //等待条件 25 private static int EMPTY = 0; 26 private static String LOCK = "lock"; 27 28 private static class Producer implements Runnable { 29 public void run() { 30 for (int i = 0; i < 3; i++) { 31 try { 32 Thread.sleep(1000); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 synchronized (LOCK) { 37 if (count == FULL) { 38 System.out.println(Thread.currentThread().getName() + "producelock:" + count); 39 try { 40 LOCK.wait(); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 } 45 count++; 46 System.out.println(Thread.currentThread().getName() + "produce:" + count); 47 LOCK.notifyAll(); 48 49 } 50 } 51 } 52 } 53 54 private static class Consumer implements Runnable { 55 public void run() { 56 for (int i = 0; i < 3; i++) { 57 try { 58 Thread.sleep(1000); 59 } catch (InterruptedException e) { 60 e.printStackTrace(); 61 } 62 63 synchronized (LOCK) { 64 if (count == EMPTY) { 65 try { 66 System.out.println(Thread.currentThread().getName() + "consumerlock:" + count); 67 LOCK.wait(); 68 } catch (Exception e) { 69 e.printStackTrace(); 70 } 71 }// (count == EMPTY) 72 count--; 73 System.out.println(Thread.currentThread().getName() + "consumer:" + count); 74 LOCK.notifyAll(); 75 } 76 } 77 } 78 } 79 80 public static void main(String[] args) { 81 for (int i = 0; i < 5; i++) { 82 Producer producer = new Producer(); 83 new Thread(producer).start(); 84 } 85 86 for (int i = 0; i < 5; i++) { 87 Consumer consumer = new Consumer(); 88 new Thread(consumer).start(); 89 } 90 } 91 92 }
结果:
1 Thread-1produce:1 2 Thread-6consumer:0 3 Thread-5consumerlock:0 4 Thread-8consumerlock:0 5 Thread-9consumerlock:0 6 Thread-7consumerlock:0 7 Thread-4produce:1 8 Thread-0produce:2 9 Thread-3produce:3 10 Thread-2producelock:3 11 Thread-7consumer:2 12 Thread-9consumer:1 13 Thread-8consumer:0 14 Thread-5consumer:-1 15 Thread-2produce:0 16 Thread-1produce:1 17 Thread-6consumer:0 18 Thread-0produce:1 19 Thread-3produce:2 20 Thread-4produce:3 21 Thread-9consumer:2 22 Thread-7consumer:1 23 Thread-2produce:2 24 Thread-8consumer:1 25 Thread-5consumer:0 26 Thread-1produce:1 27 Thread-6consumer:0 28 Thread-0produce:1 29 Thread-4produce:2 30 Thread-3produce:3 31 Thread-2producelock:3 32 Thread-9consumer:2 33 Thread-8consumer:1 34 Thread-7consumer:0 35 Thread-5consumerlock:0 36 Thread-2produce:1 37 Thread-5consumer:0