一、前言

  在锁框架中,AbstractQueuedSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。所以很有必要好好分析。

二、AbstractQueuedSynchronizer数据结构

  分析类,首先就要分析底层采用了何种数据结构,抓住核心点进行分析,经过分析可知,AbstractQueuedSynchronizer类的数据结构如下

  【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer

  说明:AbstractQueuedSynchronizer类底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列,其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

三、AbstractQueuedSynchronizer源码分析

3.1 类的继承关系

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

 

说明:从类继承关系可知,AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。而AbstractOwnableSynchronizer抽象类的源码如下

 1 public abstract class AbstractOwnableSynchronizer
 2     implements java.io.Serializable {
 3     
 4     // 版本序列号
 5     private static final long serialVersionUID = 3737899427754241961L;
 6     // 构造函数
 7     protected AbstractOwnableSynchronizer() { }
 8     // 独占模式下的线程
 9     private transient Thread exclusiveOwnerThread;
10     
11     // 设置独占线程 
12     protected final void setExclusiveOwnerThread(Thread thread) {
13         exclusiveOwnerThread = thread;
14     }
15     
16     // 获取独占线程 
17     protected final Thread getExclusiveOwnerThread() {
18         return exclusiveOwnerThread;
19     }
20 }

说明:AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用。

3.2 类的内部类

AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。下面分别做介绍。

1. Node类

 1 static final class Node {
 2         // 模式,分为共享与独占
 3         // 共享模式
 4         static final Node SHARED = new Node();
 5         // 独占模式
 6         static final Node EXCLUSIVE = null;        
 7         // 结点状态
 8         // CANCELLED,值为1,表示当前的线程被取消
 9         // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
10         // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
11         // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
12         // 值为0,表示当前节点在sync队列中,等待着获取锁
13         static final int CANCELLED =  1;
14         static final int SIGNAL    = -1;
15         static final int CONDITION = -2;
16         static final int PROPAGATE = -3;        
17 
18         // 结点状态
19         volatile int waitStatus;        
20         // 前驱结点
21         volatile Node prev;    
22         // 后继结点
23         volatile Node next;        
24         // 结点所对应的线程
25         volatile Thread thread;        
26         // 下一个等待者
27         Node nextWaiter;
28         
29         // 结点是否在共享模式下等待
30         final boolean isShared() {
31             return nextWaiter == SHARED;
32         }
33         
34         // 获取前驱结点,若前驱结点为空,抛出异常
35         final Node predecessor() throws NullPointerException {
36             // 保存前驱结点
37             Node p = prev; 
38             if (p == null) // 前驱结点为空,抛出异常
39                 throw new NullPointerException();
40             else // 前驱结点不为空,返回
41                 return p;
42         }
43         
44         // 无参构造函数
45         Node() {    // Used to establish initial head or SHARED marker
46         }
47         
48         // 构造函数
49          Node(Thread thread, Node mode) {    // Used by addWaiter
50             this.nextWaiter = mode;
51             this.thread = thread;
52         }
53         
54         // 构造函数
55         Node(Thread thread, int waitStatus) { // Used by Condition
56             this.waitStatus = waitStatus;
57             this.thread = thread;
58         }
59 }

说明:每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

  ① CANCELLED,值为1,表示当前的线程被取消。

  ② SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。

  ③ CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。

  ④ PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。

  ⑤ 值为0,表示当前节点在sync queue中,等待着获取锁。

