原文连接:SynchronousQueue原理解析
源码解析
1、SynchronousQueue.java
1 public class SynchronousQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 4 //Transferer是一个抽象类,SynchronousQueue内部有2个Transferer的子类,分别是TransferQueue和TransferStack 5 // 6 private transient volatile Transferer<E> transferer; 7 8 //默认构造方法的线程等待队列是不保证顺序的 9 public SynchronousQueue() { 10 this(false); 11 } 12 13 //如果fair为true,那SynchronousQueue所采用的是能保证先进先出的TransferQueue,也就是先被挂起的线程会先返回 14 public SynchronousQueue(boolean fair) { 15 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); 16 } 17 18 //向SynchronousQueue中添加数据,如果此时线程队列中没有获取数据的线程的话,当前的线程就会挂起等待 19 public void put(E e) throws InterruptedException { 20 //添加的数据不能是null 21 if (e == null) throw new NullPointerException(); 22 //可以看到添加的方法调用的是transfer方法,如果添加失败会抛出InterruptedException异常 23 //后面我们可以在transfer方法的源码中调用put方法添加数据在当前线程被中断时才会返回null 24 //这里相当于继续把线程中断的InterruptedException向上抛出 25 if (transferer.transfer(e, false, 0) == null) { 26 Thread.interrupted(); 27 throw new InterruptedException(); 28 } 29 } 30 31 //不带超时时间的offer方法,如果此时没有线程正在等待获取数据的话transfer就会返回null,也就是添加数据失败 32 public boolean offer(E e) { 33 if (e == null) throw new NullPointerException(); 34 return transferer.transfer(e, true, 0) != null; 35 } 36 37 //带超时时间的offer方法,与上面的不同的是这个方法会等待一个超时时间,如果时间过了还没有线程来获取数据就会返回失败 38 public boolean offer(E e, long timeout, TimeUnit unit) 39 throws InterruptedException { 40 if (e == null) throw new NullPointerException(); 41 //添加的数据被其他线程成功获取,返回成功 42 if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) 43 return true; 44 //如果添加数据失败了,有可能是线程被中断了,不是的话直接返回false 45 if (!Thread.interrupted()) 46 return false; 47 //是线程被中断的话就向上跑出InterruptedException异常 48 throw new InterruptedException(); 49 } 50 51 //take方法用于从队列中取数据,如果此时没有添加数据的线程被挂起,那当前线程就会被挂起等待 52 public E take() throws InterruptedException { 53 E e = transferer.transfer(null, false, 0); 54 //成功获取数据 55 if (e != null) 56 return e; 57 //没有获取到数据,同时又退出挂起状态了,那说明线程被中断了,向上抛出InterruptedException 58 Thread.interrupted(); 59 throw new InterruptedException(); 60 } 61 62 //poll方法同样用于获取数据 63 public E poll() { 64 return transferer.transfer(null, true, 0); 65 } 66 67 //带超时时间的poll方法,如果超时时间到了还没有线程插入数据,就会返回失败 68 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 69 E e = transferer.transfer(null, true, unit.toNanos(timeout)); 70 //返回结果有2种情况 71 //e != null表示成功取到数据了 72 //!Thread.interrupted()表示返回失败了,且是因为超时失败的,此时e是null 73 if (e != null || !Thread.interrupted()) 74 return e; 75 //返回失败了,并且是因为当前线程被中断了 76 throw new InterruptedException(); 77 } 78 79 //可以看到SynchronousQueue的isEmpty方法一直返回的是true,因为SynchronousQueue没有任何容量 80 public boolean isEmpty() { 81 return true; 82 } 83 84 //同样的size方法也返回0 85 public int size() { 86 return 0; 87 } 88 89 <!--下面我们看看TransferQueue的具体实现,TransferQueue中的关键方法就是transfer方法了--> 90 91 //先看看TransferQueue的父类Transferer,比较简单,就是提供了一个transfer方法,需要子类具体实现 92 abstract static class Transferer<E> { 93 abstract E transfer(E e, boolean timed, long nanos); 94 } 95 96 //TransferQueue 97 static final class TransferQueue<E> extends Transferer<E> { 98 99 //内部的节点类,用于表示一个请求 100 //这里可以看出TransferQueue内部是一个单链表,因此可以保证先进先出 101 static final class QNode { 102 volatile QNode next; // next node in queue 103 volatile Object item; // CAS'ed to or from null 104 //请求所在的线程 105 volatile Thread waiter; // to control park/unpark 106 //用于判断是入队还是出队,true表示的是入队操作,也就是添加数据 107 final boolean isData; 108 109 QNode(Object item, boolean isData) { 110 this.item = item; 111 this.isData = isData; 112 } 113 114 //可以看到QNode内部通过volatile关键字以及Unsafe类的CAS方法来实现线程安全 115 //compareAndSwapObject方法第一个参数表示需要改变的对象,第二个参数表示偏移量 116 //第三个参数表示参数期待的值,第四个参数表示更新后的值 117 //下面的方法调用的意思是将当前的QNode对象(this)的next字段赋值为val,当目前的next的值是cmp时就会更新next字段成功 118 boolean casNext(QNode cmp, QNode val) { 119 return next == cmp && 120 U.compareAndSwapObject(this, NEXT, cmp, val); 121 } 122 123 //方法的原理同上面的类似,这里就是更新item的值了 124 boolean casItem(Object cmp, Object val) { 125 return item == cmp && 126 U.compareAndSwapObject(this, ITEM, cmp, val); 127 } 128 129 //方法的原理同上面的类似,这里把item赋值为自己,就表示取消当前节点表示的操作了 130 void tryCancel(Object cmp) { 131 U.compareAndSwapObject(this, ITEM, cmp, this); 132 } 133 134 //调用tryCancel方法后item就会是this,就表示当前任务被取消了 135 boolean isCancelled() { 136 return item == this; 137 } 138 139 //表示当前任务已经被返回了 140 boolean isOffList() { 141 return next == this; 142 } 143 144 // Unsafe mechanics 145 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 146 private static final long ITEM; 147 private static final long NEXT; 148 149 static { 150 try { 151 ITEM = U.objectFieldOffset 152 (QNode.class.getDeclaredField("item")); 153 NEXT = U.objectFieldOffset 154 (QNode.class.getDeclaredField("next")); 155 } catch (ReflectiveOperationException e) { 156 throw new Error(e); 157 } 158 } 159 } 160 161 //首节点 162 transient volatile QNode head; 163 //尾部节点 164 transient volatile QNode tail; 165 /** 166 * Reference to a cancelled node that might not yet have been 167 * unlinked from queue because it was the last inserted node 168 * when it was cancelled. 169 */ 170 transient volatile QNode cleanMe; 171 172 //构造函数中会初始化一个出队的节点,并且首尾都指向这个节点 173 TransferQueue() { 174 QNode h = new QNode(null, false); // initialize to dummy node. 175 head = h; 176 tail = h; 177 } 178 179 //transfer方法用于提交数据或者是获取数据 180 @SuppressWarnings("unchecked") 181 E transfer(E e, boolean timed, long nanos) { 182 QNode s = null; // constructed/reused as needed 183 //如果e不为null,就说明是添加数据的入队操作 184 boolean isData = (e != null); 185 186 for (;;) { 187 QNode t = tail; 188 QNode h = head; 189 if (t == null || h == null) // saw uninitialized value 190 continue; // spin 191 192 //当队列为空的时候或者新加的操作和队尾的操作是同一个操作,可能都是入队操作也可能是出队操作,说明当前没有反向操作的线程空闲 193 if (h == t || t.isData == isData) { // empty or same-mode 194 QNode tn = t.next; 195 //这是一个检查,确保t指向队尾 196 if (t != tail) // inconsistent read 197 continue; 198 //tn不为null,说明t不是尾部节点,就执行advanceTail操作,将tn作为尾部节点,继续循环 199 if (tn != null) { // lagging tail 200 advanceTail(t, tn); 201 continue; 202 } 203 //如果timed为true,表示带有超时参数,等待超时期间没有其他相反操作的线程提交就会直接返回null 204 //这里如果nanos初始值就是0,比如不带超时时间的offer和poll方法,当队尾的节点不是相反操作时就会直接返回null 205 if (timed && nanos <= 0L) // can't wait 206 return null; 207 //如果没有超时时间或者超时时间不为0的话就创建新的节点 208 if (s == null) 209 s = new QNode(e, isData); 210 //使tail的next指向新的节点 211 if (!t.casNext(null, s)) // failed to link in 212 continue; 213 //更新TransferQueue的tail指向新的节点,这样tail节点就始终是尾部节点 214 advanceTail(t, s); // swing tail and wait 215 //如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交 216 Object x = awaitFulfill(s, e, timed, nanos); 217 //当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true 218 if (x == s) { // wait was cancelled 219 //将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了 220 clean(t, s); 221 return null; 222 } 223 224 //如果s还没有被 225 if (!s.isOffList()) { // not already unlinked 226 advanceHead(t, s); // unlink if head 227 if (x != null) // and forget fields 228 s.item = s; 229 s.waiter = null; 230 } 231 return (x != null) ? (E)x : e; 232 233 } 234 //提交操作的时候刚刚好有反向的操作在等待 235 else { // complementary-mode 236 QNode m = h.next; // node to fulfill 237 if (t != tail || m == null || h != head) 238 continue; // inconsistent read 239 240 Object x = m.item; 241 //这里先判断m是否是有效的操作 242 if (isData == (x != null) || // m already fulfilled 243 x == m || // m cancelled 244 !m.casItem(x, e)) { // lost CAS 245 advanceHead(h, m); // dequeue and retry 246 continue; 247 } 248 249 //更新头部节点 250 advanceHead(h, m); // successfully fulfilled 251 //唤醒m节点的被挂起的线程 252 LockSupport.unpark(m.waiter); 253 //返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功 254 return (x != null) ? (E)x : e; 255 } 256 } 257 } 258 259 <!--下面看看执行挂起线程的方法awaitFulfill--> 260 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { 261 /* Same idea as TransferStack.awaitFulfill */ 262 //首先获取超时时间 263 final long deadline = timed ? System.nanoTime() + nanos : 0L; 264 //当前操作所在的线程 265 Thread w = Thread.currentThread(); 266 //线程被挂起或是进入超时等待之前阻止自旋的次数 267 int spins = (head.next == s) 268 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) 269 : 0; 270 for (;;) { 271 //这里首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记 272 if (w.isInterrupted()) 273 s.tryCancel(e); 274 Object x = s.item; 275 //x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法 276 if (x != e) 277 return x; 278 //这里先判断超时的时间是否过了 279 if (timed) { 280 nanos = deadline - System.nanoTime(); 281 if (nanos <= 0L) { 282 s.tryCancel(e); 283 continue; 284 } 285 } 286 //这里通过多几次循环来避免直接挂起线程 287 if (spins > 0) 288 --spins; 289 else if (s.waiter == null) 290 s.waiter = w; 291 else if (!timed) 292 //park操作会让线程挂起进入等待状态(Waiting),需要其他线程调用unpark方法唤醒 293 LockSupport.park(this); 294 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) 295 //parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒 296 LockSupport.parkNanos(this, nanos); 297 } 298 } 299 300 } 301 }