前言:AQS框架在J.U.C中的地位不言而喻,可以说没有AQS就没有J.U.C包,可见其重要性,因此有必要对其原理进行详细深入的理解。
1.AQS是什么
在深入AQS之前,首先我们要搞清楚什么是AQS。AQS全称是AbstractQueuedSynchronizer,我们直接查看AQS源码的注释。
大致意思就是说:AQS提供了实现阻塞锁和相关同步器并依赖先进先出(FIFO)等待队列的框架。
AQS依赖一个原子数值作为锁的状态,子类可以有多个状态值,只能通过原子方法区操作该值,从而保证同步。
通过第一段的注释大致总结下AQS是什么:
①AQS是一个同步的基础框架,基于一个先进先出的队列。
②锁机制依赖一个原子值的状态。
③AQS的子类负责定义与操作这个状态值,但必须通过AQS提供的原子操作。
④AQS剩余的方法就是围绕队列,与线程阻塞唤醒等功能。
2.重要成员变量
AQS中有两个重要的成员变量:Node和ConditionObject。
①Node的作用是存储获取锁失败的线程,并且维护一个CLH FIFO队列,该队列是会被多线程操作的,所以Node中大部分变量都是被volatile修饰,并且通过自旋和CAS进行原子性的操作。CLH的数据结构如下:
Node有一个模式的属性:独占模式和共享模式,独占模式下资源是线程独占的,共享模式下,资源是可以被多个线程占用的。
Node源码如下:
1 static final class Node { 2 /** Marker to indicate a node is waiting in shared mode */ 3 static final Node SHARED = new Node(); // 共享模式 4 /** Marker to indicate a node is waiting in exclusive mode */ 5 static final Node EXCLUSIVE = null; // 独占模式 6 7 /** waitStatus value to indicate thread has cancelled */ 8 static final int CANCELLED = 1; // 表明线程已处于结束状态(被取消) 9 /** waitStatus value to indicate successor's thread needs unparking */ 10 static final int SIGNAL = -1; // 表明线程需要被唤醒 11 /** waitStatus value to indicate thread is waiting on condition */ 12 static final int CONDITION = -2; // 表明线程正处于条件队列上,等待某一条件 13 /** 14 * waitStatus value to indicate the next acquireShared should 15 * unconditionally propagate 16 */ 17 static final int PROPAGATE = -3; // 共享模式下同步状态会被传播 18 19 /** 20 * Status field, taking on only the values: 21 * SIGNAL: The successor of this node is (or will soon be) 22 * blocked (via park), so the current node must 23 * unpark its successor when it releases or 24 * cancels. To avoid races, acquire methods must 25 * first indicate they need a signal, 26 * then retry the atomic acquire, and then, 27 * on failure, block. 28 * CANCELLED: This node is cancelled due to timeout or interrupt. 29 * Nodes never leave this state. In particular, 30 * a thread with cancelled node never again blocks. 31 * CONDITION: This node is currently on a condition queue. 32 * It will not be used as a sync queue node 33 * until transferred, at which time the status 34 * will be set to 0. (Use of this value here has 35 * nothing to do with the other uses of the 36 * field, but simplifies mechanics.) 37 * PROPAGATE: A releaseShared should be propagated to other 38 * nodes. This is set (for head node only) in 39 * doReleaseShared to ensure propagation 40 * continues, even if other operations have 41 * since intervened. 42 * 0: None of the above 43 * 44 * The values are arranged numerically to simplify use. 45 * Non-negative values mean that a node doesn't need to 46 * signal. So, most code doesn't need to check for particular 47 * values, just for sign. 48 * 49 * The field is initialized to 0 for normal sync nodes, and 50 * CONDITION for condition nodes. It is modified using CAS 51 * (or when possible, unconditional volatile writes). 52 */ 53 volatile int waitStatus; 54 55 /** 56 * Link to predecessor node that current node/thread relies on 57 * for checking waitStatus. Assigned during enqueuing, and nulled 58 * out (for sake of GC) only upon dequeuing. Also, upon 59 * cancellation of a predecessor, we short-circuit while 60 * finding a non-cancelled one, which will always exist 61 * because the head node is never cancelled: A node becomes 62 * head only as a result of successful acquire. A 63 * cancelled thread never succeeds in acquiring, and a thread only 64 * cancels itself, not any other node. 65 */ 66 volatile Node prev; 67 68 /** 69 * Link to the successor node that the current node/thread 70 * unparks upon release. Assigned during enqueuing, adjusted 71 * when bypassing cancelled predecessors, and nulled out (for 72 * sake of GC) when dequeued. The enq operation does not 73 * assign next field of a predecessor until after attachment, 74 * so seeing a null next field does not necessarily mean that 75 * node is at end of queue. However, if a next field appears 76 * to be null, we can scan prev's from the tail to 77 * double-check. The next field of cancelled nodes is set to 78 * point to the node itself instead of null, to make life 79 * easier for isOnSyncQueue. 80 */ 81 volatile Node next; 82 83 /** 84 * The thread that enqueued this node. Initialized on 85 * construction and nulled out after use. 86 */ 87 volatile Thread thread; 88 89 /** 90 * Link to next node waiting on condition, or the special 91 * value SHARED. Because condition queues are accessed only 92 * when holding in exclusive mode, we just need a simple 93 * linked queue to hold nodes while they are waiting on 94 * conditions. They are then transferred to the queue to 95 * re-acquire. And because conditions can only be exclusive, 96 * we save a field by using special value to indicate shared 97 * mode. 98 */ 99 Node nextWaiter; 100 101 /** 102 * Returns true if node is waiting in shared mode. 103 */ 104 final boolean isShared() { 105 return nextWaiter == SHARED; 106 } 107 108 /** 109 * Returns previous node, or throws NullPointerException if null. 110 * Use when predecessor cannot be null. The null check could 111 * be elided, but is present to help the VM. 112 * 113 * @return the predecessor of this node 114 */ 115 final Node predecessor() throws NullPointerException { 116 Node p = prev; 117 if (p == null) 118 throw new NullPointerException(); 119 else 120 return p; 121 } 122 123 Node() { // Used to establish initial head or SHARED marker 124 } 125 // 线程加入等待结点 126 Node(Thread thread, Node mode) { // Used by addWaiter 127 this.nextWaiter = mode; 128 this.thread = thread; 129 } 130 // 线程加入条件对列,会带上线程的状态值waitStatus 131 Node(Thread thread, int waitStatus) { // Used by Condition 132 this.waitStatus = waitStatus; 133 this.thread = thread; 134 } 135 }
②ConditionObject:条件队列,这个类的作用从AQS的注释上可知。
该类主要是为了让子类实现独占模式。AQS框架下独占模式的获取资源、释放等操作到最后都是基于这个类实现的。只有在独占模式下才会去使用该类。
ConditionObject源码如下(对主要代码进行了注释):
1 public class ConditionObject implements Condition, java.io.Serializable { 2 private static final long serialVersionUID = 1173984872572414699L; 3 /** First node of condition queue. */ 4 private transient Node firstWaiter; // 存储条件对列中第一个节点 5 /** Last node of condition queue. */ 6 private transient Node lastWaiter; // 存储条件对列中最后一个节点 7 8 /** 9 * Creates a new {@code ConditionObject} instance. 10 */ 11 public ConditionObject() { } 12 13 // Internal methods 14 15 /** 16 * Adds a new waiter to wait queue. // 增加一个新的节点到等待队列中 17 * @return its new wait node 18 */ 19 private Node addConditionWaiter() { 20 Node t = lastWaiter; 21 // 如果最后一个节点的状态已经结束,则直接清理掉 22 // If lastWaiter is cancelled, clean out. 23 if (t != null && t.waitStatus != Node.CONDITION) { 24 // 拆分已经处于结束状态的节点 也就是清除掉这类节点 25 unlinkCancelledWaiters(); 26 t = lastWaiter; 27 } 28 // 创建一个新的节点,带上结点状态,表明结点处于条件对列上 29 Node node = new Node(Thread.currentThread(), Node.CONDITION); 30 /** 31 条件队列中加入节点都是从队尾加入,并且从下面代码可知,每次都会存储最后一个节点的值。 32 当最后一个节点为空时,说明队列中不存在节点,所以将node赋值给第一个节点,否则将节点加入对列尾 33 */ 34 if (t == null) 35 firstWaiter = node; 36 else 37 t.nextWaiter = node; 38 lastWaiter = node; // 存储最后一个节点的值 39 return node; 40 } 41 42 /** 43 * 唤醒节点 44 * 移除和转换节点直到节点状态处于未结束或者为空 (节点移除相当于唤醒) 45 * Removes and transfers nodes until hit non-cancelled one or 46 * null. Split out from signal in part to encourage compilers 47 * to inline the case of no waiters. 48 * @param first (non-null) the first node on condition queue 49 */ 50 private void doSignal(Node first) { 51 do { 52 // 当next节点为null时,则将lastWaiter赋值为null 53 if ( (firstWaiter = first.nextWaiter) == null) 54 lastWaiter = null; 55 first.nextWaiter = null; // 切断当前节点 56 } while (!transferForSignal(first) && 57 (first = firstWaiter) != null); 58 } 59 60 /** 61 * 唤醒所有节点 62 * Removes and transfers all nodes. 63 * @param first (non-null) the first node on condition queue 64 */ 65 private void doSignalAll(Node first) { 66 lastWaiter = firstWaiter = null; 67 do { 68 // 循环唤醒所有节点,代码还是比较容易理解 69 // 将每个节点直接截断即可 70 Node next = first.nextWaiter; 71 first.nextWaiter = null; 72 transferForSignal(first); 73 first = next; 74 } while (first != null); 75 } 76 77 /** 78 * Unlinks cancelled waiter nodes from condition queue. 79 * Called only while holding lock. This is called when 80 * cancellation occurred during condition wait, and upon 81 * insertion of a new waiter when lastWaiter is seen to have 82 * been cancelled. This method is needed to avoid garbage 83 * retention in the absence of signals. So even though it may 84 * require a full traversal, it comes into play only when 85 * timeouts or cancellations occur in the absence of 86 * signals. It traverses all nodes rather than stopping at a 87 * particular target to unlink all pointers to garbage nodes 88 * without requiring many re-traversals during cancellation 89 * storms. 90 */ 91 private void unlinkCancelledWaiters() { // 删除处于结束状态的节点 92 Node t = firstWaiter; 93 Node trail = null; 94 // 第一个节点为空,直接返回 95 // 这里会遍历所有节点 96 while (t != null) { 97 Node next = t.nextWaiter; // 记录下一个节点的值 98 // 当节点状态不为CONDITION 99 if (t.waitStatus != Node.CONDITION) { 100 // 首先将当前节点的下一个节点赋值为空,切断当前节点链路 101 t.nextWaiter = null; 102 // 如果追踪节点为空的时候,则存储第一个节点的值为next,因为当前节点状态不为CONDITION需要清理 103 if (trail == null) 104 firstWaiter = next; 105 else // 在追踪节点串联下一个节点,主要是为了存储最后一个节点的值 106 trail.nextWaiter = next; 107 if (next == null) // 当next为空时,则存储trail为最后一个节点,将最后一个节点值存储下来 108 lastWaiter = trail; 109 } 110 else // 当节点状态为CONDITION时,将该节点赋值给trail 111 trail = t; 112 t = next; // 将next赋值给t,继续遍历 113 } 114 } 115 116 // public methods 117 118 /** 119 * 唤醒等待时间最长的节点,使其拥有锁 120 * Moves the longest-waiting thread, if one exists, from the 121 * wait queue for this condition to the wait queue for the 122 * owning lock. 123 * 124 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 125 * returns {@code false} 126 */ 127 public final void signal() { 128 // 如果线程不是独占资源,则抛出异常,从这里也说明ConditionObject只能用在独占模式中 129 if (!isHeldExclusively()) 130 throw new IllegalMonitorStateException(); 131 Node first = firstWaiter; 132 if (first != null) 133 doSignal(first); 134 } 135 136 /** 137 * 唤醒所有等待节点 138 * Moves all threads from the wait queue for this condition to 139 * the wait queue for the owning lock. 140 * 141 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 142 * returns {@code false} 143 */ 144 public final void signalAll() { 145 if (!isHeldExclusively()) 146 throw new IllegalMonitorStateException(); 147 Node first = firstWaiter; 148 if (first != null) 149 doSignalAll(first); 150 } 151 152 /** 153 * 节点不间断等待 154 * Implements uninterruptible condition wait. 155 * <ol> 156 * <li> Save lock state returned by {@link #getState}. 157 * <li> Invoke {@link #release} with saved state as argument, 158 * throwing IllegalMonitorStateException if it fails. 159 * <li> Block until signalled. 160 * <li> Reacquire by invoking specialized version of 161 * {@link #acquire} with saved state as argument. 162 * </ol> 163 */ 164 public final void awaitUninterruptibly() { 165 Node node = addConditionWaiter(); 166 int savedState = fullyRelease(node); 167 boolean interrupted = false; 168 while (!isOnSyncQueue(node)) { 169 LockSupport.park(this); 170 if (Thread.interrupted()) 171 interrupted = true; 172 } 173 if (acquireQueued(node, savedState) || interrupted) 174 selfInterrupt(); 175 } 176 177 /* 178 * For interruptible waits, we need to track whether to throw 179 * InterruptedException, if interrupted while blocked on 180 * condition, versus reinterrupt current thread, if 181 * interrupted while blocked waiting to re-acquire. 182 */ 183 184 /** Mode meaning to reinterrupt on exit from wait */ 185 private static final int REINTERRUPT = 1; 186 /** Mode meaning to throw InterruptedException on exit from wait */ 187 private static final int THROW_IE = -1; 188 189 /** 190 * Checks for interrupt, returning THROW_IE if interrupted 191 * before signalled, REINTERRUPT if after signalled, or 192 * 0 if not interrupted. 193 */ 194 private int checkInterruptWhileWaiting(Node node) { 195 return Thread.interrupted() ? 196 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 197 0; 198 } 199 200 /** 201 * Throws InterruptedException, reinterrupts current thread, or 202 * does nothing, depending on mode. 203 */ 204 private void reportInterruptAfterWait(int interruptMode) 205 throws InterruptedException { 206 if (interruptMode == THROW_IE) 207 throw new InterruptedException(); 208 else if (interruptMode == REINTERRUPT) 209 selfInterrupt(); 210 } 211 212 /** 213 * Implements interruptible condition wait. 214 * <ol> 215 * <li> If current thread is interrupted, throw InterruptedException. 216 * <li> Save lock state returned by {@link #getState}. 217 * <li> Invoke {@link #release} with saved state as argument, 218 * throwing IllegalMonitorStateException if it fails. 219 * <li> Block until signalled or interrupted. 220 * <li> Reacquire by invoking specialized version of 221 * {@link #acquire} with saved state as argument. 222 * <li> If interrupted while blocked in step 4, throw InterruptedException. 223 * </ol> 224 */ 225 public final void await() throws InterruptedException { 226 if (Thread.interrupted()) 227 throw new InterruptedException(); 228 Node node = addConditionWaiter(); 229 int savedState = fullyRelease(node); 230 int interruptMode = 0; 231 while (!isOnSyncQueue(node)) { 232 LockSupport.park(this); 233 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 234 break; 235 } 236 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 237 interruptMode = REINTERRUPT; 238 if (node.nextWaiter != null) // clean up if cancelled 239 unlinkCancelledWaiters(); 240 if (interruptMode != 0) 241 reportInterruptAfterWait(interruptMode); 242 } 243 244 /** 245 * Implements timed condition wait. 246 * <ol> 247 * <li> If current thread is interrupted, throw InterruptedException. 248 * <li> Save lock state returned by {@link #getState}. 249 * <li> Invoke {@link #release} with saved state as argument, 250 * throwing IllegalMonitorStateException if it fails. 251 * <li> Block until signalled, interrupted, or timed out. 252 * <li> Reacquire by invoking specialized version of 253 * {@link #acquire} with saved state as argument. 254 * <li> If interrupted while blocked in step 4, throw InterruptedException. 255 * </ol> 256 */ 257 public final long awaitNanos(long nanosTimeout) 258 throws InterruptedException { 259 if (Thread.interrupted()) 260 throw new InterruptedException(); 261 Node node = addConditionWaiter(); 262 int savedState = fullyRelease(node); 263 final long deadline = System.nanoTime() + nanosTimeout; 264 int interruptMode = 0; 265 while (!isOnSyncQueue(node)) { 266 if (nanosTimeout <= 0L) { 267 transferAfterCancelledWait(node); 268 break; 269 } 270 if (nanosTimeout >= spinForTimeoutThreshold) 271 LockSupport.parkNanos(this, nanosTimeout); 272 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 273 break; 274 nanosTimeout = deadline - System.nanoTime(); 275 } 276 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 277 interruptMode = REINTERRUPT; 278 if (node.nextWaiter != null) 279 unlinkCancelledWaiters(); 280 if (interruptMode != 0) 281 reportInterruptAfterWait(interruptMode); 282 return deadline - System.nanoTime(); 283 } 284 285 /** 286 * Implements absolute timed condition wait. 287 * <ol> 288 * <li> If current thread is interrupted, throw InterruptedException. 289 * <li> Save lock state returned by {@link #getState}. 290 * <li> Invoke {@link #release} with saved state as argument, 291 * throwing IllegalMonitorStateException if it fails. 292 * <li> Block until signalled, interrupted, or timed out. 293 * <li> Reacquire by invoking specialized version of 294 * {@link #acquire} with saved state as argument. 295 * <li> If interrupted while blocked in step 4, throw InterruptedException. 296 * <li> If timed out while blocked in step 4, return false, else true. 297 * </ol> 298 */ 299 public final boolean awaitUntil(Date deadline) 300 throws InterruptedException { 301 long abstime = deadline.getTime(); 302 if (Thread.interrupted()) 303 throw new InterruptedException(); 304 Node node = addConditionWaiter(); 305 int savedState = fullyRelease(node); 306 boolean timedout = false; 307 int interruptMode = 0; 308 while (!isOnSyncQueue(node)) { 309 if (System.currentTimeMillis() > abstime) { 310 timedout = transferAfterCancelledWait(node); 311 break; 312 } 313 LockSupport.parkUntil(this, abstime); 314 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 315 break; 316 } 317 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 318 interruptMode = REINTERRUPT; 319 if (node.nextWaiter != null) 320 unlinkCancelledWaiters(); 321 if (interruptMode != 0) 322 reportInterruptAfterWait(interruptMode); 323 return !timedout; 324 } 325 326 /** 327 * Implements timed condition wait. 328 * <ol> 329 * <li> If current thread is interrupted, throw InterruptedException. 330 * <li> Save lock state returned by {@link #getState}. 331 * <li> Invoke {@link #release} with saved state as argument, 332 * throwing IllegalMonitorStateException if it fails. 333 * <li> Block until signalled, interrupted, or timed out. 334 * <li> Reacquire by invoking specialized version of 335 * {@link #acquire} with saved state as argument. 336 * <li> If interrupted while blocked in step 4, throw InterruptedException. 337 * <li> If timed out while blocked in step 4, return false, else true. 338 * </ol> 339 */ 340 public final boolean await(long time, TimeUnit unit) 341 throws InterruptedException { 342 long nanosTimeout = unit.toNanos(time); 343 if (Thread.interrupted()) 344 throw new InterruptedException(); 345 Node node = addConditionWaiter(); 346 int savedState = fullyRelease(node); 347 final long deadline = System.nanoTime() + nanosTimeout; 348 boolean timedout = false; 349 int interruptMode = 0; 350 while (!isOnSyncQueue(node)) { 351 if (nanosTimeout <= 0L) { 352 timedout = transferAfterCancelledWait(node); 353 break; 354 } 355 if (nanosTimeout >= spinForTimeoutThreshold) 356 LockSupport.parkNanos(this, nanosTimeout); 357 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 358 break; 359 nanosTimeout = deadline - System.nanoTime(); 360 } 361 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 362 interruptMode = REINTERRUPT; 363 if (node.nextWaiter != null) 364 unlinkCancelledWaiters(); 365 if (interruptMode != 0) 366 reportInterruptAfterWait(interruptMode); 367 return !timedout; 368 } 369 370 // support for instrumentation 371 372 /** 373 * Returns true if this condition was created by the given 374 * synchronization object. 375 * 376 * @return {@code true} if owned 377 */ 378 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { 379 return sync == AbstractQueuedSynchronizer.this; 380 } 381 382 /** 383 * Queries whether any threads are waiting on this condition. 384 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. 385 * 386 * @return {@code true} if there are any waiting threads 387 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 388 * returns {@code false} 389 */ 390 protected final boolean hasWaiters() { 391 if (!isHeldExclusively()) 392 throw new IllegalMonitorStateException(); 393 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 394 if (w.waitStatus == Node.CONDITION) 395 return true; 396 } 397 return false; 398 } 399 400 /** 401 * Returns an estimate of the number of threads waiting on 402 * this condition. 403 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. 404 * 405 * @return the estimated number of waiting threads 406 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 407 * returns {@code false} 408 */ 409 protected final int getWaitQueueLength() { 410 if (!isHeldExclusively()) 411 throw new IllegalMonitorStateException(); 412 int n = 0; 413 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 414 if (w.waitStatus == Node.CONDITION) 415 ++n; 416 } 417 return n; 418 } 419 420 /** 421 * Returns a collection containing those threads that may be 422 * waiting on this Condition. 423 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. 424 * 425 * @return the collection of threads 426 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 427 * returns {@code false} 428 */ 429 protected final Collection<Thread> getWaitingThreads() { 430 if (!isHeldExclusively()) 431 throw new IllegalMonitorStateException(); 432 ArrayList<Thread> list = new ArrayList<Thread>(); 433 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 434 if (w.waitStatus == Node.CONDITION) { 435 Thread t = w.thread; 436 if (t != null) 437 list.add(t); 438 } 439 } 440 return list; 441 } 442 }