线程通讯
线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程间的通信就成为整体的必用方式之一。
当线程存在通信交互,系统之间的交互性会更加强大,在提高CPU利用率的同时,开发人员对线程任务在处理的过程中可以进行有效的把控和交互。
注:进入阻塞状态的线程,在解除阻塞后,进入就绪状态,是需要重新抢夺锁和CPU资源的。
|
名称 |
锁 |
CPU执行权 |
描述 |
案例 |
|
wait |
释放 |
释放 |
wait进入睡眠等待状态,需要其他线程用notify/notifyAll来唤醒。 |
生产一个,消费一个。 |
|
notify/notifyAll |
持有 |
释放 |
唤醒调用wait方法进入睡眠的线程 |
消费完了,告诉商家要生产了。 |
|
join |
持有 |
释放 |
A线程执行过程中加入B线程,执行完B线程后继续执行A线程。 |
工作到一半,忽然给叫去开会。 |
|
yield |
持有 |
释放 |
暂停当前线程,让同优先级或高优先级的线程获得CPU执行权 |
龟兔赛跑,兔子休息。 |
|
sleep |
持有 |
释放 |
等待指定时间后,自己醒来。 |
模拟开会时间过程。 |
|
/** * 没有线程通讯的实现问题 */ public class J10ThreadMessage01Problem { public static void main(String[] args) { ThreadMessageProblem threadMessageProblem = new ThreadMessageProblem();
//线程1:添加数据 new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 10; i++) { threadMessageProblem.add(i); System.out.println(Thread.currentThread().getName() + ":添加了一个元素"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } },"t1").start();
//线程2:在数据量达到目标值时,停止 new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(); if(threadMessageProblem.size() == 5) { System.out.println(Thread.currentThread().getName() + "收到停止通知...停止线程。"); throw new RuntimeException(); } } } },"t2").start();
//即便t2收到通知,t2也的确抛出了停止通知,但t1还是在继续运行 } } class ThreadMessageProblem {
private List list = new ArrayList<Integer>();
public void add(Integer i) { list.add(i); }
public Integer size() { return list.size(); } } |
|
/** * 线程通讯问题的解决 * Java提供了wait/notify/notifyAll关键字 */ public class J10ThreadMessage02Solve { public static void main(String[] args) { Solve solve = new Solve();
//1、wait/notify/notifyAll要配合synchronized使用 //wait是释放锁的,notify是不释放锁的 Object object = new Object();
//线程1:添加数据 Thread t1 = new Thread(new Runnable() { @Override public void run() { synchronized (object) { for (int i = 0; i < 10; i++) { solve.add(i); System.out.println(Thread.currentThread().getName() + ":添加了一个元素"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
//唤醒线程2,停止线程 if(solve.size() == 5) { System.out.println("线程1 唤醒 线程2"); object.notify(); } } } } },"t1");
//线程2:在数据量达到目标值时,停止 Thread t2 = new Thread(new Runnable() { @Override public void run() { synchronized (object) { //未达到目标值,等待达到目标值 if(solve.size() != 5) { try { System.out.println("t2 ======="); object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
System.out.println(Thread.currentThread().getName() + "收到停止通知...停止线程。"); throw new RuntimeException(); } } },"t2");
//结论1:wait是释放锁,notify是不释放锁的,在这种特性下,会造成不实时的问题,这个问题将在下一个代码中解决 t2.start(); t1.start();
//结论2:如果先执行t1会出现,t1会一直持有锁,直到运行完成,那么t2永远都不会执行 // t1.start(); // t2.start();
} } class Solve {
private List list = new ArrayList<Integer>();
public void add(Integer i) { list.add(i); }
public Integer size() { return list.size(); } } |
|
/** * 线程通讯问题的解决 * Java提供了wait/notify/notifyAll关键字 * 上一次试验中,wait/notify会由于锁的原因带来延迟的问题,现在就要解决这个延迟的问题 */ public class J10ThreadMessage03Solve2 { public static void main(String[] args) { Solve2 solve = new Solve2();
//1、wait/notify/notifyAll要配合synchronized使用 //wait是释放锁的,notify是不释放锁的 // Object object = new Object();
CountDownLatch countDownLatch = new CountDownLatch(1);
//线程1:添加数据 Thread t1 = new Thread(new Runnable() { @Override public void run() { // synchronized (object) { for (int i = 0; i < 10; i++) { solve.add(i); System.out.println(Thread.currentThread().getName() + ":添加了一个元素"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
//唤醒线程2,停止线程 if(solve.size() == 5) { System.out.println("线程1 唤醒 线程2"); // object.notify(); countDownLatch.countDown(); } // } } } },"t1");
//线程2:在数据量达到目标值时,停止 Thread t2 = new Thread(new Runnable() { @Override public void run() { // synchronized (object) { //未达到目标值,等待达到目标值 if(solve.size() != 5) { try { System.out.println("t2 ======="); // object.wait(); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // }
System.out.println(Thread.currentThread().getName() + "收到停止通知...停止线程。"); throw new RuntimeException(); } } },"t2");
//结论:会发现已经是实时了,顺序调换也没有问题 t1.start(); t2.start(); } } class Solve2 {
private List list = new ArrayList<Integer>();
public void add(Integer i) { list.add(i); }
public Integer size() { return list.size(); } } |
|
/** * wait、notify/notifyAll * 线程通讯:生产者和消费者,生产者生产一个,消费者消费一个, * 生产者生产了,消费者不消费,生产者则一直等待,避免库存积压 * wait释放CPU资源,释放锁 * notify/notifyAll唤醒wait线程,进入CPU执行权争夺和锁争夺 */ public class J10ThreadMessagePC { public static void main(String[] args) { Product product = new Product(); //产品 Producer producer = new Producer(product); //生产者 Consumer consumer = new Consumer(product); //消费者
new Thread(producer,"生产者01").start(); new Thread(producer,"生产者02").start(); new Thread(consumer,"消费者01").start(); new Thread(consumer,"消费者02").start(); } } /** * 产品 */ class Product {
/** * 是否有产品 */ private boolean isHasProduct = false;
/** * 生产产品,未消费则不生产,已消费则生产 * @throws InterruptedException */ public synchronized void production() throws InterruptedException { //wait()、notify()方法都是要在同步代码块中使用,否则会报错IllegalMonitorStateException // public void production() throws InterruptedException { //有产品则等待消费 while (isHasProduct) { //等待消费,一直阻塞 wait(); } Thread.sleep(3000); System.out.println("生产一台笔记本。"); isHasProduct = true; //生产了,那么则让客户去消费 notifyAll(); }
/** * 消费产品,有产品则消费,无产品则等待 * @throws InterruptedException */ public synchronized void consume() throws InterruptedException { //wait()、notify()方法都是要在同步代码块中使用,否则会报错IllegalMonitorStateException // public void consume() throws InterruptedException { while (!isHasProduct) { //等待消费,一直阻塞 wait(); } Thread.sleep(3000); System.out.println("消费一台笔记本。"); isHasProduct = false; //消费了,那么则让工厂去生产 notifyAll(); } } /** * 生产者 */ class Producer implements Runnable {
/** * 产品 */ private Product product;
public Producer(Product product) { this.product = product; }
@Override public void run() { try { product.production(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 消费者 */ class Consumer implements Runnable {
/** * 产品 */ private Product product;
public Consumer(Product product) { this.product = product; }
@Override public void run() { try { product.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } } |
|
/** * join:停止一个正在运行中的线程插入另外一个线程去运行,会释放CPU执行权,但不会释放锁。 * sleep:进入模拟开会时间过程。 */ public class J11JoinThread { public static void main(String[] args) { new Programmer().start(); System.out.println("主线程运行完成"); } } /** * 程序员 */ class Programmer extends Thread {
@Override public void run() { coding(); }
/** * 写代码 */ public void coding() { System.out.println("开始写代码..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("到点开会了...");
try { //开启这个线程,把这个开启的线程插入到当前正在运行的线程中 Manager manager = new Manager(); manager.start(); manager.join(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("开完会,继续写代码...");
} } /** * 经理 */ class Manager extends Thread {
@Override public void run() { System.out.println("召集人员开会..."); System.out.println("讲解项目计划..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("结束,解散会议..."); } } |
|
/** * yield:暂停当前线程执行,执行其他线程,只有优先级与当前线程相同,或者优先级比当前线程更高的处于就绪状态的线程才获得执行机会。 * 释放CPU执行权,持有锁 */ public class J12YieldThread { public static void main(String[] args) { new RabbitRun().start(); new TortoiseRun().start(); System.out.println("开跑..."); } } /** * 兔子 */ class RabbitRun extends Thread { @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println("兔子跑了 " + i + " 米"); if(i == 50) { System.out.println("领先那么多,有点累,屈服温柔乡下。"); yield(); } } System.out.println("兔子到达终点。"); } } /** * 乌龟 */ class TortoiseRun extends Thread { @Override public void run() { for (int i = 0; i < 100; i++) { System.out.println("乌龟跑了 " + i + " 米"); } System.out.println("乌龟到达终点。"); } } |