若有不正之处请多多谅解,并欢迎批评指正。
请尊重作者劳动成果,转载请标明原文链接:
http://www.cnblogs.com/go2sea/p/5615531.html
CyclicBarrier是java.util.concurrent包中提供的同步工具。通过这个工具我们可以实现n个线程相互等待。我们可以通过参数指定达到公共屏障点之后的行为。
先上源码:
1 package java.util.concurrent; 2 import java.util.concurrent.locks.*; 3 4 public class CyclicBarrier { 5 6 private static class Generation { 7 boolean broken = false; 8 } 9 10 private final ReentrantLock lock = new ReentrantLock(); 11 private final Condition trip = lock.newCondition(); 12 private final int parties; 13 private final Runnable barrierCommand; 14 private Generation generation = new Generation(); 15 private int count; 16 17 private void nextGeneration() { 18 // signal completion of last generation 19 trip.signalAll(); 20 // set up next generation 21 count = parties; 22 generation = new Generation(); 23 } 24 25 26 private void breakBarrier() { 27 generation.broken = true; 28 count = parties; 29 trip.signalAll(); 30 } 31 32 private int dowait(boolean timed, long nanos) 33 throws InterruptedException, BrokenBarrierException, TimeoutException { 34 final ReentrantLock lock = this.lock; 35 lock.lock(); 36 try { 37 final Generation g = generation; 38 39 //小概率事件:该线程在等待锁的过程中,barrier被破坏 40 if (g.broken) 41 throw new BrokenBarrierException(); 42 43 //小概率事件:该线程在等待锁的过程中被中断 44 if (Thread.interrupted()) { 45 breakBarrier(); 46 throw new InterruptedException(); 47 } 48 49 int index = --count; 50 //当有parties个线程到达barrier 51 if (index == 0) { // tripped 52 boolean ranAction = false; 53 try { 54 final Runnable command = barrierCommand; 55 //如果设置了barrierCommand,令最后到达的barrier的线程执行它 56 if (command != null) 57 command.run(); 58 ranAction = true; 59 nextGeneration(); 60 return 0; 61 } finally { 62 //注意:当执行barrierCommand出现异常时,ranAction派上用场 63 if (!ranAction) 64 breakBarrier(); 65 } 66 } 67 68 // loop until tripped, broken, interrupted, or timed out 69 for (;;) { 70 try { 71 if (!timed) 72 trip.await(); 73 else if (nanos > 0L) 74 //注意:nanos值标识了是否超时,后续用这个nanos值判断是否breakBarrier 75 nanos = trip.awaitNanos(nanos); 76 } catch (InterruptedException ie) { 77 if (g == generation && ! g.broken) { 78 breakBarrier(); 79 throw ie; 80 } else { 81 //小概率事件:该线程被中断,进入锁等待队列 82 //在等待过程中,另一个线程更新或破坏了generation 83 //当该线程获取锁之后,应重置interrupt标志而不是抛出异常 84 //原因在于:它中断的太晚了,generation已更新或破坏,它抛出InterruptedException的时机已经过去, 85 //两种情况: 86 //①g被破坏。已经有一个线程抛出了InterruptedException(也只能由第一个抛),与它同时等待的都抛BrokenBarrierException(后续检查broken标志会抛)。 87 //②g被更新:此时抛异常没意义(后续检查g更新后会return index),这里重置interrupt标志,让线程继续执行,让这个标志由上层处理 88 Thread.currentThread().interrupt(); 89 } 90 } 91 92 //barrier被破坏,抛出异常 93 if (g.broken) 94 throw new BrokenBarrierException(); 95 96 //barrier正常进入下一循环,上一代await的线程继续执行 97 if (g != generation) 98 return index; 99 100 //只要有一个超时,就breakBarrier,后续线程抛的就是barrier损坏异常 101 if (timed && nanos <= 0L) { 102 breakBarrier(); 103 throw new TimeoutException(); 104 } 105 } 106 } finally { 107 lock.unlock(); 108 } 109 } 110 111 112 public CyclicBarrier(int parties, Runnable barrierAction) { 113 if (parties <= 0) throw new IllegalArgumentException(); 114 this.parties = parties; 115 this.count = parties; 116 this.barrierCommand = barrierAction; 117 } 118 119 public CyclicBarrier(int parties) { 120 this(parties, null); 121 } 122 123 124 public int getParties() { 125 return parties; 126 } 127 128 129 public int await() throws InterruptedException, BrokenBarrierException { 130 try { 131 return dowait(false, 0L); 132 } catch (TimeoutException toe) { 133 throw new Error(toe); // cannot happen; 134 } 135 } 136 137 138 public int await(long timeout, TimeUnit unit) 139 throws InterruptedException, 140 BrokenBarrierException, 141 TimeoutException { 142 return dowait(true, unit.toNanos(timeout)); 143 } 144 145 146 public boolean isBroken() { 147 final ReentrantLock lock = this.lock; 148 lock.lock(); 149 try { 150 return generation.broken; 151 } finally { 152 lock.unlock(); 153 } 154 } 155 156 public void reset() { 157 final ReentrantLock lock = this.lock; 158 lock.lock(); 159 try { 160 breakBarrier(); // break the current generation 161 nextGeneration(); // start a new generation 162 } finally { 163 lock.unlock(); 164 } 165 } 166 167 public int getNumberWaiting() { 168 final ReentrantLock lock = this.lock; 169 lock.lock(); 170 try { 171 return parties - count; 172 } finally { 173 lock.unlock(); 174 } 175 } 176 }