前言:AQS框架在J.U.C中的地位不言而喻,可以说没有AQS就没有J.U.C包,可见其重要性,因此有必要对其原理进行详细深入的理解。

1.AQS是什么

在深入AQS之前,首先我们要搞清楚什么是AQS。AQS全称是AbstractQueuedSynchronizer,我们直接查看AQS源码的注释。

AQS框架源码分析-AbstractQueuedSynchronizer

大致意思就是说:AQS提供了实现阻塞锁和相关同步器并依赖先进先出(FIFO)等待队列的框架。

AQS框架源码分析-AbstractQueuedSynchronizer

AQS依赖一个原子数值作为锁的状态,子类可以有多个状态值,只能通过原子方法区操作该值,从而保证同步。

通过第一段的注释大致总结下AQS是什么:

①AQS是一个同步的基础框架,基于一个先进先出的队列

②锁机制依赖一个原子值的状态。

③AQS的子类负责定义与操作这个状态值,但必须通过AQS提供的原子操作。

④AQS剩余的方法就是围绕队列,与线程阻塞唤醒等功能。

2.重要成员变量

AQS中有两个重要的成员变量:Node和ConditionObject。

AQS框架源码分析-AbstractQueuedSynchronizer

①Node的作用是存储获取锁失败的线程,并且维护一个CLH FIFO队列,该队列是会被多线程操作的,所以Node中大部分变量都是被volatile修饰,并且通过自旋和CAS进行原子性的操作。CLH的数据结构如下:

AQS框架源码分析-AbstractQueuedSynchronizer

Node有一个模式的属性:独占模式共享模式,独占模式下资源是线程独占的,共享模式下,资源是可以被多个线程占用的。