2. ConditionObject类

  1 // 内部类
  2     public class ConditionObject implements Condition, java.io.Serializable {
  3         // 版本号
  4         private static final long serialVersionUID = 1173984872572414699L;
  5         /** First node of condition queue. */
  6         // condition队列的头结点
  7         private transient Node firstWaiter;
  8         /** Last node of condition queue. */
  9         // condition队列的尾结点
 10         private transient Node lastWaiter;
 11 
 12         /**
 13          * Creates a new {@code ConditionObject} instance.
 14          */
 15         // 构造函数
 16         public ConditionObject() { }
 17 
 18         // Internal methods
 19 
 20         /**
 21          * Adds a new waiter to wait queue.
 22          * @return its new wait node
 23          */
 24         // 添加新的waiter到wait队列
 25         private Node addConditionWaiter() {
 26             // 保存尾结点
 27             Node t = lastWaiter;
 28             // If lastWaiter is cancelled, clean out.
 29             if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点不为空,并且尾结点的状态不为CONDITION
 30                 // 清除状态为CONDITION的结点
 31                 unlinkCancelledWaiters(); 
 32                 // 将最后一个结点重新赋值给t
 33                 t = lastWaiter;
 34             }
 35             // 新建一个结点
 36             Node node = new Node(Thread.currentThread(), Node.CONDITION);
 37             if (t == null) // 尾结点为空
 38                 // 设置condition队列的头结点
 39                 firstWaiter = node;
 40             else // 尾结点不为空
 41                 // 设置为节点的nextWaiter域为node结点
 42                 t.nextWaiter = node;
 43             // 更新condition队列的尾结点
 44             lastWaiter = node;
 45             return node;
 46         }
 47 
 48         /**
 49          * Removes and transfers nodes until hit non-cancelled one or
 50          * null. Split out from signal in part to encourage compilers
 51          * to inline the case of no waiters.
 52          * @param first (non-null) the first node on condition queue
 53          */
 54         private void doSignal(Node first) {
 55             // 循环
 56             do {
 57                 if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空
 58                     // 设置尾结点为空
 59                     lastWaiter = null;
 60                 // 设置first结点的nextWaiter域
 61                 first.nextWaiter = null;
 62             } while (!transferForSignal(first) &&
 63                      (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环
 64         }
 65 
 66         /**
 67          * Removes and transfers all nodes.
 68          * @param first (non-null) the first node on condition queue
 69          */
 70         private void doSignalAll(Node first) {
 71             // condition队列的头结点尾结点都设置为空
 72             lastWaiter = firstWaiter = null;
 73             // 循环
 74             do {
 75                 // 获取first结点的nextWaiter域结点
 76                 Node next = first.nextWaiter;
 77                 // 设置first结点的nextWaiter域为空
 78                 first.nextWaiter = null;
 79                 // 将first结点从condition队列转移到sync队列
 80                 transferForSignal(first);
 81                 // 重新设置first
 82                 first = next;
 83             } while (first != null);
 84         }
 85 
 86         /**
 87          * Unlinks cancelled waiter nodes from condition queue.
 88          * Called only while holding lock. This is called when
 89          * cancellation occurred during condition wait, and upon
 90          * insertion of a new waiter when lastWaiter is seen to have
 91          * been cancelled. This method is needed to avoid garbage
 92          * retention in the absence of signals. So even though it may
 93          * require a full traversal, it comes into play only when
 94          * timeouts or cancellations occur in the absence of
 95          * signals. It traverses all nodes rather than stopping at a
 96          * particular target to unlink all pointers to garbage nodes
 97          * without requiring many re-traversals during cancellation
 98          * storms.
 99          */
100         // 从condition队列中清除状态为CANCEL的结点
101         private void unlinkCancelledWaiters() {
102             // 保存condition队列头结点
103             Node t = firstWaiter;
104             Node trail = null;
105             while (t != null) { // t不为空
106                 // 下一个结点
107                 Node next = t.nextWaiter;
108                 if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态
109                     // 设置t节点的额nextWaiter域为空
110                     t.nextWaiter = null;
111                     if (trail == null) // trail为空
112                         // 重新设置condition队列的头结点
113                         firstWaiter = next;
114                     else // trail不为空
115                         // 设置trail结点的nextWaiter域为next结点
116                         trail.nextWaiter = next;
117                     if (next == null) // next结点为空
118                         // 设置condition队列的尾结点
119                         lastWaiter = trail;
120                 }
121                 else // t结点的状态为CONDTION状态
122                     // 设置trail结点
123                     trail = t;
124                 // 设置t结点
125                 t = next;
126             }
127         }
128 
129         // public methods
130 
131         /**
132          * Moves the longest-waiting thread, if one exists, from the
133          * wait queue for this condition to the wait queue for the
134          * owning lock.
135          *
136          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
137          *         returns {@code false}
138          */
139         // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
140         public final void signal() {
141             if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
142                 throw new IllegalMonitorStateException();
143             // 保存condition队列头结点
144             Node first = firstWaiter;
145             if (first != null) // 头结点不为空
146                 // 唤醒一个等待线程
147                 doSignal(first);
148         }
149 
150         /**
151          * Moves all threads from the wait queue for this condition to
152          * the wait queue for the owning lock.
153          *
154          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
155          *         returns {@code false}
156          */
157         // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
158         public final void signalAll() {
159             if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
160                 throw new IllegalMonitorStateException();
161             // 保存condition队列头结点
162             Node first = firstWaiter;
163             if (first != null) // 头结点不为空
164                 // 唤醒所有等待线程
165                 doSignalAll(first);
166         }
167 
168         /**
169          * Implements uninterruptible condition wait.
170          * <ol>
171          * <li> Save lock state returned by {@link #getState}.
172          * <li> Invoke {@link #release} with saved state as argument,
173          *      throwing IllegalMonitorStateException if it fails.
174          * <li> Block until signalled.
175          * <li> Reacquire by invoking specialized version of
176          *      {@link #acquire} with saved state as argument.
177          * </ol>
178          */
179         // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
180         public final void awaitUninterruptibly() {
181             // 添加一个结点到等待队列
182             Node node = addConditionWaiter();
183             // 获取释放的状态
184             int savedState = fullyRelease(node);
185             boolean interrupted = false;
186             while (!isOnSyncQueue(node)) { // 
187                 // 阻塞当前线程
188                 LockSupport.park(this);
189                 if (Thread.interrupted()) // 当前线程被中断
190                     // 设置interrupted状态
191                     interrupted = true; 
192             }
193             if (acquireQueued(node, savedState) || interrupted) // 
194                 selfInterrupt();
195         }
196 
197         /*
198          * For interruptible waits, we need to track whether to throw
199          * InterruptedException, if interrupted while blocked on
200          * condition, versus reinterrupt current thread, if
201          * interrupted while blocked waiting to re-acquire.
202          */
203 
204         /** Mode meaning to reinterrupt on exit from wait */
205         private static final int REINTERRUPT =  1;
206         /** Mode meaning to throw InterruptedException on exit from wait */
207         private static final int THROW_IE    = -1;
208 
209         /**
210          * Checks for interrupt, returning THROW_IE if interrupted
211          * before signalled, REINTERRUPT if after signalled, or
212          * 0 if not interrupted.
213          */
214         private int checkInterruptWhileWaiting(Node node) {
215             return Thread.interrupted() ?
216                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
217                 0; 
218         }
219 
220         /**
221          * Throws InterruptedException, reinterrupts current thread, or
222          * does nothing, depending on mode.
223          */
224         private void reportInterruptAfterWait(int interruptMode)
225             throws InterruptedException {
226             if (interruptMode == THROW_IE)
227                 throw new InterruptedException();
228             else if (interruptMode == REINTERRUPT)
229                 selfInterrupt();
230         }
231 
232         /**
233          * Implements interruptible condition wait.
234          * <ol>
235          * <li> If current thread is interrupted, throw InterruptedException.
236          * <li> Save lock state returned by {@link #getState}.
237          * <li> Invoke {@link #release} with saved state as argument,
238          *      throwing IllegalMonitorStateException if it fails.
239          * <li> Block until signalled or interrupted.
240          * <li> Reacquire by invoking specialized version of
241          *      {@link #acquire} with saved state as argument.
242          * <li> If interrupted while blocked in step 4, throw InterruptedException.
243          * </ol>
244          */
245         // // 等待,当前线程在接到信号或被中断之前一直处于等待状态
246         public final void await() throws InterruptedException {
247             if (Thread.interrupted()) // 当前线程被中断,抛出异常
248                 throw new InterruptedException();
249             // 在wait队列上添加一个结点
250             Node node = addConditionWaiter();
251             // 
252             int savedState = fullyRelease(node);
253             int interruptMode = 0;
254             while (!isOnSyncQueue(node)) {
255                 // 阻塞当前线程
256                 LockSupport.park(this);
257                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
258                     break;
259             }
260             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
261                 interruptMode = REINTERRUPT;
262             if (node.nextWaiter != null) // clean up if cancelled
263                 unlinkCancelledWaiters();
264             if (interruptMode != 0)
265                 reportInterruptAfterWait(interruptMode);
266         }
267 
268         /**
269          * Implements timed condition wait.
270          * <ol>
271          * <li> If current thread is interrupted, throw InterruptedException.
272          * <li> Save lock state returned by {@link #getState}.
273          * <li> Invoke {@link #release} with saved state as argument,
274          *      throwing IllegalMonitorStateException if it fails.
275          * <li> Block until signalled, interrupted, or timed out.
276          * <li> Reacquire by invoking specialized version of
277          *      {@link #acquire} with saved state as argument.
278          * <li> If interrupted while blocked in step 4, throw InterruptedException.
279          * </ol>
280          */
281         // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 
282         public final long awaitNanos(long nanosTimeout)
283                 throws InterruptedException {
284             if (Thread.interrupted())
285                 throw new InterruptedException();
286             Node node = addConditionWaiter();
287             int savedState = fullyRelease(node);
288             final long deadline = System.nanoTime() + nanosTimeout;
289             int interruptMode = 0;
290             while (!isOnSyncQueue(node)) {
291                 if (nanosTimeout <= 0L) {
292                     transferAfterCancelledWait(node);
293                     break;
294                 }
295                 if (nanosTimeout >= spinForTimeoutThreshold)
296                     LockSupport.parkNanos(this, nanosTimeout);
297                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
298                     break;
299                 nanosTimeout = deadline - System.nanoTime();
300             }
301             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
302                 interruptMode = REINTERRUPT;
303             if (node.nextWaiter != null)
304                 unlinkCancelledWaiters();
305             if (interruptMode != 0)
306                 reportInterruptAfterWait(interruptMode);
307             return deadline - System.nanoTime();
308         }
309 
310         /**
311          * Implements absolute timed condition wait.
312          * <ol>
313          * <li> If current thread is interrupted, throw InterruptedException.
314          * <li> Save lock state returned by {@link #getState}.
315          * <li> Invoke {@link #release} with saved state as argument,
316          *      throwing IllegalMonitorStateException if it fails.
317          * <li> Block until signalled, interrupted, or timed out.
318          * <li> Reacquire by invoking specialized version of
319          *      {@link #acquire} with saved state as argument.
320          * <li> If interrupted while blocked in step 4, throw InterruptedException.
321          * <li> If timed out while blocked in step 4, return false, else true.
322          * </ol>
323          */
324         // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
325         public final boolean awaitUntil(Date deadline)
326                 throws InterruptedException {
327             long abstime = deadline.getTime();
328             if (Thread.interrupted())
329                 throw new InterruptedException();
330             Node node = addConditionWaiter();
331             int savedState = fullyRelease(node);
332             boolean timedout = false;
333             int interruptMode = 0;
334             while (!isOnSyncQueue(node)) {
335                 if (System.currentTimeMillis() > abstime) {
336                     timedout = transferAfterCancelledWait(node);
337                     break;
338                 }
339                 LockSupport.parkUntil(this, abstime);
340                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
341                     break;
342             }
343             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
344                 interruptMode = REINTERRUPT;
345             if (node.nextWaiter != null)
346                 unlinkCancelledWaiters();
347             if (interruptMode != 0)
348                 reportInterruptAfterWait(interruptMode);
349             return !timedout;
350         }
351 
352         /**
353          * Implements timed condition wait.
354          * <ol>
355          * <li> If current thread is interrupted, throw InterruptedException.
356          * <li> Save lock state returned by {@link #getState}.
357          * <li> Invoke {@link #release} with saved state as argument,
358          *      throwing IllegalMonitorStateException if it fails.
359          * <li> Block until signalled, interrupted, or timed out.
360          * <li> Reacquire by invoking specialized version of
361          *      {@link #acquire} with saved state as argument.
362          * <li> If interrupted while blocked in step 4, throw InterruptedException.
363          * <li> If timed out while blocked in step 4, return false, else true.
364          * </ol>
365          */
366         // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于:awaitNanos(unit.toNanos(time)) > 0
367         public final boolean await(long time, TimeUnit unit)
368                 throws InterruptedException {
369             long nanosTimeout = unit.toNanos(time);
370             if (Thread.interrupted())
371                 throw new InterruptedException();
372             Node node = addConditionWaiter();
373             int savedState = fullyRelease(node);
374             final long deadline = System.nanoTime() + nanosTimeout;
375             boolean timedout = false;
376             int interruptMode = 0;
377             while (!isOnSyncQueue(node)) {
378                 if (nanosTimeout <= 0L) {
379                     timedout = transferAfterCancelledWait(node);
380                     break;
381                 }
382                 if (nanosTimeout >= spinForTimeoutThreshold)
383                     LockSupport.parkNanos(this, nanosTimeout);
384                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
385                     break;
386                 nanosTimeout = deadline - System.nanoTime();
387             }
388             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
389                 interruptMode = REINTERRUPT;
390             if (node.nextWaiter != null)
391                 unlinkCancelledWaiters();
392             if (interruptMode != 0)
393                 reportInterruptAfterWait(interruptMode);
394             return !timedout;
395         }
396 
397         //  support for instrumentation
398 
399         /**
400          * Returns true if this condition was created by the given
401          * synchronization object.
402          *
403          * @return {@code true} if owned
404          */
405         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
406             return sync == AbstractQueuedSynchronizer.this;
407         }
408 
409         /**
410          * Queries whether any threads are waiting on this condition.
411          * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
412          *
413          * @return {@code true} if there are any waiting threads
414          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
415          *         returns {@code false}
416          */
417         //  查询是否有正在等待此条件的任何线程
418         protected final boolean hasWaiters() {
419             if (!isHeldExclusively())
420                 throw new IllegalMonitorStateException();
421             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
422                 if (w.waitStatus == Node.CONDITION)
423                     return true;
424             }
425             return false;
426         }
427 
428         /**
429          * Returns an estimate of the number of threads waiting on
430          * this condition.
431          * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
432          *
433          * @return the estimated number of waiting threads
434          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
435          *         returns {@code false}
436          */
437         // 返回正在等待此条件的线程数估计值
438         protected final int getWaitQueueLength() {
439             if (!isHeldExclusively())
440                 throw new IllegalMonitorStateException();
441             int n = 0;
442             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
443                 if (w.waitStatus == Node.CONDITION)
444                     ++n;
445             }
446             return n;
447         }
448 
449         /**
450          * Returns a collection containing those threads that may be
451          * waiting on this Condition.
452          * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
453          *
454          * @return the collection of threads
455          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
456          *         returns {@code false}
457          */
458         // 返回包含那些可能正在等待此条件的线程集合
459         protected final Collection<Thread> getWaitingThreads() {
460             if (!isHeldExclusively())
461                 throw new IllegalMonitorStateException();
462             ArrayList<Thread> list = new ArrayList<Thread>();
463             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
464                 if (w.waitStatus == Node.CONDITION) {
465                     Thread t = w.thread;
466                     if (t != null)
467                         list.add(t);
468                 }
469             }
470             return list;
471         }
472 }
View Code

相关文章:

  • 2021-08-06
  • 2021-10-31
  • 2021-08-07
  • 2021-09-19
  • 2021-09-27
  • 2022-03-07
  • 2021-11-18
  • 2022-01-27
猜你喜欢
  • 2021-09-20
  • 2021-04-14
  • 2018-05-07
  • 2022-02-20
  • 2021-06-23
  • 2021-11-27
  • 2022-12-23
相关资源
相似解决方案