What is AQS

Concurrency lets a computer make full use of its computing power to complete program tasks efficiently. As you learn more about concurrency in Java, you will inevitably run into locking, the means by which shared resources are accessed correctly under concurrency. Learning about locks splits into two parts: how to lock and unlock, and who the lock is assigned to.

AQS (AbstractQueuedSynchronizer), also called the “abstract queued synchronizer”, provides a solution to the problem of who the lock is assigned to. This lets lock developers focus on how to lock and unlock, and avoid the details and pitfalls of lock allocation.

For example, concurrency tools in JUC (java.util.concurrent) such as CountDownLatch, Semaphore, ReentrantLock, and ReentrantReadWriteLock all use AQS to handle lock allocation.

CAS-based state updates

For AQS to assign the lock to the right requester, it has to maintain some bookkeeping information in its own fields. That creates a concurrency problem of its own, because this information can be changed by any thread.

AQS uses CAS (compare-and-swap, exposed in Java as compareAndSet) to update the information it maintains; it appears throughout the source code below. CAS means: expect a variable to hold a certain value and, if it does, set it to a new value. If the variable does not hold the expected value, the update fails and CAS returns false; if the new value is set successfully, it returns true. For example: I think my front door is open, and I am going to close it. The operation succeeds only if the door really is open and I close it.

CAS is an atomic operation guaranteed at the hardware level: the comparison and the update happen as one indivisible step, so when several threads race to update the same variable from the same expected value, only one of them succeeds. AQS uses CAS for the following reasons:

  1. CAS is fast enough.
  2. If a CAS fails under contention, it can simply be retried by spinning, because AQS knows that the wait for a concurrent update of this information is very short.
  3. Maintaining this information in AQS must not cause other threads to block.

Therefore, every update to the information AQS maintains uses CAS to stay correct under concurrency.
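The CAS-plus-spin pattern described above can be sketched outside of AQS with AtomicInteger. This is only an illustration: the class CasSpinDemo and its methods are hypothetical names, and AQS itself applies CAS to its own fields rather than to an AtomicInteger.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of AQS.
class CasSpinDemo {
    private final AtomicInteger state = new AtomicInteger(0);

    // Read the current value, compute the new one, and retry whenever
    // another thread changed the value between the read and the CAS.
    int increment() {
        for (;;) {
            int current = state.get();
            int next = current + 1;
            if (state.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    int get() {
        return state.get();
    }

    // Starts `threads` threads that each call increment() `perThread` times
    // and returns the final value once all of them have finished.
    static int run(int threads, int perThread) {
        CasSpinDemo demo = new CasSpinDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    demo.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return demo.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // no increment is lost: prints 40000
    }
}
```

No thread ever blocks here: a failed CAS only costs one more trip around the loop, which matches reasons 2 and 3 in the list above.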

CLH queue

CLH queues take their name from the initials of Craig, Landin, and Hagersten, who proposed building a FIFO (first-in, first-out) queue under concurrency using spin locks. AQS likewise maintains a synchronization queue that records the state of each thread requesting the lock.

Each recording unit is represented by the internal class Node of AQS:

    static final class Node {
        // The thread has cancelled its lock request
        static final int CANCELLED =  1;
        // The successor's thread is waiting and must be woken up when this node releases or cancels
        static final int SIGNAL    = -1;
        // The thread is waiting on a condition before proceeding to the next phase
        static final int CONDITION = -2;
        // A shared release must be propagated to the following nodes in the queue
        static final int PROPAGATE = -3;
        // The wait status of this node
        volatile int waitStatus;

        // Points to the previous node, also known as the predecessor
        volatile Node prev;
        // Points to the next node, also known as the successor
        volatile Node next;

        // The thread represented by this node
        volatile Thread thread;

        // Points to the next node waiting on a condition before proceeding to the next phase
        Node nextWaiter;
    }

In the Node structure, the prev and next fields let AQS link the threads requesting the lock into a doubly linked queue, while building the queue and preserving its first-in, first-out behavior is left to the methods below.

    private transient volatile Node head;
    private transient volatile Node tail;

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // The queue is empty: CAS in a dummy head node.
                // Failure is harmless: another thread got there first, and the next iteration takes the else branch.
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // CAS the node in as the new tail; on failure, loop and retry against the new tail
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

In AQS, head is the head of the CLH queue, and tail is the tail of the CLH queue. When joining nodes, CAS and spin are used to ensure that nodes are joining the queue correctly.
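To see the enqueue logic in isolation, here is a minimal, self-contained sketch of the same idea built on AtomicReference. The class TailEnqueueDemo and its Node are hypothetical stand-ins, not the AQS types, and the sketch leaves out everything AQS does beyond linking nodes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy queue that mirrors the shape of enq(); not the AQS implementation.
class TailEnqueueDemo {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Lazily creates a dummy head, then CASes the node in as the new tail.
    // Returns the node's predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                if (head.compareAndSet(null, new Node())) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Counts the queued nodes by walking prev links from the tail,
    // stopping at the dummy head.
    int size() {
        int count = 0;
        Node h = head.get();
        for (Node t = tail.get(); t != null && t != h; t = t.prev) {
            count++;
        }
        return count;
    }

    // Enqueues threads * perThread nodes concurrently and returns the queue size.
    static int run(int threads, int perThread) {
        TailEnqueueDemo queue = new TailEnqueueDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1_000)); // every node is linked exactly once: prints 4000
    }
}
```

Note that prev is assigned before the CAS while next is assigned after it, so a node reachable from tail always has a valid prev, whereas next can briefly lag behind. That is why the sketch counts backwards from the tail.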

The figure above illustrates the races that can occur when a Node is inserted and how they are resolved. AQS supports both exclusive and shared locks, so the CLH queue needs to distinguish node types. Whatever the node type, nodes are inserted into the queue through addWaiter() rather than by calling enq() directly.

    static final class Node {
        // Marks a node that is waiting in shared mode
        static final Node SHARED = new Node();
        // Marks a node that is waiting in exclusive mode
        static final Node EXCLUSIVE = null;
    }

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);

        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // Fast path: the node was appended at the tail, return directly
                pred.next = node;
                return node;
            }
        }
        // Slow path: spin with CAS until the node is enqueued
        enq(node);
        return node;
    }

Node.waitStatus holds the state of the Node, and that state can change, so a Node in the CLH queue can also cancel its wait:

    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;

        Node pred = node.prev;
        // First, skip back to the nearest predecessor that has not cancelled its wait
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // Cache pred.next as the expected value for the CAS operations below
        Node predNext = pred.next;
        // Mark the current node as cancelled so that concurrent operations can handle it correctly
        node.waitStatus = Node.CANCELLED;

        // If the current node is tail, use CAS to set tail to the uncancelled pred found above
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // ①
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    // Unlink the cancelled node by pointing pred.next at its successor
                    compareAndSetNext(pred, predNext, next);
            } else {
                // pred is head, or pred cannot be relied on to signal: wake up the successor directly
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }

Execution reaches position ① in the code when all of the following hold:

  1. pred is not the head node
  2. The thread recorded in pred is not null
  3. pred is in the SIGNAL state, or its status is less than or equal to 0 and can be set to SIGNAL by CAS

cancelAcquire() thus rearranges the CLH queue into a new consistent state, removing nodes whose wait has been cancelled even under concurrency.

So how does the AQS CLH queue achieve FIFO ordering?

Resume and suspend

As mentioned above, AQS only solves the problem of lock allocation; the locking and unlocking logic itself is controlled by the subclass. For ease of reading, the methods that subclasses must implement are only mentioned in passing for now.

    public final void acquire(int arg) {
        // If tryAcquire() gets the lock, the caller simply continues with the lock held
        // Otherwise, a Node representing the current thread is inserted into the queue and requests the lock there
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // The thread was interrupted while waiting: restore the interrupt status
            selfInterrupt();
    }

In acquire(), the entry point for exclusive lock requests, tryAcquire() is the logic a subclass implements to decide whether the lock is acquired. addWaiter() adds a new Node representing the current thread to the CLH queue, and acquireQueued() then requests the lock on its behalf.

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Spin
            for (;;) {
                // Re-read the predecessor on every iteration, because it may have changed, for example through a cancelled wait
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // Only a node whose predecessor is head is eligible for the lock
                    // Set head to the current node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // Return whether an interrupt occurred
                    return interrupted;
                }
                // Update the node state, park, and check whether the thread has been interrupted
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                // An unexpected exception occurred: remove the node so other nodes are not affected
                cancelAcquire(node);
        }
    }

acquireQueued() expresses the following logic:

  • A node is only eligible to acquire the lock if its predecessor is head, which is what gives FIFO order.
  • If the thread was interrupted while waiting, acquire() restores the thread's interrupted status.
  • Within acquireQueued(), lock requests are fair: locks are granted in queue order.
  • Seen from acquire(), lock requests are unfair, because acquire() tries to take the lock before joining the queue, so a newly arriving thread can sometimes jump the queue.

shouldParkAfterFailedAcquire() updates the Node status; parkAndCheckInterrupt() suspends the thread and, once it resumes, returns whether the thread was interrupted.

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // The predecessor is already in the SIGNAL state, so it is safe to park
            return true;
        if (ws > 0) {
            // The predecessor was cancelled: skip over cancelled nodes that cancelAcquire() has not yet unlinked
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // CAS sets the status of the predecessor to SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        // Suspend the current thread
        LockSupport.park(this);
        // After resuming, report (and clear) the interrupted status
        return Thread.interrupted();
    }

With that, the process of acquiring the lock is clear. Once a thread enters acquireQueued(), one of the following cases occurs:

Case 1: the lock is acquired directly, and the thread that called acquire() continues.

Case 2: the Node's predecessor is not head, or the lock is currently held, so the thread is suspended in parkAndCheckInterrupt(). When the thread resumes from parkAndCheckInterrupt(), it runs the spin logic of acquireQueued() again, where case 1, case 2, or case 3 may occur.

Case 3: the Node's predecessor has been cancelled, so shouldParkAfterFailedAcquire() tidies up the CLH queue, and the loop then continues from case 1.

At this point, nodes that have not acquired the lock are queued in the CLH queue, with their threads blocked in parkAndCheckInterrupt(), waiting to be woken up so they can try to acquire the lock again.

So, when do threads resume?

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            // CAS changes the node status to 0
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            // The successor is null or cancelled
            s = null;
            // Start from tail and find the uncancelled node nearest to this node
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // Wake up the thread recorded in the node
            LockSupport.unpark(s.thread);
    }

Threads are woken by calls to unparkSuccessor(), which happen when a request is cancelled in cancelAcquire() or when a lock is released. unparkSuccessor() wakes the thread recorded in the first Node in the CLH queue that should be woken, after which that thread continues from parkAndCheckInterrupt().
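The suspend and resume mechanics can be tried out directly with LockSupport, outside of AQS. The sketch below is only an illustration under assumed names (ParkUnparkDemo, wake): a thread parks until it is either unparked or interrupted, then reports what Thread.interrupted() returned, in the same way parkAndCheckInterrupt() does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of AQS.
class ParkUnparkDemo {
    // Parks a waiter thread, wakes it either with unpark() or with interrupt(),
    // and returns what Thread.interrupted() reported inside the waiter.
    static boolean wake(boolean viaInterrupt) {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean wasInterrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            started.countDown();
            // park() may return spuriously, so the wake-up condition is re-checked in a loop
            while (!released.get()) {
                LockSupport.park(released);
            }
            // Like parkAndCheckInterrupt(): read and clear the interrupted status
            wasInterrupted.set(Thread.interrupted());
        });
        waiter.start();

        try {
            started.await();
            if (viaInterrupt) {
                // Interrupt first so the flag is set before the waiter may leave its loop
                waiter.interrupt();
                released.set(true);
            } else {
                released.set(true);
                LockSupport.unpark(waiter);
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return wasInterrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(wake(false)); // woken by unpark: prints false
        System.out.println(wake(true));  // woken by interrupt: prints true
    }
}
```

An interrupt also makes park() return, which is why the caller has to ask Thread.interrupted() afterwards to tell the two wake-up causes apart.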

The call to unparkSuccessor() is shown here from the release path of an exclusive lock:

    public final boolean release(int arg) {
        // Implemented by the subclass: try to unlock
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // The lock was released: wake up the next thread
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

Other features

acquire() and release(), shown above for exclusive locks, illustrate how AQS allocates locks through the CLH queue. In addition, AQS supports other features.

Interruptible: threads that request locks from AQS can be interrupted. The interrupted status of a resumed thread is checked in parkAndCheckInterrupt() so that callers higher up can decide what to do. In the case of acquire(), the interrupted thread has its interrupted status restored.

Reentrancy control: whether the current thread holds the lock can be reported through isHeldExclusively(), which in AQS is only called by ConditionObject. Reentrancy itself can be implemented with setExclusiveOwnerThread() and getExclusiveOwnerThread() in a subclass's locking method such as tryAcquire().

Controllable lock acquisition time: how long a thread waits for a lock can be bounded by limiting how long the thread stays parked when the lock is unavailable. AQS provides other versions of the acquire methods for this, and the process is broadly the same.

Concurrency control: AQS lets subclasses control the degree of concurrency through the state field, which is updated atomically. A subclass decides whether a lock or unlock operation succeeds by checking and updating state.
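As a rough illustration of the reentrancy point, here is a minimal sketch of a reentrant exclusive synchronizer. The class name ReentrantSyncSketch and its lock(), unlock(), and holdCount() helpers are assumptions made for this example; it follows the general idea of counting holds in state and is not a copy of any JDK lock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical sketch of a reentrant exclusive lock on top of AQS.
class ReentrantSyncSketch extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // The lock is free: whoever wins the CAS becomes the owner
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Re-entry by the owner: only the owner gets here, so a plain write is enough
            setState(c + acquires);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(c);
        // Only a fully released lock should wake up a waiting thread
        return free;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    // Number of holds by the calling thread, or 0 if it is not the owner
    int holdCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    void lock() {
        acquire(1);
    }

    void unlock() {
        release(1);
    }

    public static void main(String[] args) {
        ReentrantSyncSketch sync = new ReentrantSyncSketch();
        sync.lock();
        sync.lock(); // the same thread may lock again without blocking
        System.out.println(sync.holdCount()); // prints 2
        sync.unlock();
        sync.unlock();
        System.out.println(sync.holdCount()); // prints 0
    }
}
```

Here state doubles as the hold count, so the int argument of acquire() and release() carries meaning, unlike in a simple non-reentrant lock.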

How is an exclusive lock implemented

The previous sections used acquire() and release() of the exclusive lock to illustrate how AQS works. This section covers the remaining pieces.

Features such as interruptibility and timed lock acquisition are exposed through different entry methods and implemented as different versions of acquireQueued() with only minor differences. The following is an example of acquiring a lock interruptibly:

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        ......
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // Unlike acquireQueued(), the exception is thrown to the caller
                    throw new InterruptedException();
        ......
    }

The methods related to interruptible lock acquisition are:

  • acquireInterruptibly()
  • doAcquireInterruptibly()

The methods related to timed lock acquisition are:

  • tryAcquireNanos()
  • doAcquireNanos()
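To get a feel for the timed entry point, the following sketch calls tryAcquireNanos() on a tiny non-reentrant synchronizer. TimedAcquireDemo and OneShotSync are hypothetical names for this example; tryAcquireNanos(int, long) itself is the public AQS method.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class, not part of AQS.
class TimedAcquireDemo {
    // Non-reentrant sync: state 0 means free, 1 means held.
    static final class OneShotSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    // Returns { result while the lock is held, result after it was released }.
    static boolean[] run() {
        OneShotSync sync = new OneShotSync();
        long timeout = TimeUnit.MILLISECONDS.toNanos(50);
        sync.acquire(1);
        try {
            // The lock is held and not reentrant, so this waits about 50 ms and gives up
            boolean whileHeld = sync.tryAcquireNanos(1, timeout);
            sync.release(1);
            // The lock is free now, so this succeeds immediately
            boolean afterRelease = sync.tryAcquireNanos(1, timeout);
            return new boolean[] { whileHeld, afterRelease };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = run();
        System.out.println(results[0]); // timed out: prints false
        System.out.println(results[1]); // acquired: prints true
    }
}
```

The single thread deliberately requests a lock it already holds, which is the simplest way to force a timeout without coordinating several threads.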

How is a shared lock implemented

The implementation of shared locks is more complex than that of exclusive locks. Start with lock acquisition:

    private void doAcquireShared(int arg) {
        ...
                // This section is what differs from the exclusive lock
                if (p == head) {
                    // Try to acquire the lock; r reflects the remaining resources
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Lock acquired: reset head and propagate
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
        ...
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record the old head
        // Reset head
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // ①
            Node s = node.next;
            if (s == null || s.isShared())
                // Wake up the other nodes
                doReleaseShared();
        }
    }

With shared locks, a successful acquisition may leave enough resources to support more concurrency, in which case further Nodes can be woken up.

Execution enters ① in any of the following situations:

  • There are more resources, that is, propagate > 0
  • The old head is null or its waitStatus is negative
  • The new head is null or its waitStatus is negative

Both the old and the new head are checked before calling doReleaseShared() because, if the old head's status has already been consumed by a release, skipping the check on the new head could wake one Node too few.

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // Reset the head's status to 0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    // Wake up the next node
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         // ②
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }

unparkSuccessor() will not be repeated here. The subtlety lies in ②, as shown in the figure.

A release only calls unparkSuccessor() when the head's status is less than zero. Imagine that A and B are waiting for a lock when C releases it; A gets the lock, and the head status is set to 0. The time slice then goes to D, which releases the lock but finds that the head status is 0, so it wakes nobody. A gets the time slice back and continues into setHeadAndPropagate() with a propagate value of 0, so it wakes nobody either. But what we want is for B to be woken up.

Left unhandled, the achievable concurrency would keep shrinking as the program runs. Setting the head's status to PROPAGATE at ② avoids the problem: in the example above, A will then wake up B after entering setHeadAndPropagate(). So PROPAGATE means that a release must be propagated.

As with exclusive locks, the other features have corresponding entry points; their source code is not reproduced here. The methods related to interruptible lock acquisition are:

  • acquireSharedInterruptibly()
  • doAcquireSharedInterruptibly()

The methods related to timed lock acquisition are:

  • tryAcquireSharedNanos()
  • doAcquireSharedNanos()
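A small shared-mode synchronizer makes the role of the return value of tryAcquireShared() more concrete. The sketch below is a simplified permit counter under assumed names (SharedPermitsSketch, maxConcurrency); it is written for illustration and is not the source of any JDK class.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical shared-mode sync: state holds the number of available permits.
class SharedPermitsSketch extends AbstractQueuedSynchronizer {
    SharedPermitsSketch(int permits) {
        setState(permits);
    }

    // Negative result: failure. Zero: success, nothing left for others.
    // Positive: success, and later waiters may succeed too (this is the propagate value).
    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || compareAndSetState(available, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases)) {
                return true;
            }
        }
    }

    int available() {
        return getState();
    }

    // Runs `threads` workers against `permits` permits and returns the highest
    // number of workers that were ever inside the guarded section at once.
    static int maxConcurrency(int permits, int threads) {
        SharedPermitsSketch sync = new SharedPermitsSketch(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                sync.acquireShared(1);
                try {
                    max.accumulateAndGet(active.incrementAndGet(), Math::max);
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    sync.releaseShared(1);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        // Never more than 2 workers hold a permit at the same time
        System.out.println(maxConcurrency(2, 6) <= 2); // prints true
    }
}
```

The value returned by tryAcquireShared() is what doAcquireShared() passes on to setHeadAndPropagate(), so a positive remainder is what allows the next shared waiter to be woken right away.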

Conditional processing

Locks sometimes also need to support conditions: a condition must be met before a thread should continue running. (In AQS, ConditionObject is meant for subclasses that support exclusive mode, since it relies on isHeldExclusively().) Condition expresses this collaboration through template methods:

  • await() family: wait for the condition to be met
  • signal(), signalAll(): indicate that the condition has been met

The Condition semantics are implemented by AQS's ConditionObject:

    public class ConditionObject implements Condition, java.io.Serializable {
        // First node of the condition queue
        private transient Node firstWaiter;
        // Last node of the condition queue
        private transient Node lastWaiter;
    }

    static final class Node {
        // The next node in the CONDITION state
        Node nextWaiter;
    }

ConditionObject maintains a one-way queue of nodes waiting for the Condition to complete.

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                // If the tail node is no longer in the CONDITION state, clean out such nodes
                unlinkCancelledWaiters();
                // Re-read the tail node
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                // The queue is empty: this becomes the first node
                firstWaiter = node;
            else
                // Otherwise link it after the current tail
                t.nextWaiter = node;
            // Update the tail node
            lastWaiter = node;
            return node;
        }

        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                // Starting from the first node, remove every node whose status is not Node.CONDITION
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

The operation of CONDITION node insertion is relatively simple, and the operation of removal is shown in the figure

When the condition is met, signal() calls doSignal():

        private void doSignal(Node first) {
            do {
                if ((firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // Detach the first node from the condition queue
                first.nextWaiter = null;
            } while (!transferForSignal(first) /* join the CLH queue */ &&
                     (first = firstWaiter) != null);
        }

        final boolean transferForSignal(Node node) {
            // Update the node status; failure means the node was cancelled
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            // Add the node to the CLH queue; enq() returns its predecessor
            Node p = enq(node);
            int ws = p.waitStatus;
            // Wake the thread if the predecessor is cancelled or cannot be set to SIGNAL
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

When the Condition is signalled, the node is moved from the queue maintained by ConditionObject to the CLH queue so that it can be woken properly when the resource becomes available. The thread is suspended in await():

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                // If the thread has been interrupted, throw an exception
                throw new InterruptedException();
            // Join the CONDITION queue
            Node node = addConditionWaiter();
            // Release the lock completely, remembering the state so it can be reacquired later
            int savedState = fullyRelease(node);
            // Record the interrupt mode
            int interruptMode = 0;
            // Spin
            while (!isOnSyncQueue(node)) {
                // Suspend the thread while the node has not yet been moved to the CLH queue
                LockSupport.park(this);
                // Update the interrupt mode
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // Try to acquire the lock; the node is already in the CLH queue
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                // Handle the interrupt according to the recorded mode
                reportInterruptAfterWait(interruptMode);
        }

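Therefore, when the Condition is signalled, the awakened thread leaves the while (!isOnSyncQueue(node)) {…} spin loop and continues. Roughly speaking, the process is: await() puts the node on the condition queue, releases the lock, and parks; signal() moves the node to the CLH queue; once woken, the thread leaves the loop and reacquires the lock through acquireQueued().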
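From the caller's side, this machinery is usually reached through a Condition obtained from a lock. The sketch below uses ReentrantLock, which is built on AQS, to hand a message from one thread to another; the class ConditionHandoffDemo and its methods are hypothetical names for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing await()/signal() from the caller's side.
class ConditionHandoffDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private String message; // guarded by lock

    // Waits until a message is available, then takes it.
    String take() throws InterruptedException {
        lock.lock();
        try {
            // await() releases the lock while waiting and reacquires it before returning;
            // the loop guards against spurious wake-ups
            while (message == null) {
                ready.await();
            }
            String taken = message;
            message = null;
            return taken;
        } finally {
            lock.unlock();
        }
    }

    // Stores a message and signals one waiting thread.
    void put(String value) {
        lock.lock();
        try {
            message = value;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    // Hands "hello" from the calling thread to a consumer thread and returns what it received.
    static String run() {
        ConditionHandoffDemo demo = new ConditionHandoffDemo();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = demo.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        demo.put("hello");
        try {
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints hello
    }
}
```

The signalled thread does not run immediately: it still has to win the lock back, which only becomes possible after put() calls unlock().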

How do I use AQS

AQS handles the lock allocation process, while the locking and unlocking logic must be implemented by subclasses. A subclass can implement an exclusive lock, a shared lock, or both, as required:

  • tryAcquire(int): acquires the exclusive lock
  • tryRelease(int): releases the exclusive lock
  • tryAcquireShared(int): acquires the shared lock
  • tryReleaseShared(int): releases the shared lock

Generally, the int parameter helps control the state field of AQS. Correctness under concurrency is ensured by changing state only through atomic operations, and AQS provides the CAS operation compareAndSetState() for that purpose.

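A simple example:

    // Requires: import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    public static class MySync extends AbstractQueuedSynchronizer {

        public void lock() {
            acquire(0);
        }

        public void unlock() {
            release(0);
        }

        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if the lock is free (state 0 -> 1)
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Succeeds only if the lock is held (state 1 -> 0)
            return compareAndSetState(1, 0);
        }
    }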

This example implements an exclusive lock. The arguments to acquire() and release() are meaningless here because the lock admits only one holder at a time, so locking and unlocking are decided directly by whether compareAndSetState() succeeds.
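To check that such a synchronizer really provides mutual exclusion, it can guard a plain counter shared by several threads. The sketch below repeats the MySync class so that it compiles on its own; the wrapper MySyncUsageDemo and its run() method are hypothetical names for this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class; MySync is repeated here so the example is self-contained.
class MySyncUsageDemo {
    static final class MySync extends AbstractQueuedSynchronizer {
        void lock() {
            acquire(0);
        }

        void unlock() {
            release(0);
        }

        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            return compareAndSetState(1, 0);
        }
    }

    // Increments a plain (non-atomic) counter from several threads, guarded only by MySync.
    static int run(int threads, int perThread) {
        MySync sync = new MySync();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    sync.lock();
                    try {
                        counter[0]++; // safe only because the lock is held
                    } finally {
                        sync.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

Threads that lose the CAS in tryAcquire() are queued and parked by AQS, so the subclass never has to deal with waiting itself.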

Conclusion

AQS solves the problem of lock allocation under concurrency, so that the implementor of a lock can focus on the locking and unlocking logic. The implementation of AQS can be summarized as follows:

  • It maintains a CLH queue that records every thread waiting to acquire the lock. The first attempt to take the lock is unfair; requests already in the queue are served fairly.
  • When the thread represented by a Node fails to get the lock, it is suspended. After being woken up, it tries to get the lock again; if it still fails, the process repeats.
  • When a Node cancels its wait, cancelled Nodes ahead of it are skipped until the first uncancelled Node is found, and the queue is relinked there.
  • When the lock is released, the first uncancelled node after the head of the CLH queue is found and woken up.
  • When a thread must wait on a condition, its Node enters a separate one-way queue, and the thread spins and is suspended. When the condition is met, the Node is added to the CLH queue and requests the lock; once it holds the lock, the thread continues executing.

In addition, AQS supports the following features:

  • It uses CAS and spinning to keep its own state correct under concurrency, which is fast enough.
  • Reentrancy checks are supported through isHeldExclusively(), whose callers are scattered throughout the CONDITION-related code, so it was not shown here. Reentrancy can be implemented in a subclass's locking methods such as tryAcquire() by using setExclusiveOwnerThread() and getExclusiveOwnerThread() together.
  • Interrupts are supported.
  • Bounding the lock acquisition time is supported.

This is my understanding of AQS; if anything is wrong, please point it out in the comments.

Advanced learning

ReentrantLock

ReentrantReadWriteLock

Learn Semaphore
