原文连接:SynchronousQueue原理解析

源码解析

1、SynchronousQueue.java

  1 public class SynchronousQueue<E> extends AbstractQueue<E>
  2     implements BlockingQueue<E>, java.io.Serializable {
  3     
  4     //Transferer是一个抽象类,SynchronousQueue内部有2个Transferer的子类,分别是TransferQueue和TransferStack
  5     //
  6     private transient volatile Transferer<E> transferer;
  7 
  8     //默认构造方法的线程等待队列是不保证顺序的
  9     public SynchronousQueue() {
 10         this(false);
 11     }
 12 
 13     //如果fair为true,那SynchronousQueue所采用的是能保证先进先出的TransferQueue,也就是先被挂起的线程会先返回
 14     public SynchronousQueue(boolean fair) {
 15         transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
 16     }
 17     
 18     //向SynchronousQueue中添加数据,如果此时线程队列中没有获取数据的线程的话,当前的线程就会挂起等待
 19     public void put(E e) throws InterruptedException {
 20         //添加的数据不能是null
 21         if (e == null) throw new NullPointerException();
 22         //可以看到添加的方法调用的是transfer方法,如果添加失败会抛出InterruptedException异常
 23         //后面我们可以在transfer方法的源码中调用put方法添加数据在当前线程被中断时才会返回null
 24         //这里相当于继续把线程中断的InterruptedException向上抛出
 25         if (transferer.transfer(e, false, 0) == null) {
 26             Thread.interrupted();
 27             throw new InterruptedException();
 28         }
 29     }
 30     
 31     //不带超时时间的offer方法,如果此时没有线程正在等待获取数据的话transfer就会返回null,也就是添加数据失败
 32     public boolean offer(E e) {
 33         if (e == null) throw new NullPointerException();
 34         return transferer.transfer(e, true, 0) != null;
 35     }
 36     
 37     //带超时时间的offer方法,与上面的不同的是这个方法会等待一个超时时间,如果时间过了还没有线程来获取数据就会返回失败
 38     public boolean offer(E e, long timeout, TimeUnit unit)
 39         throws InterruptedException {
 40         if (e == null) throw new NullPointerException();
 41         //添加的数据被其他线程成功获取,返回成功
 42         if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
 43             return true;
 44         //如果添加数据失败了,有可能是线程被中断了,不是的话直接返回false
 45         if (!Thread.interrupted())
 46             return false;
 47         //是线程被中断的话就向上跑出InterruptedException异常
 48         throw new InterruptedException();
 49     }
 50     
 51     //take方法用于从队列中取数据,如果此时没有添加数据的线程被挂起,那当前线程就会被挂起等待
 52     public E take() throws InterruptedException {
 53         E e = transferer.transfer(null, false, 0);
 54         //成功获取数据
 55         if (e != null)
 56             return e;
 57         //没有获取到数据,同时又退出挂起状态了,那说明线程被中断了,向上抛出InterruptedException
 58         Thread.interrupted();
 59         throw new InterruptedException();
 60     }
 61     
 62     //poll方法同样用于获取数据
 63     public E poll() {
 64         return transferer.transfer(null, true, 0);
 65     }
 66     
 67     //带超时时间的poll方法,如果超时时间到了还没有线程插入数据,就会返回失败
 68     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 69         E e = transferer.transfer(null, true, unit.toNanos(timeout));
 70         //返回结果有2种情况
 71         //e != null表示成功取到数据了
 72         //!Thread.interrupted()表示返回失败了,且是因为超时失败的,此时e是null
 73         if (e != null || !Thread.interrupted())
 74             return e;
 75         //返回失败了,并且是因为当前线程被中断了
 76         throw new InterruptedException();
 77     }
 78     
 79     //可以看到SynchronousQueue的isEmpty方法一直返回的是true,因为SynchronousQueue没有任何容量
 80     public boolean isEmpty() {
 81         return true;
 82     }
 83 
 84     //同样的size方法也返回0
 85     public int size() {
 86         return 0;
 87     }
 88     
 89     <!--下面我们看看TransferQueue的具体实现,TransferQueue中的关键方法就是transfer方法了-->
 90     
 91     //先看看TransferQueue的父类Transferer,比较简单,就是提供了一个transfer方法,需要子类具体实现
 92     abstract static class Transferer<E> {
 93         abstract E transfer(E e, boolean timed, long nanos);
 94     }
 95     
 96     //TransferQueue
 97     static final class TransferQueue<E> extends Transferer<E> {
 98     
 99         //内部的节点类,用于表示一个请求
100         //这里可以看出TransferQueue内部是一个单链表,因此可以保证先进先出
101         static final class QNode {
102             volatile QNode next;          // next node in queue
103             volatile Object item;         // CAS'ed to or from null
104             //请求所在的线程
105             volatile Thread waiter;       // to control park/unpark
106             //用于判断是入队还是出队,true表示的是入队操作,也就是添加数据
107             final boolean isData;
108 
109             QNode(Object item, boolean isData) {
110                 this.item = item;
111                 this.isData = isData;
112             }
113 
114             //可以看到QNode内部通过volatile关键字以及Unsafe类的CAS方法来实现线程安全
115             //compareAndSwapObject方法第一个参数表示需要改变的对象,第二个参数表示偏移量
116             //第三个参数表示参数期待的值,第四个参数表示更新后的值
117             //下面的方法调用的意思是将当前的QNode对象(this)的next字段赋值为val,当目前的next的值是cmp时就会更新next字段成功
118             boolean casNext(QNode cmp, QNode val) {
119                 return next == cmp &&
120                     U.compareAndSwapObject(this, NEXT, cmp, val);
121             }
122 
123             //方法的原理同上面的类似,这里就是更新item的值了
124             boolean casItem(Object cmp, Object val) {
125                 return item == cmp &&
126                     U.compareAndSwapObject(this, ITEM, cmp, val);
127             }
128 
129             //方法的原理同上面的类似,这里把item赋值为自己,就表示取消当前节点表示的操作了
130             void tryCancel(Object cmp) {
131                 U.compareAndSwapObject(this, ITEM, cmp, this);
132             }
133 
134             //调用tryCancel方法后item就会是this,就表示当前任务被取消了
135             boolean isCancelled() {
136                 return item == this;
137             }
138 
139             //表示当前任务已经被返回了
140             boolean isOffList() {
141                 return next == this;
142             }
143 
144             // Unsafe mechanics
145             private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
146             private static final long ITEM;
147             private static final long NEXT;
148 
149             static {
150                 try {
151                     ITEM = U.objectFieldOffset
152                         (QNode.class.getDeclaredField("item"));
153                     NEXT = U.objectFieldOffset
154                         (QNode.class.getDeclaredField("next"));
155                 } catch (ReflectiveOperationException e) {
156                     throw new Error(e);
157                 }
158             }
159         }
160         
161         //首节点
162         transient volatile QNode head;
163         //尾部节点
164         transient volatile QNode tail;
165         /**
166          * Reference to a cancelled node that might not yet have been
167          * unlinked from queue because it was the last inserted node
168          * when it was cancelled.
169          */
170         transient volatile QNode cleanMe;
171 
172         //构造函数中会初始化一个出队的节点,并且首尾都指向这个节点
173         TransferQueue() {
174             QNode h = new QNode(null, false); // initialize to dummy node.
175             head = h;
176             tail = h;
177         }
178         
179         //transfer方法用于提交数据或者是获取数据
180         @SuppressWarnings("unchecked")
181         E transfer(E e, boolean timed, long nanos) {
182             QNode s = null; // constructed/reused as needed
183             //如果e不为null,就说明是添加数据的入队操作
184             boolean isData = (e != null);
185 
186             for (;;) {
187                 QNode t = tail;
188                 QNode h = head;
189                 if (t == null || h == null)         // saw uninitialized value
190                     continue;                       // spin
191 
192                 //当队列为空的时候或者新加的操作和队尾的操作是同一个操作,可能都是入队操作也可能是出队操作,说明当前没有反向操作的线程空闲
193                 if (h == t || t.isData == isData) { // empty or same-mode
194                     QNode tn = t.next;
195                     //这是一个检查,确保t指向队尾
196                     if (t != tail)                  // inconsistent read
197                         continue;
198                     //tn不为null,说明t不是尾部节点,就执行advanceTail操作,将tn作为尾部节点,继续循环
199                     if (tn != null) {               // lagging tail
200                         advanceTail(t, tn);
201                         continue;
202                     }
203                     //如果timed为true,表示带有超时参数,等待超时期间没有其他相反操作的线程提交就会直接返回null
204                     //这里如果nanos初始值就是0,比如不带超时时间的offer和poll方法,当队尾的节点不是相反操作时就会直接返回null
205                     if (timed && nanos <= 0L)       // can't wait
206                         return null;
207                     //如果没有超时时间或者超时时间不为0的话就创建新的节点
208                     if (s == null)
209                         s = new QNode(e, isData);
210                     //使tail的next指向新的节点
211                     if (!t.casNext(null, s))        // failed to link in
212                         continue;
213                     //更新TransferQueue的tail指向新的节点,这样tail节点就始终是尾部节点
214                     advanceTail(t, s);              // swing tail and wait
215                     //如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交
216                     Object x = awaitFulfill(s, e, timed, nanos);
217                     //当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true
218                     if (x == s) {                   // wait was cancelled
219                         //将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了
220                         clean(t, s);
221                         return null;
222                     }
223                     
224                     //如果s还没有被
225                     if (!s.isOffList()) {           // not already unlinked
226                         advanceHead(t, s);          // unlink if head
227                         if (x != null)              // and forget fields
228                             s.item = s;
229                         s.waiter = null;
230                     }
231                     return (x != null) ? (E)x : e;
232 
233                 } 
234                 //提交操作的时候刚刚好有反向的操作在等待
235                 else {                            // complementary-mode
236                     QNode m = h.next;               // node to fulfill
237                     if (t != tail || m == null || h != head)
238                         continue;                   // inconsistent read
239 
240                     Object x = m.item;
241                     //这里先判断m是否是有效的操作
242                     if (isData == (x != null) ||    // m already fulfilled
243                         x == m ||                   // m cancelled
244                         !m.casItem(x, e)) {         // lost CAS
245                         advanceHead(h, m);          // dequeue and retry
246                         continue;
247                     }
248                     
249                     //更新头部节点
250                     advanceHead(h, m);              // successfully fulfilled
251                     //唤醒m节点的被挂起的线程
252                     LockSupport.unpark(m.waiter);
253                     //返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功
254                     return (x != null) ? (E)x : e;
255                 }
256             }
257         }
258         
259         <!--下面看看执行挂起线程的方法awaitFulfill-->
260         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
261             /* Same idea as TransferStack.awaitFulfill */
262             //首先获取超时时间
263             final long deadline = timed ? System.nanoTime() + nanos : 0L;
264             //当前操作所在的线程
265             Thread w = Thread.currentThread();
266             //线程被挂起或是进入超时等待之前阻止自旋的次数
267             int spins = (head.next == s)
268                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
269                 : 0;
270             for (;;) {
271                 //这里首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记
272                 if (w.isInterrupted())
273                     s.tryCancel(e);
274                 Object x = s.item;
275                 //x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法
276                 if (x != e)
277                     return x;
278                 //这里先判断超时的时间是否过了
279                 if (timed) {
280                     nanos = deadline - System.nanoTime();
281                     if (nanos <= 0L) {
282                         s.tryCancel(e);
283                         continue;
284                     }
285                 }
286                 //这里通过多几次循环来避免直接挂起线程
287                 if (spins > 0)
288                     --spins;
289                 else if (s.waiter == null)
290                     s.waiter = w;
291                 else if (!timed)
292                     //park操作会让线程挂起进入等待状态(Waiting),需要其他线程调用unpark方法唤醒
293                     LockSupport.park(this);
294                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
295                     //parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒
296                     LockSupport.parkNanos(this, nanos);
297             }
298         }
299         
300     }
301 }
View Code

相关文章: