Concept

AQS (AbstractQueuedSynchronizer) is the heavyweight infrastructure used to build locks and other synchronizer components, and it is the cornerstone of the entire JUC (java.util.concurrent) architecture. Threads trying to acquire the resource are queued in a built-in FIFO queue, and the state of the lock is represented by a volatile int field named state.

AQS, the abstract queued synchronizer, defines a synchronizer framework for multithreaded access to a shared resource. Many synchronization classes depend on it, such as the commonly used ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock…

Lock: for the user of the lock

It defines the API layer through which programmers interact with the lock and hides the implementation details; callers simply invoke it.

Synchronizer: for the implementer of the lock

With the synchronizer, we can implement different kinds of locks.

Why do we need AQS

Locking leads to blocking, and blocking requires queuing, so some form of queue management is inevitably needed.

The thread that grabs the resource goes straight on to its business logic, while threads that fail to grab it must enter a queuing mechanism. A thread that fails to preempt the resource keeps waiting (much like a bank whose service windows are all busy: customers who cannot be served yet wait in the waiting area). The waiting thread still retains the chance to acquire the lock, and the acquisition process continues (the waiting customer waits for their turn and then goes to a window to handle their business).

Speaking of queueing, there must be some sort of queue. What is the data structure of that queue?

If the shared resource is occupied, a blocking and wake-up mechanism is needed to allocate the lock. This mechanism is implemented mainly with a variant of the CLH queue: threads that cannot acquire the lock for the moment are added to a queue, and this queue is the abstract representation of AQS. AQS wraps each thread requesting the shared resource in a queue Node and maintains the state variable through CAS, spinning and LockSupport.park(), which achieves synchronized control under concurrency.

CLH: the Craig, Landin, and Hagersten queue is a singly linked list. The queue in AQS is a CLH variant, a virtual doubly linked FIFO queue. AQS allocates the lock by wrapping each thread that requests the shared resource in a node.

AQS

  • The synchronization state: the state member variable
  • A virtual doubly linked queue, a CLH variant

State

  • 0 means the resource is free; nobody is at the window.
  • A value greater than or equal to 1 means someone occupies the window and others must queue.

Conclusion

  1. Where there is a lock there is blocking; blocking requires queuing; and queuing requires a queue.
  2. AQS = the state variable + a doubly linked queue (a CLH variant)
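The two ingredients summarized above can be observed from the outside through the public monitoring methods of ReentrantLock. The following is a minimal sketch, not part of the original walkthrough; the class name StateAndQueueDemo and the method observe() are hypothetical names chosen for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

class StateAndQueueDemo {
    /** Returns {locked while held, waiter seen in queue, locked after release}. */
    static boolean[] observe() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                                 // state goes from 0 to 1
        boolean lockedWhileHeld = lock.isLocked();

        Thread waiter = new Thread(() -> {
            lock.lock();                             // fails at first, so the thread is queued
            lock.unlock();
        }, "waiter");
        waiter.start();

        try {
            while (!lock.hasQueuedThreads()) {       // poll until the waiter is in the queue
                Thread.sleep(1);
            }
            boolean waiterQueued = lock.hasQueuedThread(waiter);

            lock.unlock();                           // state back to 0, waiter is woken
            waiter.join();
            return new boolean[] { lockedWhileHeld, waiterQueued, lock.isLocked() };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

While the main thread holds the lock, the second thread shows up in the wait queue; once the lock is released and the waiter has finished, the lock reports itself as free again.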

The framework

First, let's get an overall picture of the AQS framework from the following architecture diagram:

  • In the diagram, the colored boxes are methods and the uncolored boxes are attributes.
  • Broadly, the AQS framework is divided into five layers, going from shallow to deep from top to bottom: from the API exposed by AQS down to the underlying basic data.
  • A custom synchronizer only needs to override some of the methods required by the first layer and does not need to care about the underlying implementation. When a custom synchronizer locks or unlocks, the call first goes through the first-layer API into the internal methods of AQS, then into the second layer for lock acquisition. If acquisition fails, the thread moves on to the third and fourth layers for wait-queue processing. All of these rely on the basic data provided by the fifth layer.

The data structure

Let's look at the most basic data structure in AQS, Node, which is the node of the CLH variant queue described above.

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing. Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued. The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check. The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node. Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED. Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
}

The meaning of the fields and the method:

| Field or method | Meaning |
| --- | --- |
| waitStatus | Status of the current node in the queue |
| thread | The thread held by this node |
| prev | Pointer to the predecessor node |
| predecessor | Returns the predecessor node; throws a NullPointerException if there is none |
| nextWaiter | Points to the next node in the CONDITION state (this article does not cover the condition queue, so this pointer is not discussed) |
| next | Pointer to the successor node |

A thread can wait for the lock in one of two modes:

| Mode | Meaning |
| --- | --- |
| SHARED | The thread is waiting for the lock in shared mode |
| EXCLUSIVE | The thread is waiting for the lock in exclusive mode |

waitStatus takes the following values:

| Value | Meaning |
| --- | --- |
| 0 | The default value when a Node is initialized |
| CANCELLED (1) | The thread's lock request has been cancelled |
| CONDITION (-2) | The node is in a condition queue and its thread is waiting to be woken |
| PROPAGATE (-3) | Used only in SHARED mode: the release should propagate to subsequent nodes |
| SIGNAL (-1) | The successor's thread needs to be woken (unparked) when this node releases the resource or is cancelled |

Synchronization status State

Now that we know the data structure, let's look at the synchronization state of AQS. AQS maintains a field named state, the synchronization state, which is declared volatile and shows the current lock status of the critical resource.

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

The field is accessed through the following methods:

| Method | Description |
| --- | --- |
| protected final int getState() | Gets the value of state |
| protected final void setState(int newState) | Sets the value of state |
| protected final boolean compareAndSetState(int expect, int update) | Updates state with a CAS operation |

These methods are all final, so subclasses cannot override them. By modifying the synchronization state held in the state field, we can implement the exclusive mode and the shared mode of locking across multiple threads.

A custom synchronization tool has to define how the synchronization state is acquired and released, which is the first layer of the AQS architecture diagram: the API layer.
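As a rough illustration of defining how the state is acquired and released, here is a minimal sketch of a non-reentrant mutex built on AQS. It is modelled loosely on the kind of example given in the AbstractQueuedSynchronizer class documentation; the class name SimpleMutex and its methods are hypothetical and are not taken from ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 means free; try to flip it to 1 atomically
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);                 // back to the free state
            return true;
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }          // queues and parks on failure
    public boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    public void unlock()      { sync.release(1); }
    public boolean isLocked() { return sync.locked(); }
}
```

Only the two try methods touch state; queuing, parking and waking are left entirely to the inherited acquire and release methods. Because this sketch does not count holds, a second tryLock() from the owning thread fails, unlike ReentrantLock.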

How the important AQS methods relate to ReentrantLock

As the architecture diagram shows, AQS provides a number of protected methods for custom synchronizers to implement. These methods exist only to implement the exclusive or shared mode across threads by modifying the state field. A custom synchronizer implements the following methods (ReentrantLock implements some of them, not all):

| Method | Description |
| --- | --- |
| protected boolean isHeldExclusively() | Whether the current thread holds the resource exclusively. It only needs to be implemented if Condition is used. |
| protected boolean tryAcquire(int arg) | Exclusive mode. arg is the number of acquisitions. Attempts to acquire the resource; returns true on success and false on failure. |
| protected boolean tryRelease(int arg) | Exclusive mode. arg is the number of releases. Attempts to release the resource; returns true on success and false on failure. |
| protected int tryAcquireShared(int arg) | Shared mode. arg is the number of acquisitions. Attempts to acquire the resource. A negative value means failure; 0 means success with no resources remaining; a positive value means success with resources remaining. |
| protected boolean tryReleaseShared(int arg) | Shared mode. arg is the number of releases. Attempts to release the resource; returns true if waiting nodes may be woken after the release, false otherwise. |

In general, a custom synchronizer is either exclusive or shared, so it only needs to implement one pair: tryAcquire/tryRelease or tryAcquireShared/tryReleaseShared. AQS also supports synchronizers that use both modes, such as ReentrantReadWriteLock. ReentrantLock is an exclusive lock, so it implements tryAcquire/tryRelease.
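For contrast with the exclusive pair, the shared pair can be sketched as a one-shot latch: every waiter passes once the latch has been signalled. This is an illustrative sketch only; OneShotLatch and its method names are hypothetical, and the int arguments are ignored.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // positive: success and later acquirers may also succeed; negative: failure
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);      // open the latch
            return true;      // allow waiting nodes to be woken
        }

        boolean open() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() { return sync.open(); }
    public void signal()         { sync.releaseShared(1); }
    public void await()          { sync.acquireShared(1); }  // blocks until signalled
}
```

Here state is used as a flag rather than a hold count, which shows that its meaning is entirely up to the synchronizer that overrides the try methods.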

Taking the non-fair lock as an example, this section shows how it relates to the AQS methods; the role of each core method is described in detail later in the article.

To make the interaction between ReentrantLock and AQS easier to follow, the locking and unlocking flows are listed separately below.

Lock:

  • Locking starts with the lock method of ReentrantLock.
  • That calls the lock method of the inner class Sync. Sync#lock is abstract, so the lock method of the concrete inner class runs, chosen by whether the ReentrantLock was created as fair or non-fair; in essence it ends up calling the acquire method of AQS.
  • The acquire method of AQS calls tryAcquire. tryAcquire must be implemented by the custom synchronizer, so the tryAcquire in ReentrantLock runs. ReentrantLock implements tryAcquire in its fair and non-fair inner classes, so a different tryAcquire runs depending on the lock type.
  • tryAcquire contains the logic for acquiring the lock. If it fails, the subsequent logic of the AQS framework runs, which no longer involves the ReentrantLock custom synchronizer.

Unlock:

  • Unlocking starts with the unlock method of ReentrantLock.
  • unlock calls the release method of the inner class Sync, which is inherited from AQS.
  • release calls tryRelease, which must be implemented by the custom synchronizer. In ReentrantLock, tryRelease is implemented only in Sync, so the fair and non-fair locks share it.
  • After a successful release, the rest of the processing is done by the AQS framework and does not involve the custom synchronizer.

The description above summarizes how the core API-layer methods map onto each other when ReentrantLock locks and unlocks.

Understand AQS through ReentrantLock

In ReentrantLock, the fair and non-fair locks share the same underlying mechanism. This section uses the non-fair lock as an example.

The non-fair lock contains the following code:

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock. Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

Look at the acquire() method that lock() falls back to:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire() first calls the tryAcquire() method:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}

As shown, AQS itself provides only a placeholder implementation that throws an exception; the actual lock acquisition is implemented separately by the fair and non-fair locks (taking ReentrantLock as an example). If the method returns true, the current thread has acquired the lock and nothing further needs to run. If it fails, the thread has to join the wait queue.
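The placeholder behaviour is easy to confirm with a synchronizer that overrides nothing. A minimal sketch, with hypothetical names DefaultTryAcquireDemo, BareSync and acquireThrows:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class DefaultTryAcquireDemo {
    /** A synchronizer that does not override any of the try methods. */
    private static final class BareSync extends AbstractQueuedSynchronizer { }

    /** Returns true if acquire(1) fails with UnsupportedOperationException. */
    static boolean acquireThrows() {
        try {
            new BareSync().acquire(1);   // acquire() calls the inherited tryAcquire()
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```

This is why every concrete synchronizer has to supply its own tryAcquire (or the shared equivalent) before acquire can be used.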

The tryAcquire() methods of the non-fair and fair locks differ in one respect: the fair lock makes one extra check, !hasQueuedPredecessors().

hasQueuedPredecessors() determines whether the current thread needs to queue, which leads to the following difference between the two locks:

Fair lock: first come, first served. When a thread tries to acquire the lock and other threads are already waiting for it, the current thread joins the wait queue.

Non-fair lock: if the lock is available, the thread takes it immediately, regardless of whether other threads are waiting in the queue.
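From the caller's side, the choice between the two policies is made in the ReentrantLock constructor. A small sketch (FairnessDemo and fairness() are hypothetical names):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    /** Returns {isFair() of a default lock, isFair() of a lock created with true}. */
    static boolean[] fairness() {
        ReentrantLock nonFair = new ReentrantLock();     // no argument: non-fair policy
        ReentrantLock fair = new ReentrantLock(true);    // true: fair policy
        return new boolean[] { nonFair.isFair(), fair.isFair() };
    }
}
```

The default is the non-fair policy, which is the variant this walkthrough follows.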


import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author kylin
 */
public class AQSDemo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("------A thread come in");
                // Pause the thread for a few seconds while holding the lock
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "A").start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("------B thread come in");
                // Pause the thread for a few seconds while holding the lock
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "B").start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("------C thread come in");
                // Pause the thread for a few seconds while holding the lock
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "C").start();
    }
}

Suppose three people go to the bank to handle business. The first person, A, has no one ahead of them and calls the lock() method:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

compareAndSetState(0, 1) is evaluated first:

 protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

A is the first to handle business, that is, the first to acquire the lock. The state value is still the default 0, so the CAS operation updates it to 1 and returns true, and execution enters the if branch.
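The compare-and-set behaviour on state can be mimicked with an AtomicInteger. This is only an analogy: AQS uses its own field and Unsafe rather than AtomicInteger, and the names CasDemo and twoAttempts are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    /** Returns the results of two consecutive CAS attempts from 0 to 1. */
    static boolean[] twoAttempts() {
        AtomicInteger state = new AtomicInteger(0);     // stand-in for the AQS state field
        boolean first = state.compareAndSet(0, 1);      // like thread A: 0 is expected, succeeds
        boolean second = state.compareAndSet(0, 1);     // like thread B: value is 1, fails
        return new boolean[] { first, second };
    }
}
```

The first attempt corresponds to thread A taking the free lock; the second corresponds to a later thread finding state already at 1.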

It then calls setExclusiveOwnerThread(Thread.currentThread());


    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

The current thread is recorded as the thread with exclusive access; that is, the current thread now holds the lock.

The lock method then returns and thread A sleeps for 2 seconds. Thread B starts to run the lock method.

state has already been set to 1:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

So the condition is not satisfied, and acquire(1) runs:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

!tryAcquire(1) is evaluated first:

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

The non-fair lock calls nonfairTryAcquire(1):

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
For thread B:

  • The first if condition, c == 0, is not satisfied: state is 1 at this point.
  • The second if condition, current == getExclusiveOwnerThread(), is not satisfied either: the thread with exclusive access is thread A, not thread B.
  • So the method simply returns false.

When is the first if condition c == 0 satisfied?

            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }

It is satisfied if thread A finishes, releases the lock and sets state back to 0 just as thread B runs tryAcquire(). The condition holds, so the CAS sets state to 1, thread B is recorded as the thread with exclusive access, and the method returns true. !tryAcquire(1) is then false, so the condition in acquire(1) short-circuits and the method ends: lock() has succeeded.

When is the second if condition current == getExclusiveOwnerThread() satisfied?

      else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }

The condition is satisfied when thread A, which already holds the lock exclusively, tries to acquire it again.

At that point:

  • nextc = c + acquires
  • c = state = 1
  • acquires = 1
  • nextc = 2
  • setState(nextc); sets state to 2

state is set to 2 and the method returns true. This is what makes ReentrantLock reentrant. !tryAcquire(1) is then false, so the condition in acquire(1) short-circuits and the method ends: lock() has succeeded.
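The growing state value is visible through getHoldCount(), which reports how many times the current thread holds the lock. A small sketch with hypothetical names ReentrantCountDemo and holdCounts:

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantCountDemo {
    /** Returns the hold count after one lock, after two locks, and after two unlocks. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];

        lock.lock();                        // c == 0 branch: state becomes 1
        counts[0] = lock.getHoldCount();
        lock.lock();                        // owner branch: state becomes 2
        counts[1] = lock.getHoldCount();

        lock.unlock();
        lock.unlock();
        counts[2] = lock.getHoldCount();    // fully released
        return counts;
    }
}
```

The second lock() call never blocks because the calling thread is already the exclusive owner.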

We now continue with the normal flow and go back to the acquire(1) method.

tryAcquire(1) returned false, so !tryAcquire(1) is true and acquireQueued(addWaiter(Node.EXCLUSIVE), 1) runs.

First look at addWaiter(Node.EXCLUSIVE), which adds the thread to the wait queue:

static final Node EXCLUSIVE = null;

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

The main process is as follows:

  • Create a new node from the current thread and the lock mode.
  • Point the pred pointer at the tail node.
  • Set the prev pointer of the new node to pred.
  • Set the tail node with the compareAndSetTail method. This method compares the field at tailOffset (the current tail) with expect; if both refer to the same node, tail is set to update.

The current mode is exclusive, so the value of mode is null.

If pred is null (the wait queue has no elements yet), or if pred and tail no longer point to the same node (tail has been modified by another thread), the enq method takes over.

In this walkthrough no thread is queued yet and tail is null, so pred == null and the if condition is not met. enq(node) runs to put the node in the queue:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

Node t = tail; reads tail, which is null, so the if condition is met and compareAndSetHead(new Node()) uses CAS to set the head node of the queue.

private final boolean compareAndSetHead(Node update) {
    // If the expected value is null, that is, only the head node is null
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

If the queue has not been initialized, a head node must be created first. Note that this head is not the node of the current thread but a node created with the no-argument constructor. If the queue has already been initialized, or concurrency means it already contains elements, the logic is the same as described above. In effect, addWaiter simply appends a tail node to a doubly linked list whose head is a node built by the no-argument constructor.

A new Node is created and set as the head node, and the CAS returns true on success. The head node is then also assigned to tail. This first node is a sentinel node (a dummy head).

Because of for (;;), the loop runs again. This time t != null, so the else branch runs: node.prev = t; points the predecessor of B at the current tail node (the sentinel).

compareAndSetTail(t, node) then uses CAS to set the tail to node (B), and t.next = node points the sentinel's successor at node B.
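The enqueue steps just described can be modelled in isolation. The sketch below is a simplified stand-in, not the AQS source: it uses AtomicReference instead of Unsafe, the names TinySyncQueue, Node and enqueue are hypothetical, and enqueue returns the new node (as addWaiter does) rather than its predecessor (as enq does).

```java
import java.util.concurrent.atomic.AtomicReference;

class TinySyncQueue {
    static final class Node {
        final Thread thread;        // null for the sentinel node
        volatile Node prev;
        volatile Node next;
        Node(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends a node for the given thread, creating the sentinel head on first use. */
    Node enqueue(Thread thread) {
        Node node = new Node(thread);
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: install an empty sentinel as head, then as tail
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                node.prev = t;                       // link backwards first
                if (tail.compareAndSet(t, node)) {   // publish the new tail
                    t.next = node;                   // then link forwards
                    return node;
                }
            }
        }
    }
}
```

After the first enqueue, head is the empty sentinel, its next pointer refers to the new node, and tail refers to the new node as well, which matches the state of the queue once thread B has been added.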

The loop then ends and control returns to addWaiter(), which runs return node and returns node B. Back in acquire(1), acquireQueued(B, 1) runs:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

The loop starts with final Node p = node.predecessor();

/**
 * Returns previous node, or throws NullPointerException if null.
 * Use when predecessor cannot be null.  The null check could
 * be elided, but is present to help the VM.
 *
 * @return the predecessor of this node
 */
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

This returns the predecessor of the current node, that is, the predecessor of B, which is the head node.

       if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

If p == head is satisfied, tryAcquire(arg) tries to acquire the lock again.

If that succeeds, setHead(node); makes B the new head node, and p.next, the successor pointer of the previous head, is set to null (to help GC). The method then returns interrupted, which is false, and ends.

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

If p == head && tryAcquire(arg) is not satisfied, execution moves on to the next check:

                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }

shouldParkAfterFailedAcquire(p, node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

int ws = pred.waitStatus; At this point pred is the head node and ws has its initial value 0, so the else branch runs:

    else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }

compareAndSetWaitStatus(pred, ws, Node.SIGNAL); uses CAS to change the waitStatus of the pred node to Node.SIGNAL (-1).

The method then returns false. Back in acquireQueued(), the loop has not ended, so it runs again.

After another failed tryAcquire attempt, shouldParkAfterFailedAcquire(p, node) runs again, and this time pred.waitStatus == Node.SIGNAL, so it returns true.

parkAndCheckInterrupt() can now run:

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

This method runs LockSupport.park(this); to suspend the thread. When the thread is later woken, it returns Thread.interrupted();, which reports whether the thread was interrupted and clears the interrupt flag; it does not indicate whether parking succeeded. Thread B is now suspended, waiting to be woken (the method has not finished; it is paused and continues after the wake-up).
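The park, wake, and check-interrupt pattern can be tried outside AQS. The following is a sketch under stated assumptions: ParkDemo and run are hypothetical names, and the loop around park is added here because LockSupport.park is documented as being allowed to return for no reason.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    /** Parks thread "B", wakes it, and returns whether B observed an interrupt. */
    static boolean run(boolean interruptFirst) {
        AtomicBoolean woken = new AtomicBoolean(false);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread b = new Thread(() -> {
            started.countDown();
            boolean interrupted = false;
            boolean done;
            do {
                LockSupport.park();             // suspend until unpark, interrupt, or spurious return
                done = woken.get();
                if (Thread.interrupted())       // reports and clears the interrupt flag
                    interrupted = true;
            } while (!done);
            sawInterrupt.set(interrupted);
        }, "B");
        b.start();

        try {
            started.await();                    // make sure B is running before waking it
            if (interruptFirst)
                b.interrupt();
            woken.set(true);
            LockSupport.unpark(b);              // the wake-up that release() would trigger
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sawInterrupt.get();
    }
}
```

With a plain unpark the parked thread reports no interrupt; if it was interrupted while waiting, the flag is recorded and the thread still carries on, which mirrors how acquireQueued remembers the interrupt and keeps trying to acquire.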

Now thread A finally calls unlock():

    public void unlock() {
        sync.release(1);
    }

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
    if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

This enters tryRelease(1):

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

getState() returns 1, the current value of state.

int c = getState() - releases; gives 1 - 1 = 0.

if (Thread.currentThread() != getExclusiveOwnerThread())

If the thread calling unlock is not the holder of the lock, an IllegalMonitorStateException is thrown.
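
The ownership check is easy to observe from outside. In the sketch below, a helper thread takes the lock and the main thread then tries to release it; WrongThreadUnlockDemo and unlockFromNonOwnerThrows are names invented for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative demo: unlock() from a thread that does not own the lock.
final class WrongThreadUnlockDemo {
    // Returns true if unlock() by a non-owner throws IllegalMonitorStateException.
    static boolean unlockFromNonOwnerThrows() {
        ReentrantLock lock = new ReentrantLock();
        Thread owner = new Thread(lock::lock); // acquires the lock and never releases it
        owner.start();
        try {
            owner.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        try {
            lock.unlock(); // the calling thread is not the exclusive owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(unlockFromNonOwnerThrows()); // prints true
    }
}
```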

If c == 0, then:

  • free is set to true
  • setExclusiveOwnerThread(null); clears the exclusive thread (the lock owner)
  • setState(c); sets state to 0
  • free, which is true, is returned

If c != 0

This happens with reentrant (nested) locking: nonfairTryAcquire() incremented state once for each nested lock() call, and one unlock() releases only one hold. Assuming the lock was taken twice, the new state value is 1 and tryRelease returns false. That is the end of this call; the lock has not actually been released.

In the nested-lock case, where lockMethod2() also unlocks, a single unlock() leaves state at 1, and the unlock() of lockMethod2() is still needed to actually release the lock.
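
The counting can be watched through ReentrantLock.getHoldCount(), which reports how many holds the current thread has. The following is a minimal sketch; ReentrantHoldDemo and holdCounts are illustrative names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative demo: each lock() adds one hold, each unlock() removes one.
final class ReentrantHoldDemo {
    // Returns the hold count observed after lock, lock, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();                     // state 0 -> 1
        counts[0] = lock.getHoldCount();
        lock.lock();                     // nested acquire: state 1 -> 2
        counts[1] = lock.getHoldCount();
        lock.unlock();                   // state 2 -> 1, lock is still held
        counts[2] = lock.getHoldCount();
        lock.unlock();                   // state 1 -> 0, lock is really released
        counts[3] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```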

Back to the main flow: thread A's unlock() releases the lock, tryRelease returns true, and state becomes 0.

        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }

At this point h != null && h.waitStatus != 0 holds:

  • The head node h is not null; it is the sentinel node.
  • Its waitStatus was changed to -1 by compareAndSetWaitStatus() when node B started waiting.

The condition passes, so unparkSuccessor(h) is called:

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

int ws = node.waitStatus; The waitStatus of the head node is -1 at this point, which is less than 0, so

compareAndSetWaitStatus(node, ws, 0); sets it back to 0.

Then Node s = node.next; takes the node after head, which is node B. For if (s == null || s.waitStatus > 0):

  • B is not null
  • The waitStatus of node B is 0, which is not greater than 0

So the backward traversal is skipped. Since s != null, LockSupport.unpark(s.thread) is called. A's unlock() is now complete, and thread B is woken up.
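
The hand-off from A to B can be reproduced with the public ReentrantLock API. In this sketch the main thread plays thread A; HandoffDemo and run() are hypothetical names, while hasQueuedThread(), hasQueuedThreads() and isLocked() are existing ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative demo: B queues behind A and acquires only after A unlocks.
final class HandoffDemo {
    // Returns true if B was queued while A held the lock and then got the lock.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] bGotLock = new boolean[1];
        lock.lock(); // the calling thread acts as thread A
        Thread b = new Thread(() -> {
            lock.lock(); // tryAcquire fails, so B is enqueued and parked
            try {
                bGotLock[0] = true;
            } finally {
                lock.unlock();
            }
        }, "B");
        b.start();
        try {
            while (!lock.hasQueuedThread(b)) {
                Thread.sleep(1); // wait until B's node is in the queue
            }
            boolean queued = lock.hasQueuedThreads();
            lock.unlock(); // release() wakes the successor of head, which is B
            b.join();
            return queued && bGotLock[0] && !lock.isLocked();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true
    }
}
```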

Thread B wakes up inside parkAndCheckInterrupt(); if it was interrupted while parked, interrupted = true is recorded. The loop in acquireQueued() then runs again: final Node p = node.predecessor(); B's predecessor p is head, so tryAcquire(arg) is executed. Thread A no longer holds the lock, so thread B acquires it. With the condition met, setHead(node) is called to make node B the head node (B's node now plays the role of the sentinel).

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

After setHead(), node B has waitStatus = 0, thread = null and prev = null, which is exactly what a sentinel node looks like.

Then p.next = null; the original head node is no longer referenced by anything and will be collected by the GC.

failed = false is set, and acquireQueued() returns interrupted, which is true only if the thread was interrupted while waiting.

Back in acquire(), the condition if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) is evaluated.

If acquireQueued() returned true, selfInterrupt() is called:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

In fact, when thread B wakes up, it resumes from here, inside LockSupport.park():

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

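setBlocker(t, null); sets the blocker back to null.

Later, Thread.interrupt() reads Interruptible b = blocker; (this is Thread's own Interruptible field, not the parkBlocker field that setBlocker writes). If b is null, there is no blocked interruptible operation to notify, so interrupt() only calls interrupt0() to set the interrupt flag. That's it.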

But why interrupt the thread after it has acquired the lock? This relates to Java's cooperative interruption mechanism, which interested readers can look up. Here is a brief introduction:

  1. When a parked thread is woken up, it does not know why: it may have been interrupted while waiting, or it may have been unparked after the lock was released. So it checks with Thread.interrupted() (which returns the interrupt status of the current thread and resets the flag to false) and records whether it was interrupted; if so, it interrupts itself again once the lock is acquired.
  2. A thread that is woken while waiting for the resource keeps trying to acquire the lock until it succeeds. Interrupts are not acted on at any point in this process; they are only recorded. Once the lock is finally acquired and the call returns, a thread that was interrupted along the way has its interrupt flag set again.
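
The two points above can be checked with a short experiment: interrupt a thread while it is queued inside lock(), then look at its interrupt flag once lock() returns. This is a sketch under the assumption that the main thread plays thread A; LockInterruptDemo and run() are made-up names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative demo: lock() ignores interrupts while waiting but restores the flag afterwards.
final class LockInterruptDemo {
    // Returns {B still queued right after the interrupt, B's flag once lock() returned}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[2];
        lock.lock(); // the calling thread holds the lock, as thread A does
        Thread b = new Thread(() -> {
            lock.lock(); // does not throw; the interrupt is only recorded
            try {
                result[1] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        }, "B");
        b.start();
        try {
            while (!lock.hasQueuedThread(b)) {
                Thread.sleep(1); // wait until B is in the queue
            }
            b.interrupt();
            result[0] = lock.hasQueuedThread(b); // B keeps waiting for the lock
            lock.unlock();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0]); // prints true
        System.out.println(r[1]); // prints true
    }
}
```

Code that needs to react to the interrupt while still waiting would use lockInterruptibly() instead of lock().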

For another example of Thread.interrupted() being used for extra checks, see the source code of ThreadPoolExecutor.

Application scenarios in JUC

In addition to the ReentrantLock application above, AQS, as a framework for concurrent programming, underpins many other synchronization tools. The following table lists several synchronization tools in JUC to give an overview of where AQS is applied:

| Synchronization tool | How it uses AQS |
| --- | --- |
| ReentrantLock | Uses the AQS state to store the number of times the lock is held reentrantly. When a thread acquires the lock, ReentrantLock records the identity of that thread, which is used to detect repeated acquisition and to throw an exception when the wrong thread attempts to unlock. |
| Semaphore | Uses the AQS synchronization state to hold the current count of the semaphore. tryReleaseShared increases the count and tryAcquireShared decreases it. |
| CountDownLatch | Uses the AQS synchronization state to represent the count. When the count reaches 0, all acquire operations (CountDownLatch's await method) can pass. |
| ReentrantReadWriteLock | Uses the lower 16 bits of the AQS synchronization state to hold the number of write-lock holds and the upper 16 bits to hold the number of read-lock holds. |
| ThreadPoolExecutor | The Worker class uses the AQS synchronization state to implement an exclusive lock for its thread (tryAcquire and tryRelease). |
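
As a worked example of the ReentrantReadWriteLock row, the sketch below packs two counters into one int. The constant and method names are modelled on those in ReentrantReadWriteLock's internal Sync class, but the class RwStateBits itself is a standalone illustration and not JDK code.

```java
// Illustrative sketch of splitting one 32-bit state into two 16-bit counters.
final class RwStateBits {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adding this bumps the read count by 1
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 0x0000FFFF

    // Upper 16 bits: number of read (shared) holds.
    static int sharedCount(int c) {
        return c >>> SHARED_SHIFT;
    }

    // Lower 16 bits: number of write (exclusive) holds.
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = 3 * SHARED_UNIT + 2;           // 3 read holds, 2 write holds
        System.out.println(state);                 // prints 196610
        System.out.println(sharedCount(state));    // prints 3
        System.out.println(exclusiveCount(state)); // prints 2
    }
}
```

Keeping both counts in one int lets a single CAS on state update either of them atomically.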

Custom synchronization tools

With the basic principles of AQS understood, we can apply the points above to implement a synchronization tool of our own.
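
One possible starting point is a non-reentrant mutex, where state 0 means free and state 1 means held. This is a minimal sketch rather than a production lock: SimpleMutex, its held() helper and countWithTwoThreads() are names invented here, and only the overridden AQS hooks (tryAcquire, tryRelease) and the acquire/release entry points come from AbstractQueuedSynchronizer.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative non-reentrant mutex built on AQS (state 0 = free, 1 = held).
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) { // CAS the state from free to held
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true; // tells AQS to wake the next waiter
        }

        boolean held() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.held(); }

    // Two threads increment a shared counter under the mutex; returns the final value.
    static int countWithTwoThreads(int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];
        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(10_000)); // prints 20000
    }
}
```

All queuing, parking and waking is inherited from AQS; the subclass only decides what state means. Because tryAcquire does not check the current owner, calling lock() twice from the same thread would block forever, which is the main difference from ReentrantLock.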