Node源码如下:

  1 static final class Node {
  2         /** Marker to indicate a node is waiting in shared mode */
  3         static final Node SHARED = new Node();  // 共享模式
  4         /** Marker to indicate a node is waiting in exclusive mode */
  5         static final Node EXCLUSIVE = null;  // 独占模式
  6 
  7         /** waitStatus value to indicate thread has cancelled */
  8         static final int CANCELLED =  1;  // 表明线程已处于结束状态(被取消)
  9         /** waitStatus value to indicate successor's thread needs unparking */
 10         static final int SIGNAL    = -1; // 表明线程需要被唤醒
 11         /** waitStatus value to indicate thread is waiting on condition */
 12         static final int CONDITION = -2; // 表明线程正处于条件队列上,等待某一条件
 13         /**
 14          * waitStatus value to indicate the next acquireShared should
 15          * unconditionally propagate
 16          */
 17         static final int PROPAGATE = -3; // 共享模式下同步状态会被传播
 18 
 19         /**
 20          * Status field, taking on only the values:
 21          *   SIGNAL:     The successor of this node is (or will soon be)
 22          *               blocked (via park), so the current node must
 23          *               unpark its successor when it releases or
 24          *               cancels. To avoid races, acquire methods must
 25          *               first indicate they need a signal,
 26          *               then retry the atomic acquire, and then,
 27          *               on failure, block.
 28          *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 29          *               Nodes never leave this state. In particular,
 30          *               a thread with cancelled node never again blocks.
 31          *   CONDITION:  This node is currently on a condition queue.
 32          *               It will not be used as a sync queue node
 33          *               until transferred, at which time the status
 34          *               will be set to 0. (Use of this value here has
 35          *               nothing to do with the other uses of the
 36          *               field, but simplifies mechanics.)
 37          *   PROPAGATE:  A releaseShared should be propagated to other
 38          *               nodes. This is set (for head node only) in
 39          *               doReleaseShared to ensure propagation
 40          *               continues, even if other operations have
 41          *               since intervened.
 42          *   0:          None of the above
 43          *
 44          * The values are arranged numerically to simplify use.
 45          * Non-negative values mean that a node doesn't need to
 46          * signal. So, most code doesn't need to check for particular
 47          * values, just for sign.
 48          *
 49          * The field is initialized to 0 for normal sync nodes, and
 50          * CONDITION for condition nodes.  It is modified using CAS
 51          * (or when possible, unconditional volatile writes).
 52          */
 53         volatile int waitStatus;
 54 
 55         /**
 56          * Link to predecessor node that current node/thread relies on
 57          * for checking waitStatus. Assigned during enqueuing, and nulled
 58          * out (for sake of GC) only upon dequeuing.  Also, upon
 59          * cancellation of a predecessor, we short-circuit while
 60          * finding a non-cancelled one, which will always exist
 61          * because the head node is never cancelled: A node becomes
 62          * head only as a result of successful acquire. A
 63          * cancelled thread never succeeds in acquiring, and a thread only
 64          * cancels itself, not any other node.
 65          */
 66         volatile Node prev;
 67 
 68         /**
 69          * Link to the successor node that the current node/thread
 70          * unparks upon release. Assigned during enqueuing, adjusted
 71          * when bypassing cancelled predecessors, and nulled out (for
 72          * sake of GC) when dequeued.  The enq operation does not
 73          * assign next field of a predecessor until after attachment,
 74          * so seeing a null next field does not necessarily mean that
 75          * node is at end of queue. However, if a next field appears
 76          * to be null, we can scan prev's from the tail to
 77          * double-check.  The next field of cancelled nodes is set to
 78          * point to the node itself instead of null, to make life
 79          * easier for isOnSyncQueue.
 80          */
 81         volatile Node next;
 82 
 83         /**
 84          * The thread that enqueued this node.  Initialized on
 85          * construction and nulled out after use.
 86          */
 87         volatile Thread thread;
 88 
 89         /**
 90          * Link to next node waiting on condition, or the special
 91          * value SHARED.  Because condition queues are accessed only
 92          * when holding in exclusive mode, we just need a simple
 93          * linked queue to hold nodes while they are waiting on
 94          * conditions. They are then transferred to the queue to
 95          * re-acquire. And because conditions can only be exclusive,
 96          * we save a field by using special value to indicate shared
 97          * mode.
 98          */
 99         Node nextWaiter;
100 
101         /**
102          * Returns true if node is waiting in shared mode.
103          */
104         final boolean isShared() {
105             return nextWaiter == SHARED;
106         }
107 
108         /**
109          * Returns previous node, or throws NullPointerException if null.
110          * Use when predecessor cannot be null.  The null check could
111          * be elided, but is present to help the VM.
112          *
113          * @return the predecessor of this node
114          */
115         final Node predecessor() throws NullPointerException {
116             Node p = prev;
117             if (p == null)
118                 throw new NullPointerException();
119             else
120                 return p;
121         }
122 
123         Node() {    // Used to establish initial head or SHARED marker
124         }
125         // 线程加入等待结点
126         Node(Thread thread, Node mode) {     // Used by addWaiter
127             this.nextWaiter = mode;
128             this.thread = thread;
129         }
130         // 线程加入条件对列,会带上线程的状态值waitStatus
131         Node(Thread thread, int waitStatus) { // Used by Condition
132             this.waitStatus = waitStatus;
133             this.thread = thread;
134         }
135     }

②ConditionObject:条件队列,这个类的作用从AQS的注释上可知。

AQS框架源码分析-AbstractQueuedSynchronizer

该类主要是为了让子类实现独占模式。AQS框架下独占模式的获取资源、释放等操作到最后都是基于这个类实现的。只有在独占模式下才会去使用该类。

ConditionObject源码如下(对主要代码进行了注释):

  1 public class ConditionObject implements Condition, java.io.Serializable {
  2         private static final long serialVersionUID = 1173984872572414699L;
  3         /** First node of condition queue. */
  4         private transient Node firstWaiter;  // 存储条件对列中第一个节点
  5         /** Last node of condition queue. */
  6         private transient Node lastWaiter; // 存储条件对列中最后一个节点
  7 
  8         /**
  9          * Creates a new {@code ConditionObject} instance.
 10          */
 11         public ConditionObject() { }
 12 
 13         // Internal methods
 14 
 15         /**
 16          * Adds a new waiter to wait queue.  // 增加一个新的节点到等待队列中
 17          * @return its new wait node
 18          */
 19         private Node addConditionWaiter() {
 20             Node t = lastWaiter;
 21             // 如果最后一个节点的状态已经结束,则直接清理掉
 22             // If lastWaiter is cancelled, clean out.
 23             if (t != null && t.waitStatus != Node.CONDITION) {
 24                // 拆分已经处于结束状态的节点 也就是清除掉这类节点
 25                unlinkCancelledWaiters();
 26                t = lastWaiter;
 27             }
 28             // 创建一个新的节点,带上结点状态,表明结点处于条件对列上
 29             Node node = new Node(Thread.currentThread(), Node.CONDITION);
 30             /**
 31              条件队列中加入节点都是从队尾加入,并且从下面代码可知,每次都会存储最后一个节点的值。
 32              当最后一个节点为空时,说明队列中不存在节点,所以将node赋值给第一个节点,否则将节点加入对列尾
 33              */
 34             if (t == null)
 35                 firstWaiter = node;
 36             else
 37                 t.nextWaiter = node;
 38             lastWaiter = node;  // 存储最后一个节点的值
 39             return node;
 40         }
 41 
 42         /**
 43          * 唤醒节点
 44          * 移除和转换节点直到节点状态处于未结束或者为空 (节点移除相当于唤醒)
 45          * Removes and transfers nodes until hit non-cancelled one or
 46          * null. Split out from signal in part to encourage compilers
 47          * to inline the case of no waiters.
 48          * @param first (non-null) the first node on condition queue
 49          */
 50         private void doSignal(Node first) {
 51             do {
 52                 // 当next节点为null时,则将lastWaiter赋值为null
 53                 if ( (firstWaiter = first.nextWaiter) == null)
 54                     lastWaiter = null;
 55                 first.nextWaiter = null; // 切断当前节点
 56             } while (!transferForSignal(first) &&
 57                      (first = firstWaiter) != null);
 58         }
 59 
 60         /**
 61          * 唤醒所有节点
 62          * Removes and transfers all nodes.
 63          * @param first (non-null) the first node on condition queue
 64          */
 65         private void doSignalAll(Node first) {
 66             lastWaiter = firstWaiter = null;
 67             do {
 68                 // 循环唤醒所有节点,代码还是比较容易理解
 69                 // 将每个节点直接截断即可
 70                 Node next = first.nextWaiter;
 71                 first.nextWaiter = null;
 72                 transferForSignal(first);
 73                 first = next;
 74             } while (first != null);
 75         }
 76 
 77         /**
 78          * Unlinks cancelled waiter nodes from condition queue.
 79          * Called only while holding lock. This is called when
 80          * cancellation occurred during condition wait, and upon
 81          * insertion of a new waiter when lastWaiter is seen to have
 82          * been cancelled. This method is needed to avoid garbage
 83          * retention in the absence of signals. So even though it may
 84          * require a full traversal, it comes into play only when
 85          * timeouts or cancellations occur in the absence of
 86          * signals. It traverses all nodes rather than stopping at a
 87          * particular target to unlink all pointers to garbage nodes
 88          * without requiring many re-traversals during cancellation
 89          * storms.
 90          */
 91         private void unlinkCancelledWaiters() { // 删除处于结束状态的节点
 92             Node t = firstWaiter;
 93             Node trail = null;
 94             // 第一个节点为空,直接返回
 95             // 这里会遍历所有节点
 96             while (t != null) {
 97                 Node next = t.nextWaiter; // 记录下一个节点的值
 98                 // 当节点状态不为CONDITION
 99                 if (t.waitStatus != Node.CONDITION) {
100                     // 首先将当前节点的下一个节点赋值为空,切断当前节点链路
101                     t.nextWaiter = null;
102                     // 如果追踪节点为空的时候,则存储第一个节点的值为next,因为当前节点状态不为CONDITION需要清理
103                     if (trail == null)
104                         firstWaiter = next;
105                     else  // 在追踪节点串联下一个节点,主要是为了存储最后一个节点的值
106                         trail.nextWaiter = next;
107                     if (next == null)  // 当next为空时,则存储trail为最后一个节点,将最后一个节点值存储下来
108                         lastWaiter = trail;
109                 }
110                 else  // 当节点状态为CONDITION时,将该节点赋值给trail
111                     trail = t;
112                 t = next; // 将next赋值给t,继续遍历
113             }
114         }
115 
116         // public methods
117 
118         /**
119          * 唤醒等待时间最长的节点,使其拥有锁
120          * Moves the longest-waiting thread, if one exists, from the
121          * wait queue for this condition to the wait queue for the
122          * owning lock.
123          *
124          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
125          *         returns {@code false}
126          */
127         public final void signal() {
128             // 如果线程不是独占资源,则抛出异常,从这里也说明ConditionObject只能用在独占模式中
129             if (!isHeldExclusively())
130                 throw new IllegalMonitorStateException();
131             Node first = firstWaiter;
132             if (first != null)
133                 doSignal(first);
134         }
135 
136         /**
137          * 唤醒所有等待节点
138          * Moves all threads from the wait queue for this condition to
139          * the wait queue for the owning lock.
140          *
141          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
142          *         returns {@code false}
143          */
144         public final void signalAll() {
145             if (!isHeldExclusively())
146                 throw new IllegalMonitorStateException();
147             Node first = firstWaiter;
148             if (first != null)
149                 doSignalAll(first);
150         }
151 
152         /**
153          * 节点不间断等待
154          * Implements uninterruptible condition wait.
155          * <ol>
156          * <li> Save lock state returned by {@link #getState}.
157          * <li> Invoke {@link #release} with saved state as argument,
158          *      throwing IllegalMonitorStateException if it fails.
159          * <li> Block until signalled.
160          * <li> Reacquire by invoking specialized version of
161          *      {@link #acquire} with saved state as argument.
162          * </ol>
163          */
164         public final void awaitUninterruptibly() {
165             Node node = addConditionWaiter();
166             int savedState = fullyRelease(node);
167             boolean interrupted = false;
168             while (!isOnSyncQueue(node)) {
169                 LockSupport.park(this);
170                 if (Thread.interrupted())
171                     interrupted = true;
172             }
173             if (acquireQueued(node, savedState) || interrupted)
174                 selfInterrupt();
175         }
176         
177         /*
178          * For interruptible waits, we need to track whether to throw
179          * InterruptedException, if interrupted while blocked on
180          * condition, versus reinterrupt current thread, if
181          * interrupted while blocked waiting to re-acquire.
182          */
183 
184         /** Mode meaning to reinterrupt on exit from wait */
185         private static final int REINTERRUPT =  1;
186         /** Mode meaning to throw InterruptedException on exit from wait */
187         private static final int THROW_IE    = -1;
188 
189         /**
190          * Checks for interrupt, returning THROW_IE if interrupted
191          * before signalled, REINTERRUPT if after signalled, or
192          * 0 if not interrupted.
193          */
194         private int checkInterruptWhileWaiting(Node node) {
195             return Thread.interrupted() ?
196                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
197                 0;
198         }
199 
200         /**
201          * Throws InterruptedException, reinterrupts current thread, or
202          * does nothing, depending on mode.
203          */
204         private void reportInterruptAfterWait(int interruptMode)
205             throws InterruptedException {
206             if (interruptMode == THROW_IE)
207                 throw new InterruptedException();
208             else if (interruptMode == REINTERRUPT)
209                 selfInterrupt();
210         }
211 
212         /**
213          * Implements interruptible condition wait.
214          * <ol>
215          * <li> If current thread is interrupted, throw InterruptedException.
216          * <li> Save lock state returned by {@link #getState}.
217          * <li> Invoke {@link #release} with saved state as argument,
218          *      throwing IllegalMonitorStateException if it fails.
219          * <li> Block until signalled or interrupted.
220          * <li> Reacquire by invoking specialized version of
221          *      {@link #acquire} with saved state as argument.
222          * <li> If interrupted while blocked in step 4, throw InterruptedException.
223          * </ol>
224          */
225         public final void await() throws InterruptedException {
226             if (Thread.interrupted())
227                 throw new InterruptedException();
228             Node node = addConditionWaiter();
229             int savedState = fullyRelease(node);
230             int interruptMode = 0;
231             while (!isOnSyncQueue(node)) {
232                 LockSupport.park(this);
233                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
234                     break;
235             }
236             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
237                 interruptMode = REINTERRUPT;
238             if (node.nextWaiter != null) // clean up if cancelled
239                 unlinkCancelledWaiters();
240             if (interruptMode != 0)
241                 reportInterruptAfterWait(interruptMode);
242         }
243 
244         /**
245          * Implements timed condition wait.
246          * <ol>
247          * <li> If current thread is interrupted, throw InterruptedException.
248          * <li> Save lock state returned by {@link #getState}.
249          * <li> Invoke {@link #release} with saved state as argument,
250          *      throwing IllegalMonitorStateException if it fails.
251          * <li> Block until signalled, interrupted, or timed out.
252          * <li> Reacquire by invoking specialized version of
253          *      {@link #acquire} with saved state as argument.
254          * <li> If interrupted while blocked in step 4, throw InterruptedException.
255          * </ol>
256          */
257         public final long awaitNanos(long nanosTimeout)
258                 throws InterruptedException {
259             if (Thread.interrupted())
260                 throw new InterruptedException();
261             Node node = addConditionWaiter();
262             int savedState = fullyRelease(node);
263             final long deadline = System.nanoTime() + nanosTimeout;
264             int interruptMode = 0;
265             while (!isOnSyncQueue(node)) {
266                 if (nanosTimeout <= 0L) {
267                     transferAfterCancelledWait(node);
268                     break;
269                 }
270                 if (nanosTimeout >= spinForTimeoutThreshold)
271                     LockSupport.parkNanos(this, nanosTimeout);
272                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
273                     break;
274                 nanosTimeout = deadline - System.nanoTime();
275             }
276             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
277                 interruptMode = REINTERRUPT;
278             if (node.nextWaiter != null)
279                 unlinkCancelledWaiters();
280             if (interruptMode != 0)
281                 reportInterruptAfterWait(interruptMode);
282             return deadline - System.nanoTime();
283         }
284 
285         /**
286          * Implements absolute timed condition wait.
287          * <ol>
288          * <li> If current thread is interrupted, throw InterruptedException.
289          * <li> Save lock state returned by {@link #getState}.
290          * <li> Invoke {@link #release} with saved state as argument,
291          *      throwing IllegalMonitorStateException if it fails.
292          * <li> Block until signalled, interrupted, or timed out.
293          * <li> Reacquire by invoking specialized version of
294          *      {@link #acquire} with saved state as argument.
295          * <li> If interrupted while blocked in step 4, throw InterruptedException.
296          * <li> If timed out while blocked in step 4, return false, else true.
297          * </ol>
298          */
299         public final boolean awaitUntil(Date deadline)
300                 throws InterruptedException {
301             long abstime = deadline.getTime();
302             if (Thread.interrupted())
303                 throw new InterruptedException();
304             Node node = addConditionWaiter();
305             int savedState = fullyRelease(node);
306             boolean timedout = false;
307             int interruptMode = 0;
308             while (!isOnSyncQueue(node)) {
309                 if (System.currentTimeMillis() > abstime) {
310                     timedout = transferAfterCancelledWait(node);
311                     break;
312                 }
313                 LockSupport.parkUntil(this, abstime);
314                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
315                     break;
316             }
317             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
318                 interruptMode = REINTERRUPT;
319             if (node.nextWaiter != null)
320                 unlinkCancelledWaiters();
321             if (interruptMode != 0)
322                 reportInterruptAfterWait(interruptMode);
323             return !timedout;
324         }
325 
326         /**
327          * Implements timed condition wait.
328          * <ol>
329          * <li> If current thread is interrupted, throw InterruptedException.
330          * <li> Save lock state returned by {@link #getState}.
331          * <li> Invoke {@link #release} with saved state as argument,
332          *      throwing IllegalMonitorStateException if it fails.
333          * <li> Block until signalled, interrupted, or timed out.
334          * <li> Reacquire by invoking specialized version of
335          *      {@link #acquire} with saved state as argument.
336          * <li> If interrupted while blocked in step 4, throw InterruptedException.
337          * <li> If timed out while blocked in step 4, return false, else true.
338          * </ol>
339          */
340         public final boolean await(long time, TimeUnit unit)
341                 throws InterruptedException {
342             long nanosTimeout = unit.toNanos(time);
343             if (Thread.interrupted())
344                 throw new InterruptedException();
345             Node node = addConditionWaiter();
346             int savedState = fullyRelease(node);
347             final long deadline = System.nanoTime() + nanosTimeout;
348             boolean timedout = false;
349             int interruptMode = 0;
350             while (!isOnSyncQueue(node)) {
351                 if (nanosTimeout <= 0L) {
352                     timedout = transferAfterCancelledWait(node);
353                     break;
354                 }
355                 if (nanosTimeout >= spinForTimeoutThreshold)
356                     LockSupport.parkNanos(this, nanosTimeout);
357                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
358                     break;
359                 nanosTimeout = deadline - System.nanoTime();
360             }
361             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
362                 interruptMode = REINTERRUPT;
363             if (node.nextWaiter != null)
364                 unlinkCancelledWaiters();
365             if (interruptMode != 0)
366                 reportInterruptAfterWait(interruptMode);
367             return !timedout;
368         }
369 
370         //  support for instrumentation
371 
372         /**
373          * Returns true if this condition was created by the given
374          * synchronization object.
375          *
376          * @return {@code true} if owned
377          */
378         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
379             return sync == AbstractQueuedSynchronizer.this;
380         }
381 
382         /**
383          * Queries whether any threads are waiting on this condition.
384          * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
385          *
386          * @return {@code true} if there are any waiting threads
387          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
388          *         returns {@code false}
389          */
390         protected final boolean hasWaiters() {
391             if (!isHeldExclusively())
392                 throw new IllegalMonitorStateException();
393             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
394                 if (w.waitStatus == Node.CONDITION)
395                     return true;
396             }
397             return false;
398         }
399 
400         /**
401          * Returns an estimate of the number of threads waiting on
402          * this condition.
403          * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
404          *
405          * @return the estimated number of waiting threads
406          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
407          *         returns {@code false}
408          */
409         protected final int getWaitQueueLength() {
410             if (!isHeldExclusively())
411                 throw new IllegalMonitorStateException();
412             int n = 0;
413             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
414                 if (w.waitStatus == Node.CONDITION)
415                     ++n;
416             }
417             return n;
418         }
419 
420         /**
421          * Returns a collection containing those threads that may be
422          * waiting on this Condition.
423          * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
424          *
425          * @return the collection of threads
426          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
427          *         returns {@code false}
428          */
429         protected final Collection<Thread> getWaitingThreads() {
430             if (!isHeldExclusively())
431                 throw new IllegalMonitorStateException();
432             ArrayList<Thread> list = new ArrayList<Thread>();
433             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
434                 if (w.waitStatus == Node.CONDITION) {
435                     Thread t = w.thread;
436                     if (t != null)
437                         list.add(t);
438                 }
439             }
440             return list;
441         }
442     }
View Code

相关文章:

  • 2023-02-09
  • 2022-01-19
  • 2021-10-02
  • 2021-09-29
  • 2021-11-09
  • 2021-08-28
  • 2021-07-25
猜你喜欢
  • 2021-10-11
  • 2020-11-17
  • 2019-07-03
  • 2021-05-22
  • 2021-12-22
  • 2021-05-06
  • 2021-08-04
相关资源
相似解决方案