AQS was designed for

Doug Lea has talked about how AQS was designed.

Doug Lea is the developer of the Java Development Kit JDK5.

In principle, a synchronous structure is often can take advantage of the structure of the other, however, the tendency of a synchronous structure, leads to the realization of the complex and obscure logic, so, he chose to base synchronization related operations of the abstract in AbstractQueuedSynchronizer, AQS provides a model for constructing synchronous structure.

AQS source code (JDK8)

The four core

state

A volatile integer member represents state and provides both setState and getState methods

    /** * The synchronization state. */
    private volatile int state;
Copy the code

The queue

A first in, first out (FIFO) queue of waiting threads to achieve inter-threaded competition and waiting, which is one of the core of the AQS mechanism.


    /** * Wait queue head, delay initialization. * Except for initialization, it is only modified through the method setHead. * Note: If head exists, waitStatus is not CANCELLED */
    private transient volatile Node head;

    /** * Wait for the end of the queue, delay initialization. * Add a new wait node only through method enq modification. * /
    private transient volatile Node tail;
Copy the code

From the source can be seen, AQS through double linked list to achieve FIFO queue

CAS

Various basic operations based on CAS


    /** * If the current status value is equal to the expected value, the synchronization status is automatically set to the given update value. * This operation has the memory semantics of volatile reads and writes. * Parameters: expect -- expected * update -- new value * Return: true if successful. A false return indicates that the actual value does not equal the expected value. * /
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /** * CAS head field. Used only by enq. */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /** * CAS tail field. Used only by enq. */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /** * CAS waitStatus field of a node. */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /** * CAS next field of a node. */
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
Copy the code

As can be seen from the source code, CAS operates on state or double-linked list nodes.

acquire/release

Various acquire/release methods that specific synchronization structures are expected to implement


    /** * get in exclusive mode, ignore interrupts. * This is done by calling tryAcquire at least once and returning on success. Otherwise the thread will * queue and may repeatedly block and unblock, calling tryAcquire until it succeeds. * This method can be used to implement the lock. Lock method. * Parameter: arg -- acquire parameter. This value is passed to tryAcquire, but otherwise uninterpreted, and can * represent anything you like. * /
    public final void acquire(int arg) {
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }/** * to be released in exclusive mode. If tryRelease returns true, this is done by unblocking one or more threads. This method can * be used to implement the method lock. unlock. * Parameters: ARG - Release parameters. This value is passed to tryRelease, but is otherwise unsurprising and can represent anything * you like. * Return: the value returned from tryRelease */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if(h ! =null&& h.waitStatus ! =0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
Copy the code

Using AQS to realize a synchronous structure, at least two basic types of methods should be realized, namely acquire operation, acquire exclusive rights of resources; There is also the release operation, which releases the exclusive possession of a resource.

ReentrantLock

ReentrantLock, for example, internally implements Sync type by extending AQS to reflect lock holding status with AQS state.

    /** * This lock is the basis of synchronous control. Divided into fair and unfair versions below. * Use the state of AQS to indicate the number of reservations on the lock. * /
    abstract static class Sync extends AbstractQueuedSynchronizer {... }Copy the code

Below are the ReentrantLock operations for acquire and release.

lock


    /** * get the lock. * If the lock is not held by another thread, the lock is acquired and returned immediately, setting the lock holding count to 1. * If the current thread already holds the lock, the hold count is increased by 1 and the method returns immediately. * If the lock is held by another thread, the current thread is disabled for thread scheduling purposes and sleeps until the lock is acquired, at which point the lock hold count is set to 1. * /
    public void lock(a) {
        sync.lock();
    }
Copy the code

The lock of ReentrantLock internally calls Sync’s lock method


    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock(a);
Copy the code

Sync is an abstract class, and lock is implemented by two of its subclasses, FaireSync and NonFairSync

FaireSync.lock


    final void lock(a) {
          acquire(1);
      }
Copy the code

NonFairSync.lock

    /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */
    final void lock(a) {
        if (compareAndSetState(0.1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

Copy the code

In ReentrantLock, the tryAcquire logic implementation in NonfairSync and FairSync provides further non-fair or fair methods, respectively, while AQS internal tryAcquire is only a near-unimplemented method (directly throwing exceptions), This is left to the implementer to define

FaireSync.tryAcquire


    /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if(! hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true; }}else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false; }}Copy the code

NonFaireSync.tryAcquire

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }


    /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true; }}else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
Copy the code

Non-fair tryAcquire internally implements how to cooperate with the state and CAS to acquire the lock. Note that in contrast to the fair version of tryAcquire, it does not check whether there are other waiting persons when the lock is unoccitated, which reflects the semantics of non-fairness.

acquireQueued


    /** * threads already in the queue are acquired in exclusive uninterrupted mode. * For conditional wait methods and fetches. * Parameters: node -- node * arg -- acquire parameter * Returns: true */ if interrupted while waiting
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
            if(failed) cancelAcquire(node); }}Copy the code

If the previous tryAcquire fails, it means that the lock scramble has failed and the queue competition is entered. Here is what we say, the use of FIFO queues, to achieve the thread of the lock competition, is the core logic of AQS. The current thread is wrapped as an EXCLUSIVE node, which is added to the queue using the addWaiter method. AcquireQueued logic, in a nutshell, is that if the current node is preceded by a head node, the lock will be attempted, and if all goes well, the new head node will be acquired. Otherwise, wait if necessary.

TryAcquire is the part that needs to be implemented by the developer according to the specific scenario, and contention between threads is provided by AQS via the Waiter queue and acquireQueued. In the release method, The same goes for the queue.

unlock

	/** * Attempts to release the lock. * If the current thread is the holder of this lock, the hold count is reduced. * If the hold count is now zero, the lock is released. * if the current thread is not the holder of the lock, it throws IllegalMonitorStateException. * throw: IllegalMonitorStateException if the current thread holds the lock * /
    public void unlock(a) {
        sync.release(1);
    }

 	/** * to be released in exclusive mode. * If tryRelease returns true, this is done by unblocking one or more threads. * This method can be used to implement the method lock. unlock. * Parameters: ARG - Release parameters. This value is passed to tryRelease, but is otherwise unsurprising and can represent anything * you like. * Return: the value returned from tryRelease */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if(h ! =null&& h.waitStatus ! =0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


	protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
Copy the code