线程之间的协作
前面我们可以使用线程来同时运行多个任务,通过锁(互斥)来同步两个任务的行为,从而使得一个任务不会干涉另一个的任务的资源。现在研究如何使任务之间通过协作,以使得多个任务可以一起工作去解决某个问题。我们可以在互斥的基础上使用挂起操作,直到某些外部条件反生变化时,让这个任务继续往前执行。下面具体讨论该方法的实现。
1.挂起操作
1.1 wait()操作
我们可以等待某个条件发生变化之间将线程挂起,当条件满足之后通过notify()或notifyAll()将挂起的线程唤醒,告知其可以继续运行了。注意调用wait()、notify()、notifyAll()时,需要首先获得某个对象的锁,否则会出现以下错误信息。
当线程wait()时会释放掉对象锁(sychronized),这是与sleep()和yield()不同的地方。在进行条件判断时,应当首先获取对象锁,简单的说就是将条件判断放置着sychronized代码块内。
1 package com.dy.xidian; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.TimeUnit; 6 7 class Car { 8 private boolean stop = false; 9 public synchronized void stop() { 10 stop = true; 11 notifyAll(); 12 } 13 14 public synchronized void drive() { 15 stop = false; 16 notifyAll(); 17 } 18 19 public synchronized void waitForStopFinish() throws InterruptedException { 20 while (stop == false) 21 wait(); 22 } 23 24 public synchronized void waitForDriveFinish() throws InterruptedException { 25 while (stop == true) 26 wait(); 27 } 28 } 29 30 class Stop implements Runnable { 31 private Car car; 32 public Stop(Car c) { 33 car = c; 34 } 35 36 @Override 37 public void run() { 38 try { 39 while (!Thread.interrupted()) { 40 System.out.println("car stop!"); 41 TimeUnit.MILLISECONDS.sleep(500); 42 car.stop(); 43 car.waitForDriveFinish(); 44 } 45 } catch (InterruptedException e) { 46 System.out.println("Exiting via interrupt"); 47 } 48 System.out.println("Ending stop task"); 49 50 } 51 } 52 53 class Drive implements Runnable { 54 private Car car; 55 public Drive(Car c) { 56 car = c; 57 } 58 59 @Override 60 public void run() { 61 try { 62 while (!Thread.interrupted()) { 63 car.waitForStopFinish(); 64 System.out.println("car drive!"); 65 TimeUnit.MILLISECONDS.sleep(200); 66 car.drive(); 67 } 68 } catch (InterruptedException e) { 69 System.out.println("Exiting via interrupt"); 70 } 71 System.out.println("End drive task"); 72 } 73 } 74 75 public class Test { 76 public static void main(String[] args) throws InterruptedException { 77 Car car = new Car(); 78 ExecutorService exec = Executors.newCachedThreadPool(); 79 exec.execute(new Stop(car)); 80 exec.execute(new Drive(car)); 81 TimeUnit.SECONDS.sleep(2); 82 exec.shutdownNow(); 83 } 84 }
该程序模拟的汽车行驶的过程,汽车会走一会,停一会,直到一个中断产生。while()循环的使用,如果一个线程正在处于等待状态,当被唤醒时需要重新检测当前的条件是否满足继续等待,因为有可能这个唤醒操作并不是要唤醒这个线程,或者说唤醒之后条件又发生了改变,需要继续wait()。
1.2 await()操作
JAVA中也提供了Condition类,在Condition上调用await()来挂起一个任务。当外部条件发生变化时,意味着某个任务应该继续执行,可以通过调用signal()来通知这个任务,从而唤醒一个任务,或者通过signalAll()来唤醒所有在Condition上挂起的任务。现在使用Condition来重写上面的程序。
1 package com.dy.xidian; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.TimeUnit; 6 import java.util.concurrent.locks.Condition; 7 import java.util.concurrent.locks.Lock; 8 import java.util.concurrent.locks.ReentrantLock; 9 10 class Car { 11 private Lock lock = new ReentrantLock(); 12 Condition condition = lock.newCondition(); 13 private boolean stop = false; 14 15 public void stop() { 16 lock.lock(); 17 stop = true; 18 condition.signalAll(); 19 lock.unlock(); 20 } 21 22 public void drive() { 23 lock.lock(); 24 stop = false; 25 condition.signalAll(); 26 lock.unlock(); 27 } 28 29 public void waitForStopFinish() throws InterruptedException { 30 lock.lock(); 31 while (stop == false) 32 condition.await(); 33 lock.unlock(); 34 } 35 36 public void waitForDriveFinish() throws InterruptedException { 37 lock.lock(); 38 while (stop == true) 39 condition.await(); 40 lock.unlock(); 41 } 42 } 43 44 class Stop implements Runnable { 45 private Car car; 46 public Stop(Car c) { 47 car = c; 48 } 49 50 @Override 51 public void run() { 52 try { 53 while (!Thread.interrupted()) { 54 System.out.println("car stop!"); 55 TimeUnit.MILLISECONDS.sleep(500); 56 car.stop(); 57 car.waitForDriveFinish(); 58 } 59 } catch (InterruptedException e) { 60 System.out.println("Exiting via interrupt"); 61 } 62 System.out.println("Ending stop task"); 63 64 } 65 } 66 67 class Drive implements Runnable { 68 private Car car; 69 public Drive(Car c) { 70 car = c; 71 } 72 73 @Override 74 public void run() { 75 try { 76 while (!Thread.interrupted()) { 77 car.waitForStopFinish(); 78 System.out.println("car drive!"); 79 TimeUnit.MILLISECONDS.sleep(200); 80 car.drive(); 81 } 82 } catch (InterruptedException e) { 83 System.out.println("Exiting via interrupt"); 84 } 85 System.out.println("End drive task"); 86 } 87 } 88 89 public class Test { 90 public static void main(String[] args) throws InterruptedException { 91 Car car = new Car(); 92 ExecutorService exec = Executors.newCachedThreadPool(); 93 exec.execute(new Stop(car)); 94 exec.execute(new Drive(car)); 95 TimeUnit.SECONDS.sleep(2); 96 exec.shutdownNow(); 97 } 98 }