
1. What is AQS?

AQS (AbstractQueuedSynchronizer) is a framework for implementing blocking locks and synchronizers based on a first-in, first-out (FIFO) wait queue. AQS holds the state of the lock in a volatile int state variable. Subclasses access that state through three methods:

  • getState(): Gets the current synchronization state
  • setState(int newState): Sets the current synchronization state
  • compareAndSetState(int expect, int update): Sets the state using CAS. This method guarantees that the state update is atomic.

These three methods are how a subclass reads and modifies the state value of the lock.

Compare and swap (CAS): the value in memory is compared with an expected value. If they are equal, the memory value is replaced with the new value and true is returned to indicate success; otherwise the memory is left unchanged and false is returned to indicate failure. CAS operations are typically implemented with CPU primitives. CAS is often used to implement lock-free data structures, and the java.util.concurrent package contains many CAS-based structures: ConcurrentLinkedQueue, ConcurrentLinkedDeque, ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet.
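As a minimal sketch of the CAS behaviour described above, the following uses AtomicInteger.compareAndSet from the JDK. The class name CasDemo and its helper methods are made up for illustration; only the AtomicInteger calls are real JDK API.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    // Increment with an explicit CAS retry loop instead of a lock.
    static int casIncrement(AtomicInteger value) {
        for (;;) {
            int expected = value.get();
            int updated = expected + 1;
            // Succeeds only if nobody changed the value since we read it
            if (value.compareAndSet(expected, updated)) {
                return updated;
            }
            // Another thread won the race: re-read and retry
        }
    }

    // Hypothetical helper: several threads increment one shared counter.
    static int runConcurrently(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    casIncrement(counter);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    // Hypothetical helper: a CAS with a stale expected value fails and changes nothing.
    static boolean staleCasFails() {
        AtomicInteger v = new AtomicInteger(5);
        boolean first = v.compareAndSet(5, 6);   // memory == expected, so it is replaced
        boolean second = v.compareAndSet(5, 7);  // memory is now 6, so nothing happens
        return first && !second && v.get() == 6;
    }

    public static void main(String[] args) {
        System.out.println(staleCasFails());
        System.out.println(runConcurrently(4, 10_000));
    }
}
```

The retry loop is the usual pattern: read, compute, attempt the CAS, and start over if another thread got there first.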

1.1 Types of locks supported by AQS

AQS supports exclusive locks and shared locks.

  • Exclusive lock: the lock can be owned by only one thread at a time. Depending on how the lock is acquired, it is either a “fair lock” or an “unfair lock”. With a fair lock, threads acquire the lock in the FIFO order of the wait queue: the longer a thread has waited, the sooner it obtains the lock. With an unfair lock, a thread may grab the lock regardless of the wait queue. ReentrantLock and ReentrantReadWriteLock.WriteLock are exclusive locks.
  • Shared lock: a lock that can be acquired by multiple threads at the same time. In the JUC package, ReentrantReadWriteLock.ReadLock, CountDownLatch, and Semaphore all work in shared mode. CyclicBarrier is often listed alongside them, but it is built on a ReentrantLock and a Condition rather than directly on AQS shared mode.
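The difference between the two modes can be observed with ReentrantReadWriteLock. The sketch below assumes a made-up class LockModesDemo; while one thread holds the read lock, a second thread can still take the read lock (shared) but not the write lock (exclusive).

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockModesDemo {
    // Hypothetical helper: while the calling thread holds the read lock, a second
    // thread probes both locks. Returns {readLockAcquired, writeLockAcquired}.
    static boolean[] probeWhileReadHeld() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                result[0] = rw.readLock().tryLock();   // shared mode: a second reader is admitted
                if (result[0]) rw.readLock().unlock();
                result[1] = rw.writeLock().tryLock();  // exclusive mode: refused while a reader is active
                if (result[1]) rw.writeLock().unlock();
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileReadHeld();
        System.out.println("second reader admitted: " + r[0]);
        System.out.println("writer admitted: " + r[1]);
        // Fairness is chosen at construction time; the default is unfair
        System.out.println("fair: " + new ReentrantLock(true).isFair());
        System.out.println("default fair: " + new ReentrantLock().isFair());
    }
}
```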

1.2 Implementing a lock based on AQS

AQS does not implement any synchronization interface itself, so a lock or synchronizer usually defines an inner class that extends AQS and delegates to it. In general, a synchronizer is implemented by extending AQS, using getState, setState, and compareAndSetState to manage the state, and overriding the following methods:

  • tryAcquire(int): Exclusive mode. Attempts to acquire the resource; returns true on success and false on failure.
  • tryRelease(int): Exclusive mode. Attempts to release the resource; returns true on success and false on failure.
  • tryAcquireShared(int): Shared mode. Attempts to acquire the resource. A negative number indicates failure; 0 indicates success with no resources left; a positive number indicates success with resources still available.
  • tryReleaseShared(int): Shared mode. Attempts to release the resource; returns true if waiting nodes may be woken up after the release, and false otherwise.
  • isHeldExclusively(): Whether the current thread holds the resource exclusively. It only needs to be implemented if Condition is used.

In general, a custom synchronizer is either exclusive or shared, so it only needs to implement one pair of methods: tryAcquire/tryRelease or tryAcquireShared/tryReleaseShared. However, AQS also supports synchronizers that use both modes, such as ReentrantReadWriteLock.
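To make the pattern concrete, here is a minimal sketch of a non-reentrant exclusive lock built on AQS, along the lines of the Mutex example in the AbstractQueuedSynchronizer Javadoc. The class name SimpleMutex and the guardedCount helper are illustrative assumptions; state 0 means unlocked and state 1 means locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // The synchronizer lives in an inner class that extends AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // 0 -> 1 atomically; only one thread can win this CAS
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // plain volatile write is enough: only the owner gets here
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }       // queues and parks on contention
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public void unlock()      { sync.release(1); }       // wakes the successor, if any
    public boolean isLocked() { return sync.isLocked(); }

    // Hypothetical helper: a plain int counter protected by the mutex.
    static int guardedCount(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(guardedCount(4, 10_000));
    }
}
```

Only tryAcquire and tryRelease are written by hand; queuing, parking, and waking come from acquire() and release() in AQS.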

2. AQS source code analysis

2.1 The storage node: Node

As noted above, AQS is built on a FIFO queue, and the elements of that queue are instances of the inner class Node. The wait queue is a variant of the CLH lock queue.

static final class Node {
    /** Nodes in shared node mode */
    static final Node SHARED = new Node();
    /** Nodes in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate the successor's thread is waiting: when this node releases the synchronization state or is cancelled, it must wake up its successor so that thread can run */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /** waitStatus value to indicate the next shared acquisition of the synchronization state should propagate unconditionally */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /** Predecessor node */
    volatile Node prev;

    /** Successor node */
    volatile Node next;

    /** The thread that enqueued this node and is waiting for the synchronization state */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED. Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /** Returns true if node is waiting in shared mode. */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

In the Node inner class, the prev and next fields link the nodes of the queue, and waitStatus stores the node's wait status.

2.2 AQS Class Analysis

In the object-oriented world, to understand what makes a class special, look at its properties. By looking at the source code, we can see that the AQS class contains:

/**
 * Head of the wait queue, lazily initialized. Except for
 * initialization, it is modified only via method setHead. Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized. Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;

  • Head node of the wait queue: head
  • Tail node of the wait queue: tail
  • Synchronizer state: state

Since AQS is built on a FIFO queue, the following sections analyze how threads enter and leave the queue, in the order acquire/release and then acquireShared/releaseShared.

2.2.1 acquire(int)

This method is the top-level entry point for a thread acquiring a resource in exclusive mode. If the resource is acquired, the thread returns directly; otherwise the thread is queued until it acquires the resource, ignoring interrupts throughout. This is the semantics of lock(), although acquire() is not limited to lock(). After obtaining the resource, the thread can execute its critical section. Here is the source of acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The tryAcquire(), acquireQueued(), and addWaiter() methods are called in the acquire method.

First try to obtain an exclusive lock using the tryAcquire method. Return true on success, false otherwise.

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

The AQS source simply throws an exception here. As discussed for custom locks above, AQS is only a framework: how a resource is actually acquired and released is left to the custom synchronizer. AQS defines just the hook, and the custom synchronizer implements the acquisition itself through get/set/CAS on state. Whether the lock is reentrant, and whether a thread may barge ahead of the queue, depends on how the custom synchronizer is designed. The custom synchronizer must, of course, take thread safety into account when it accesses the resource.

The method is not declared abstract because an exclusive-mode synchronizer only implements tryAcquire/tryRelease and a shared-mode synchronizer only implements tryAcquireShared/tryReleaseShared. If all of them were abstract, every synchronizer would also have to implement the methods of the mode it does not use. Doug Lea spares developers that unnecessary work.
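A small sketch can show the consequence of this design. The Gate class below is a made-up, one-shot synchronizer that overrides only the shared-mode hooks; calling the exclusive-mode acquire() on it reaches the default tryAcquire() and throws UnsupportedOperationException.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SharedOnlyDemo {
    // Hypothetical one-shot gate: state 0 = closed, state 1 = open.
    // Only the shared-mode hooks are overridden.
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }
    }

    // The exclusive entry point falls through to the default tryAcquire().
    static boolean exclusiveAcquireIsUnsupported() {
        try {
            new Gate().acquire(1);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    // The shared entry points work because their hooks are implemented.
    static boolean sharedAcquirePassesOnceOpen() {
        Gate gate = new Gate();
        gate.releaseShared(1);  // open the gate
        gate.acquireShared(1);  // returns immediately, no blocking
        return true;
    }

    public static void main(String[] args) {
        System.out.println(exclusiveAcquireIsUnsupported());
        System.out.println(sharedAcquirePassesOnceOpen());
    }
}
```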

1. addWaiter()

This is followed by the addWaiter() method, which adds the current thread to the end of the wait queue and returns the node where the current thread resides.

private Node addWaiter(Node mode) {
    // Build a Node for the current thread in the given mode; acquire() passes Node.EXCLUSIVE
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try to append directly behind the current tail
    Node pred = tail;
    if (pred != null) { // The queue has already been initialized
        // Link the new node back to the current tail
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // CAS ensures only one thread installs its node as the new tail
            // Link the old tail forward to the new node
            pred.next = node;
            return node;
        }
    }
    // The tail is null or the CAS failed: fall back to enq() to join the queue
    enq(node);
    return node;
}

// enq() uses an infinite loop to make sure the node is added correctly: the thread
// returns from this method only after its CAS has installed the node as the tail;
// otherwise it keeps retrying.
private Node enq(final Node node) {
    // CAS "spin" until the node is appended to the tail of the queue
    for (;;) {
        // Snapshot of the current tail
        Node t = tail;
        if (t == null) { // Must initialize
            // The queue is empty: install a dummy head node, then point tail at it
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

The addWaiter(Node mode) method thus appends a node for the current thread to the tail of the wait queue.
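The enqueue logic can be reproduced outside AQS as a simplified sketch. The class CasQueueSketch below is an illustration only, not the AQS implementation: it uses AtomicReference in place of the internal compareAndSetHead/compareAndSetTail helpers, but follows the same shape (lazy dummy head, set prev, CAS the tail, then set next).

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): spin until our CAS on the tail succeeds.
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized yet: install a dummy head, then publish it as tail
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                       // 1. link backwards first
                if (tail.compareAndSet(t, node)) {   // 2. atomically become the tail
                    t.next = node;                   // 3. forward link is set last
                    return;
                }
            }
        }
    }

    // Count real nodes by walking backwards from the tail; prev links are
    // always set before a node becomes reachable, so this walk is reliable.
    int size() {
        int n = 0;
        Node h = head.get();
        for (Node t = tail.get(); t != null && t != h; t = t.prev) {
            n++;
        }
        return n;
    }

    // Hypothetical helper: many threads enqueue at once, then count the nodes.
    static int enqueueConcurrently(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(4, 1_000));
    }
}
```

Because prev is assigned before the CAS and next only after it, a backwards walk from the tail always sees every node, which is why AQS also traverses from the tail when it needs a reliable view of the queue.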

2. acquireQueued()

acquireQueued() lets a thread that is already in the queue acquire the lock.

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 *
 * In acquireQueued the current thread tries to get the synchronization state
 * in an infinite loop. Only a node whose predecessor is the head may attempt
 * the acquire (p == head && tryAcquire(arg)), because:
 * 1. The head is the node that currently holds the synchronization state (lock).
 *    When its thread releases the state it wakes its successor, and the woken
 *    thread then checks whether its predecessor is the head.
 * 2. This preserves the FIFO order of the synchronization queue. Once enqueued,
 *    each node (that is, each thread) spins and checks only its own condition.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of this node
            final Node p = node.predecessor();
            // The lock can be acquired only if the predecessor is the head
            if (p == head && tryAcquire(arg)) {
                // Make this node the new head and unlink the old head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Acquire failed: check the predecessor's status to decide whether to park.
            // If so, park the current thread with LockSupport.park() until it is woken up
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If the loop exited with an exception
        if (failed)
            // Cancel the acquire attempt and remove the node from the queue
            cancelAcquire(node);
    }
}

After being added to the wait queue via the addWaiter method, the lock is acquired via the acquireQueued method.
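The suspension inside acquireQueued() relies on LockSupport.park() and LockSupport.unpark(). As a minimal sketch of that primitive on its own (the class ParkDemo and its helper are made up for illustration), a waiting thread parks in a loop until a flag is set, and another thread sets the flag and unparks it:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Hypothetical helper: returns true if the waiter woke up and finished.
    static boolean parkUntilSignalled() {
        AtomicBoolean ready = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop,
            // just as acquireQueued() re-checks p == head && tryAcquire(arg)
            while (!ready.get()) {
                LockSupport.park();
            }
        });
        waiter.start();

        ready.set(true);             // publish the condition first
        LockSupport.unpark(waiter);  // then wake the waiter (or hand it a permit)

        try {
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(parkUntilSignalled());
    }
}
```

If unpark() arrives before the waiter has parked, the permit is remembered and the next park() returns immediately, so the order of the two calls does not cause a lost wake-up.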

The general process is as follows:

  • tryAcquire() attempts to acquire the resource directly and returns immediately on success;
  • addWaiter() adds the thread to the tail of the wait queue and marks it as exclusive mode;
  • acquireQueued() makes the thread wait in the queue and returns only after the resource has been obtained. It returns true if the thread was interrupted at any point during the wait, and false otherwise;
  • If the thread is interrupted while waiting, it does not respond at that time. Only after obtaining the resource does it call selfInterrupt() to restore the interrupt.

3. Obtaining an exclusive lock

Calling the synchronizer's acquire(int arg) method obtains the synchronization state. The method is insensitive to interrupts: if a thread fails to obtain the synchronization state and enters the synchronization queue, a later interrupt does not remove it from the queue. The acquisition process is:

  1. The current thread attempts to obtain the lock via tryAcquire(). If it succeeds, it returns directly. If it fails, it enters the queue and waits, with CAS keeping the acquisition of the synchronization state thread-safe.
  2. When the attempt fails, a synchronization node in exclusive mode (Node.EXCLUSIVE) is constructed, and addWaiter(Node mode) adds it to the tail of the synchronization queue.
  3. Finally, acquireQueued(final Node node, int arg) is called so that the node tries to get the synchronization state in an infinite loop; if it cannot, the node's thread is blocked. Inside acquireQueued, the thread may attempt to acquire the lock (synchronization state) only when its predecessor is the head node (p == head && tryAcquire(arg)).
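The interrupt-insensitive behaviour can be observed through ReentrantLock.lock(), which is built on this acquire path. In the sketch below (class and helper names are made up), a thread queued for the lock is interrupted; it stays in the queue, and the interrupt flag is only visible again after it has acquired the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptInsensitiveDemo {
    // Returns {stillQueuedAfterInterrupt, interruptFlagSetAfterAcquire}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);
        boolean stillQueued = false;

        Thread waiter = new Thread(() -> {
            lock.lock(); // does not throw on interrupt; keeps waiting
            try {
                // The interrupt was recorded while waiting and restored after the acquire
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });

        lock.lock(); // main thread holds the lock so the waiter must queue
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter is in the sync queue
            }
            waiter.interrupt();
            Thread.sleep(100);  // give the waiter time to react (it should not leave)
            stillQueued = lock.hasQueuedThread(waiter);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] { stillQueued, flagAfterAcquire.get() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("still queued after interrupt: " + r[0]);
        System.out.println("interrupt flag set after acquire: " + r[1]);
    }
}
```

A caller that needs to abandon the wait on interrupt would use lockInterruptibly() instead, which goes through acquireInterruptibly() rather than acquire().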

2.2.2 release(int) Release the exclusive lock

The lock is released in AQS via the release method.

public final boolean release(int arg) {
   // Call the tryRelease method
    if (tryRelease(arg)) {// If the release succeeds
        Node h = head;
        // If the head node is not null and its waitStatus is not 0, a successor may need waking
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
   // Return false without successful release
    return false;
}

// tryRelease() attempts to release the synchronization state (lock) held by the current thread.
// AQS itself only declares this hook; the body below is the ReentrantLock.Sync implementation
protected final boolean tryRelease(int releases) {
      // c is the synchronization state after this release
      int c = getState() - releases;
      // Only the thread that acquired the lock may release it; otherwise throw IllegalMonitorStateException
      if (Thread.currentThread() != getExclusiveOwnerThread())
          throw new IllegalMonitorStateException();
      boolean free = false;
      // If the lock (synchronization state) has been completely released by the current thread, set the lock holder to NULL, and the synchronization state (lock) becomes available
      if (c == 0) {
          free = true;
          setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
  }

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    // If the successor is missing or cancelled, scan backwards from the tail and
    // keep the non-cancelled node (waitStatus <= 0) that is closest to this node
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // Wake up the thread of the successor node
    if (s != null)
        LockSupport.unpark(s.thread);
}

release() is the top-level entry point for a thread releasing a resource in exclusive mode. It releases the specified amount of the resource, and if the resource is completely released (i.e., state=0), it wakes up the successor thread in the wait queue so that it can acquire the resource.
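The "completely released" condition is easy to see with a reentrant lock. In this small sketch (the class name is made up), the lock is taken twice by the same thread; the first unlock() only decrements the state, and the lock becomes free only after the second one.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantReleaseDemo {
    // Returns {heldAfterFirstUnlock, freeAfterSecondUnlock}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();   // reentrant acquire: state goes 1 -> 2

        lock.unlock(); // state 2 -> 1: tryRelease returns false, nobody is woken
        boolean heldAfterFirst = lock.isLocked() && lock.getHoldCount() == 1;

        lock.unlock(); // state 1 -> 0: tryRelease returns true, successor may be woken
        boolean freeAfterSecond = !lock.isLocked();

        return new boolean[] { heldAfterFirst, freeAfterSecond };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("held after first unlock: " + r[0]);
        System.out.println("free after second unlock: " + r[1]);
    }
}
```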

2.2.3 acquireShared(int)

This method is the top-level entry point for a thread acquiring a resource in shared mode. It acquires the specified amount of the resource and returns on success; on failure, the thread enters the wait queue until it obtains the resource. Interrupts are ignored throughout. Here is the source of acquireShared():

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // Join the tail of the queue in shared mode
    boolean failed = true; // Whether the acquire has failed so far
    try {
        boolean interrupted = false; // Whether the thread was interrupted while waiting
        for (;;) {
            final Node p = node.predecessor(); // Predecessor
            if (p == head) { // Right behind the head: probably woken because the head released the resource
                int r = tryAcquireShared(arg); // Try to obtain the resource
                if (r >= 0) { // Success
                    setHeadAndPropagate(node, r); // Make this node the head; if resources remain, wake up subsequent threads
                    p.next = null; // help GC
                    if (interrupted) // If interrupted while waiting, restore the interrupt now
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // Park and wait for unpark() or interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Here tryAcquireShared() still has to be implemented by the custom synchronizer, but AQS already defines the semantics of its return value: a negative value means the acquire failed; 0 means the acquire succeeded but no resources remain; a positive number means the acquire succeeded and resources remain for other threads. So the process of acquireShared() is:

  1. tryAcquireShared() attempts to obtain the resource and returns directly on success.
  2. On failure, doAcquireShared() puts the thread into the wait queue and parks it with park() until it is woken by unpark() or interrupt(), and returns only after the resource has been obtained. The whole wait also ignores interrupts.

doAcquireShared(int) puts the current thread at the tail of the queue and parks it until another thread wakes it by releasing resources; it returns once the thread has obtained the resource.
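The return-value convention of tryAcquireShared() can be illustrated with a small counting synchronizer. PermitPool below is a hypothetical class written for this example, similar in spirit to how a semaphore can be built on AQS; the state field holds the number of available permits.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class PermitPool {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: fail without touching state.
                // Otherwise CAS the new count; 0 or positive signals success.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // always let AQS wake waiters so they can re-check
                }
            }
        }
    }

    private final Sync sync;

    PermitPool(int permits) {
        sync = new Sync(permits);
    }

    // Exposes the raw hook result so the three cases can be inspected
    int tryAcquireRaw(int n) { return sync.tryAcquireShared(n); }

    void acquire(int n) { sync.acquireShared(n); }  // parks in doAcquireShared() if needed
    void release(int n) { sync.releaseShared(n); }

    public static void main(String[] args) {
        PermitPool pool = new PermitPool(2);
        System.out.println(pool.tryAcquireRaw(1)); // positive: success, permits remain
        System.out.println(pool.tryAcquireRaw(1)); // zero: success, nothing left
        System.out.println(pool.tryAcquireRaw(1)); // negative: failure
    }
}
```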

2.2.4 releaseShared()

releaseShared() is the top-level entry point for a thread releasing a resource in shared mode. It releases the specified amount of the resource, and if the release succeeds and waiting threads are allowed to wake up, it wakes up waiting threads in the queue so that they can acquire the resource.

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {// Try to release resources
        doReleaseShared();// Wake up the successor node
        return true;
    }
    return false;
}

The process of this method is also simple. In one sentence: after releasing the resource, wake up the successors. It is similar to release() in exclusive mode, with one difference: in exclusive mode, tryRelease() returns true, and so wakes other threads, only once the resource has been completely released (state=0), mainly because of reentrancy. releaseShared() in shared mode has no such requirement. Shared mode lets a controlled number of threads run concurrently, so a thread that holds resources can wake waiting nodes as soon as it releases part of them.

For example, suppose the total amount of resources is 13, and A (5) and B (7) have each obtained their resources and run concurrently. When C (4) arrives, only 1 resource is left, so C has to wait. When a running thread releases 2 resources, tryReleaseShared(2) returns true and wakes C; C sees only 3 resources, which is still not enough, so it keeps waiting. When another 2 resources are released, tryReleaseShared(2) again returns true and wakes C; now 5 resources are available, so C can run alongside A and B.

By contrast, tryReleaseShared() of the ReentrantReadWriteLock read lock returns true only when the resource is completely released (state=0), so a custom synchronizer can choose the return value of tryReleaseShared() as needed.
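The numbers in the example above can be replayed with java.util.concurrent.Semaphore, which is a shared-mode synchronizer. This is only an illustrative sketch: the class name is made up, a single thread plays all roles, and tryAcquire(4) stands in for C re-checking after each wake-up.

```java
import java.util.concurrent.Semaphore;

class SharedReleaseExample {
    // Replays the 13-resource scenario and records what C would see at each step.
    static String walkThrough() {
        Semaphore resources = new Semaphore(13);
        StringBuilder log = new StringBuilder();

        resources.acquireUninterruptibly(5);             // A takes 5
        resources.acquireUninterruptibly(7);             // B takes 7
        log.append(resources.availablePermits());        // 1 left

        log.append(',').append(resources.tryAcquire(4)); // C needs 4: not enough

        resources.release(2);                            // first partial release
        log.append(',').append(resources.availablePermits());
        log.append(',').append(resources.tryAcquire(4)); // 3 available: still not enough

        resources.release(2);                            // second partial release
        log.append(',').append(resources.availablePermits());
        log.append(',').append(resources.tryAcquire(4)); // 5 available: C can run

        log.append(',').append(resources.availablePermits());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(walkThrough()); // 1,false,3,false,5,true,1
    }
}
```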

3. Summary

This article gave a brief walkthrough of AQS. The following articles were consulted while writing this summary and are recommended reading.

  • JUC review: AQS synchronizer implementation principle
  • JDK source code: AQS source analysis
  • AQS for Java concurrency