AQS:抽象的队列同步器,是JUC包中构建锁或者其他同步组件的基础框架。

  Java中的大部分同步类(ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore)都是基于AbstractQueuedSynchronizer(AQS) 实现的。

  AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

 

原理: 

  AQS使用一个volatile的int类型的成员变量state来表示同步状态通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对state值的修改。

  volatile能够保证多线程下的可见性,当state=1则代表当前对象锁已经被占有

  核心思想是:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;

      如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH变体的虚拟双向队列(双向链表)实现的,将暂时获取不到锁的线程加入到队列中(AQS通过将每条请求共享资源的线程封装成队列中的一个节点来实现锁的分配)。

 

 

 

AQS数据结构

队列的节点Node

   /**
     * 等待队列节点类
     *
     * 等待队列是“CLH”的变体(Craig、Landin和Hagersten)锁队列。CLH锁通常用于自旋锁
     * 
     * 每个节点中的“status”字段跟踪线程是否应该阻塞。节点在其前置任务释放时被通知(signalled)。
     * 队列的每个节点都充当一个特定的通知样式监视器,其中包含一个等待线程。
     * 但是,status字段不控制线程是否被授予锁等。
     * 如果线程是队列中的第一个线程,它可能会尝试获取。但是,第一并不能保证成功,只给了了竞争锁的权利。
     * 所以当前发布的竞争者释放锁的线程可能需要重新等待。
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
   * * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+
*/ static final class Node { /** * Marker to indicate a node is waiting in shared mode * 标识表示一个节点以共享的模式等待锁 */ static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node(); /** * Marker to indicate a node is waiting in exclusive mode * 标识表示一个节点以独占的模式等待锁 */ static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null; /** * waitStatus value to indicate thread has cancelled * 表示线程获取锁的请求已经取消了 */ static final int CANCELLED = 1; /** * waitStatus value to indicate successor's thread needs unparking * 表示线程已经准备好了,就等资源释放了 */ static final int SIGNAL = -1; /** * waitStatus value to indicate thread is waiting on condition * 表示节点在等待队列中,节点线程等待唤醒 */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate * 表示线程处在SHARED情况下,该字段才会使用 */ static final int PROPAGATE = -3; /** * 当前节点在队列中的状态 * * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * 前驱指针 * * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile AbstractQueuedSynchronizer.Node prev; /** * 后继指针 * * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile AbstractQueuedSynchronizer.Node next; /** * 该节点的线程 * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * 指向下一个处于CONDITION状态的节点 * * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ AbstractQueuedSynchronizer.Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * 返回前驱节点,如果为null,抛出NPE * * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException { AbstractQueuedSynchronizer.Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, AbstractQueuedSynchronizer.Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }

 

同步状态state

    /**
     * The synchronization state.
     */
    private volatile int state;

 

访问state字段的几个方法:

这几个方法都是final修饰的,子类无法重写。

  /**
     * 获取state的值
     */
    protected final int getState() {
        return state;
    }

    /**
     * 设置state的值
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * CAS原子方式更新state
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

 

加锁过程

通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程):

 

 

AbstractQueuedSynchronizer(AQS)                                                                 AbstractQueuedSynchronizer(AQS)

 

 

 

 

自定义同步器可实现的方法

 AQS提供的几个可用于自定义同步器实现的protected修饰的方法(ReentrantLock即实现了以下方法):

    /**
     * 尝试获取锁以独占方式。
     * 这个方法应该先查询对象的状态是否允许在独占模式下获取它,如果允许,则可以获取它
     *
     * @param arg 获取锁的次数
     * @return 成功则返回true,失败则返回false
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 尝试释放锁以独占模式
     *
     * @param arg 释放锁的次数
     * @return 成功则返回true,失败则返回false
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 尝试获取锁以共享模式
     * 这个方法应该先查询对象的状态是否允许在共享模式下获取它,如果允许,则可以获取它
     * @param arg 获取锁的次数
     * @return 负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 尝试释放锁以共享模式
     *
     * @param arg 释放锁的次数
     * @return 如果释放后允许唤醒后续等待结点返回true,否则返回false
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 该线程是否正在独占资源。只有用到Condition才需要去实现它
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。

AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

 

自定义一个简单的同步器

1、不支持可重入

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 基于AQS自实现一个锁,不支持可重入
 */
public class CustomLock {

    private final Sync sync;

    public CustomLock() {
        sync = new Sync();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() == getExclusiveOwnerThread()) {
                setState(0);
                setExclusiveOwnerThread(null);
                return true;
            }
            return false;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    
    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }
    
}

2、支持可重入

/**
 * 基于AQS自实现一个锁,支持可重入
 */
public class CustomReetrantLock {

    private final Sync sync;

    public CustomReetrantLock() {
        sync = new Sync();
    }

    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 尝试获取锁
         *
         * @param acquires
         * @return
         */
        @Override
        protected boolean tryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取同步状态值
            int c = getState();
            // 如果同步状态为0,表示锁空闲,则获取锁并通过CAS修改同步状态
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    // 设置当前锁的持有者为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果锁不空闲,但是锁的持有线程就是当前线程,则重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                // overflow
                if (nextc < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                // 同步状态累加
                setState(nextc);
                return true;
            }

            return false;
        }

        /**
         * 释放锁
         *
         * @param releases
         * @return
         */
        @Override
        protected boolean tryRelease(int releases) {
            // 获取释放锁后的同步状态值
            int c = getState() - releases;
            // 如果当前锁的持有者不是当前线程,则抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            // 锁是否空闲
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        /**
         * 判断锁是否为当前线程独占
         *
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    /**
     * 尝试以独占模式获取锁
     */
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 以独占模式释放锁
     */
    public void unlock() {
        sync.release(1);
    }

}

  测试:

public class CustomLockTest {

    private static final ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(
            5,
            10,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(1000),
            new MyThreadFactory("customLock"),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                }
            }
    );


    private static int count = 0;
    private static CustomReetrantLock customReetrantLock = new CustomReetrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 10000; i++) {
            THREAD_POOL.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 获取锁
                        customReetrantLock.lock();
                        count++;
                        // 重入再次获取锁
                        customReetrantLock.lock();
                        count++;
                        // 最后判断线程是否同步执行
                        if (count == 20000) {
                            System.out.println("done"); // 每次都会输出,说明线程是同步执行的
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // 重入几次释放几次
                        customReetrantLock.unlock();
                        customReetrantLock.unlock();
                    }

                }
            });
        }
        
    }
}
View Code

相关文章: