AQS, full name: AbstractQueuedSynchronizer, is to provide a synchronous frame JDK, internal maintains a bi-directional FIFO queue, the queue CLH synchronization.

AQS relies on it for synchronization state management (voliate modified state, used to indicate whether a lock is held). If the failure to obtain the synchronization state, the current thread and waiting information will be constructed into a Node, the Node into the FIFO queue, while blocking the current thread, when the thread will release the synchronization state, the FIRST section of the FIFO queue will wake up, so that it can obtain the synchronization state.

Many locks under JUC packages are implemented based on AQS, as shown in the following brain diagram:

Node

static final class Node {
        /** Shared node */
        static final Node SHARED = new Node();

        /** exclusive node */
        static final Node EXCLUSIVE = null;

        /** The node will be set to the cancelled state because of timeout or interruption. The cancelled node will not participate in the competition and will remain cancelled
        static final int CANCELLED =  1;

        /** The successor node is in the wait state. If the current node releases synchronization or is cancelled, the successor node is notified so that it can run */
        static final int SIGNAL    = -1;

        /** The thread of the node waits on condition. When another thread calls signal on condition, the node will enter the synchronization queue from the waiting queue to obtain synchronization status */
        static final int CONDITION = -2;

        /** * The next shared synchronization state acquisition is propagated unconditionally */
        static final int PROPAGATE = -3;

        /** Wait status */
        volatile int waitStatus;

        /** The precursor node */        
        volatile Node prev;

        /** The successor node */
        volatile Node next;

        /** The thread that gets the synchronization status */
        volatile Thread thread;

        /** * The next conditional queue waits for the node */
        Node nextWaiter;

        final boolean isShared(a) {
            return nextWaiter == SHARED;
        }

        final Node predecessor(a) throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread; }}Copy the code

FIFO structure

Exclusive synchronization state process

/** * get synchronization status exclusively */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}
Copy the code

tryAcquire

Attempt to acquire the lock, return true on success, false otherwise. This method is implemented by subclasses that inherit from AQS themselves. The design pattern of template method is adopted.

For example, ReentrantLock’s Sync inner class, Sync subclasses: NonfairSync and FairSync

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
Copy the code

addWaiter

    private Node addWaiter(Node mode) {
        // Create a Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try adding a tail quickly
        Node pred = tail;
        if(pred ! =null) {
            node.prev = pred;
            // CAS sets the tail
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                returnnode; }}// If the above fails to be added, the loop attempts to add until it succeeds
        enq(node);
        return node;
    }
Copy the code

enq

    private Node enq(final Node node) {
        // Continue the for loop until Node is inserted successfully
        for (;;) {
            Node t = tail;
            if (t == null) { 
                // CAS sets the first node
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // CAS sets the tail
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    returnt; }}}}Copy the code

acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
        // Indicates whether the operation succeeded
        boolean failed = true;
        try {
            // Thread interrupt flag
            boolean interrupted = false;
            // Continuous spin loop
            for (;;) {
                // Prev of the current node
                final Node p = node.predecessor();
                // Check whether prev is a header && is in sync state
                if (p == head && tryAcquire(arg)) {
                    // Set the current node as a header
                    setHead(node);
                    // Remove the prev node from the queue
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // During the spin process, determine whether the current thread needs to block && block the current thread and check the thread interrupt status
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
            if (failed)
                // Cancel obtaining the synchronization statuscancelAcquire(node); }}Copy the code

shouldParkAfterFailedAcquire

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the status of the prev node of the current node
        int ws = pred.waitStatus;
        
        if (ws == Node.SIGNAL)
            /* * If the prev's status is signal, the current node is notified when the prev releases synchronization or cancels, so the current node can safely block (rather than sleep) */
            return true;
        if (ws > 0) {
            /* * status > 0, which indicates that the cancelled node is removed from the queue * until a non-cancelled node is found */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* * Set prev status to signal */ via CAS
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
Copy the code

parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt(a) {
        // Block the current thread
        LockSupport.park(this);
        // Returns the interrupted status of the current thread
        return Thread.interrupted();
    }
Copy the code

selfInterrupt

    static void selfInterrupt(a) {
        // Failed to get synchronization state && Thread interrupt state is true, interrupt current thread
        Thread.currentThread().interrupt();
    }
Copy the code

Release the exclusive synchronization state

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if(h ! =null&& h.waitStatus ! =0)
                // If the header is not null && and the header state is not 0
                // Wake up the successor node of the header
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
Copy the code

tryRelease

Attempts to release synchronization, true on success, false otherwise. This method is implemented by subclasses that inherit from AQS themselves. The design pattern of template method is adopted.

For example, the tryRelease method of ReentrantLock’s Sync inner class.

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
Copy the code

unparkSuccessor

private void unparkSuccessor(Node node) {
        /* * Get the current node status */
        int ws = node.waitStatus;

        // If the state of the current node is less than 0, use CAS to set it to 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /* * Gets the successor of the current node */
        Node s = node.next;
        
        / / if the subsequent node is empty | | or subsequent node status > 0 (for cancellation status)
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Find available nodes whose state is not canceled from the tail node
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                    s = t;
        }
        if(s ! =null)
            // Wake up the successor node
            LockSupport.unpark(s.thread);
    }
Copy the code

conclusion

A FIFO synchronization queue is maintained in AQS. When the thread fails to obtain the synchronization state, it will join the opposite end of the CLH synchronization queue and keep the spin all the time. The thread in the CLH synchronization queue will determine whether its precursor node is the leading node when spinning. If the leading node tries to obtain the synchronization status continuously, it will exit the CLH synchronization queue if it succeeds in obtaining the synchronization status. When the thread finishes executing the logic, it releases the synchronization state and wakes up subsequent nodes.

Shared state synchronization process

The main difference between the shared mode and the exclusive mode is that only one thread can acquire the synchronization state at the same time, while the shared mode can have multiple threads acquire the synchronization state at the same time. For example, read operations can be performed by multiple threads at the same time, while write operations can be performed by only one thread at a time. All other operations will be blocked.

AcquireShared Gets the synchronization state

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            // Failed to obtain synchronization status
            doAcquireShared(arg);
    }
Copy the code

tryAcquireShared

Attempt to acquire the shared lock, return true on success, false otherwise. This method is implemented by subclasses that inherit from AQS themselves. The design pattern of template method is adopted.

For example, ReentrantReadWriteLock’s Sync internal class

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
Copy the code

doAcquireShared

private void doAcquireShared(int arg) {
        // Add shared mode nodes to the queue
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Spin to get synchronization status
            for (;;) {
                // The precursor of the current node
                final Node p = node.predecessor();
                // If the precursor node is the head node
                if (p == head) {
                    // Try to get the shared synchronization status
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Set the current node as the head node and release the successor node that is also in shared mode
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return; }}if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
            if(failed) cancelAcquire(node); }}Copy the code

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // Actually release the shared synchronization state and wake up the next nodedoReleaseShared(); }}Copy the code

 doReleaseShared

private void doReleaseShared(a) {
        // Spin releases shared synchronization state
        for (;;) {
            Node h = head;
            // If the header is not null && the header is not equal to the tail, there is a valid node
            if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
                // If the status of the header is signal, there is a successor node that needs to be woken up
                if (ws == Node.SIGNAL) {
                    // Update the state of the header to 0 (the initial state), because the header is now useless
                    // continue To ensure that the replacement succeeds
                    if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
                        continue;            // loop to recheck cases
                    // Wake up the successor node
                    unparkSuccessor(h);
                }
                // If the state is the initial state 0, then set to PROPAGATE state
                // Ensure that subsequent nodes are notified when synchronization state is released
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break; }}Copy the code

unparkSuccessor

/** * wakes up the successor node */
private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            // If the current node state < 0, set the state to 0, the current node is useless
            compareAndSetWaitStatus(node, ws, 0);

        // Get the successor node of the current node
        Node s = node.next;
        / / if there is no subsequent nodes | | state of subsequent node > 0 (cancelled)
        // Find the last valid node from the tail node
        if (s == null || s.waitStatus > 0) {
            s = null;
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                    s = t;
        }
        // Wake up the node if it is not null
        if(s ! =null)
            LockSupport.unpark(s.thread);
    }
Copy the code

If you have any IT questions, you can send me a private message in the wechat public account and I will answer IT. You can follow IT by scanning the code