AQS

The properties of AbstractQueuedSynchronizer:

// The head of the wait queue, representing the node currently holding the lock
private transient volatile Node head;
// Wait for the last node of the queue
private transient volatile Node tail;
// If the synchronization status is greater than 0, there is a lock. Every time you lock, you give state+1. State =0 indicates no lock state
// Change the state value through CAS to ensure its atomicity
private volatile int state;
// Determine whether the lock is reentrant or not
private transient Thread exclusiveOwnerThread;
Copy the code

The AQS wait queue is shown below. Note that head is not in the wait queue.

Each thread in the queue is wrapped as a Node object and stored as a linked list:

static final class Node {
	// Indicates that the node is in shared mode
	static final Node SHARED = new Node();
	// Indicates that the node is in exclusive mode
	static final Node EXCLUSIVE = null;

	// Node status, which can have the following four values
	volatile int waitStatus;
	// The thread has cancelled the lock contention
	static final int CANCELLED =  1;
	If the node status of the current thread is SIGNAL, the subsequent nodes of the current thread need to be woken up
	static final int SIGNAL    = -1;
	// Wait condition state, which indicates that the current node is waiting on condition, i.e. in the condition queue
	static final int CONDITION = -2;
	ReleaseShared needs to be propagated to subsequent nodes, which is only used in shared lock mode
	static final int PROPAGATE = -3;
    
    // A reference to the precursor node
    volatile Node prev;
    // A reference to a successor node
    volatile Node next;
    // This is the thread itself
    volatile Thread thread;
}
Copy the code

AQS structure is roughly composed of three parts:

  1. The state of an integer type that is volatile, used to indicate synchronization status, and provides getState and setState to operate on synchronization status
  2. A FIFO wait queue is provided to realize contention and wait between threads, which is the core of AQS
  3. AQS internally provides various cas-based atomic operation methods, such as compareAndSetState methods, and provides acquire and release methods for lock operations

Use ReentrantLock as an example to introduce the code:

// I'll use the concept of a Service from Web development
public class OrderService {
    // Use static, so that each thread gets the same lock. Of course, spring MVC is a singleton by default
    private static ReentrantLock reentrantLock = new ReentrantLock(true);
    
    public void createOrder(a) {
        For example, we only allow one thread to create orders at a time
        reentrantLock.lock();
        // Lock is usually followed by a try statement
        try {
            Only one thread can come in at a time (the thread that acquired the lock).
            // Other threads block on the lock() method, waiting for the lock to come in
            // Execute code...
            // Execute code...
            // Execute code...
        } finally {
            / / releases the lockreentrantLock.unlock(); }}}Copy the code

When creating ReentrantLock, we specify the entry parameter true. ReentrantLock internally manages the lock through Sync. There are two types of fair lock and unfair lock.

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
Copy the code

Thread preemption lock

ReentrantLock:: Lock is actually the Sync::lock call

static final class FairSync extends Sync {
    final void lock(a) {
        // Preempt the lock, passing in the value 1
        // The method is implemented in the AQS class and cannot be modified
        acquire(1); }}Copy the code
	public final void acquire(int arg) {
        // If there are two steps in it
        // 1. TryAcquire attempts to acquire the lock, if successful, return directly
        // 2. AddWaiter wraps the current thread as a Node object and attempts to acquire the lock again using the acquireQueued method
        // If both operations fail, the current thread is interrupted
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

Talk about these methods in detail

	// This method is implemented in FairSync
	// The return value is Boolean, true means lock has been obtained
	// Returns true: 1. The current lock-free state. 2
	protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
        	// If state==0, the current state is not locked
            if (c == 0) {
                // Even if the lock is not locked, because it is a fair lock, we need to check whether there are already waiting threads in the queue
                if(! hasQueuedPredecessors() &&// Use CAS to set the value from 0 to 1
                    compareAndSetState(0, acquires)) {
                    // Set the current thread to the acquired thread
                    setExclusiveOwnerThread(current);
                    return true; }}// state! =0 to determine whether it is reentrant
            else if (current == getExclusiveOwnerThread()) {
                // It is a reentrant lock
                // Set state to oldState+1
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
        	// Failed to obtain the lock
            return false;
        }
Copy the code

After tryAcquire attempts fail, the acquireQueued method is entered, but the addWaiter call wraps the current thread into a Node object and merges it into the wait queue.

	// The mode parameter is node. EXCLUSIVE
	private Node addWaiter(Node mode) {
        // The current thread is wrapped as Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // Class initialization, tail is null
        // Check whether tail is empty
        if(pred ! =null) {
            // Set the current tail node as your own head node
            node.prev = pred;
            // CAS setting, if failure proves that threads are contending
            if (compareAndSetTail(pred, node)) {
                // Form a bidirectional list
                pred.next = node;
                returnnode; }}There are two scenarios: 1. The queue is empty (tail==null) 2. The CAS fails
        enq(node);
        return node;
    }
Copy the code
	// Insert node into queue, initialize queue if necessary (queue may be null)
	private Node enq(final Node node) {
        // Infinite loop
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // The queue is empty and must be initialized
                // Set head to empty node
                if (compareAndSetHead(new Node()))
                    // If set successfully, the tail points to the same Node object
                    tail = head;
            } else {
                // Queue is not empty, set tail node to the last node of tail head
                node.prev = t;
                // Set tail to the current node using CAS
                // This step may fail, and the failure continues until the insert succeeds
                if (compareAndSetTail(t, node)) {
                    // Set the bidirectional linked list successfully
                    t.next = node;
                    returnt; }}}}Copy the code

After the addWaiter method, the Node object of the current thread is queued and the acquireQueued method is executed.

Let’s go over the acquire method:

if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();Copy the code

The acquireQueued method returns true and blocks the current thread.

	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Infinite loop
            for (;;) {
                // p is the previous node of the current node
                final Node p = node.predecessor();
                // If p is the head node (note that the head node is the node that acquired the lock), the first digit in the wait queue is proved
                // Try to get the lock, why try, because head is empty when the blocking queue is initialized for the first time, not the executing thread
                if (p == head && tryAcquire(arg)) {
                    // Obtain the lock successfully, set itself as the head node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // Return the interrupt flag: false
                    return interrupted;
                }
                // Failed to obtain the lock
                // Check and update node status. This method returns true to prove that thread blocking is required
                // If this method returns false, the loop continues
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // This method is executed only if the preceding method returns true, which means that the thread is blocked
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
            if(failed) cancelAcquire(node); }}Copy the code
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
    	// The status of the previous node is notifiable, normal, and true is returned
        // node. SIGNAL == -1 means that the next Node of the current Node needs to be woken up
        if (ws == Node.SIGNAL)
            /* * This node has already set status asking a release * to signal it, so it can safely park. */
            return true;
        if (ws > 0) {
            /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
            // If the status of the previous node is cancelled, skip the previous node and set its node as the next node of the previous node (similar to deleting the previous node).
            // Because its own node is awakened up to the previous node, keep the previous node valid
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
            // The state of the previous node is 0 or -2 or -3
            // The status of each new node is 0, so this step sets the status of the precursor node to -1, that is, it needs to be woken up
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        The acquireQueued method is relooped by returning false
        return false;
    }
Copy the code
	private final boolean parkAndCheckInterrupt(a) {
        // Suspend the thread, which is stuck at this step, waiting to wake up
        LockSupport.park(this);
        return Thread.interrupted();
    }
Copy the code

The code is finished, and the possible scenarios are analyzed

Initial – no lock, thread atryAcquire successful, return directly.

When thread A executes, thread B attempts a lock

  • tryAcquireFailure. The thread is queued
  • Enter theacquireQueuedMethod, the queue has just been initialized, p==head but will fail to acquire the lock
  • Thread B interrupt

Release the lock

If the thread does not preempt the lock, it will interrupt through locksupport. park(this) and wait to be woken up

	// Call unlock
	public void unlock(a) {
    	sync.release(1);
	}

	/ / AQS method
	public final boolean release(int arg) {
        // state-1, which returns true to indicate that the lock is released and another thread needs to be woken up
        if (tryRelease(arg)) {
            Node h = head;
            // If head==null or the wait state of h is 0, there are no threads in the queue to wake up
            if(h ! =null&& h.waitStatus ! =0)
                // Wake up the successor node
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

	// Subtract state, return true to indicate that the current lock has been released (if it is reentrant, it will not be released, return false)
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
            boolean free = false;
        	// If state after -1 ==0, the current thread occupation is finished
            if (c == 0) {
                // Release the thread and set the thread to null
                free = true;
                setExclusiveOwnerThread(null);
            }
        	// Update the value, no concurrency here
            setState(c);
            return free;
        }

	// Wake up subsequent threads
	private void unparkSuccessor(Node node) {
        /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
        int ws = node.waitStatus;
        // Change the wait state of head to 0
        // If not, the head node is always awakened
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
        // Wake up subsequent nodes, but it is possible that the node has unwaited (waitStatus==1)
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // find the first node whose waitStatus<=0 from the end of the queue
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                    s = t;
        }
        / / wake
        if(s ! =null)
            LockSupport.unpark(s.thread);
    }
Copy the code

Upon awakening, the blocked thread enters the method

	private final boolean parkAndCheckInterrupt(a) {
        // Just suspend the position
        LockSupport.park(this);
        // Wake up and return false to the acquireQueued method loop
        return Thread.interrupted();
    }
Copy the code

conclusion

So to sum up.

In a concurrent environment, locking and unlocking require the coordination of the following three components:

  1. The lock state. We need to know if the lock is being held by another thread, and that’s what state is for. When it’s 0, it means that no thread is holding the lock, so we can try to grab the lock. You add +1 to state, you subtract 1 to unlock, until state goes to 0 again, so lock() and unlock() have to pair. It then wakes up the first thread in the wait queue to claim the lock.
  2. Blocking and unblocking of threads. AQS uses locksupport. park(Thread) to suspend threads and unpark to wake them up.
  3. Block the queue. A queue is needed to manage these threads. AQS uses a FIFO queue, which is a linked list. Each node holds a reference to its successor node. AQS uses a variant of CLH lock to implement, interested readers can refer to this article on the INTRODUCTION of CLH, written simply.

Sample graph parsing

This is a review session, a simple example, and a chance to help you understand some of the things above.

The first thread calls reentrantLock.lock(). If you scroll to the front, tryAcquire(1) will return true. With state=1, there is no initialization of the head, let alone any blocking queue. If thread 1 calls unlock() and thread 2 comes in, then the world is completely peaceful, there is no intersection, so WHY do I need AQS?

Imagine what happens if thread 2 calls lock() before thread 1 calls unlock().

Thread 2 will initialize head [new Node()], and thread 2 will insert the blocking queue and suspend.

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                returnt; }}}}Copy the code

First, thread 2 initializes the head node with head==tail and waitStatus==0

Then thread 2 joins the queue:

We also need to look at the waitStatus of the node at this time. We know that the head node is initialized by thread 2, the waitStatus is not set at this time, Java default is set to 0, But by this method, shouldParkAfterFailedAcquire thread 2 take precursor nodes, namely the head waitStatus is set to 1.

What is the waitStatus of thread 2? It’s 0 because it’s not set;

If the thread 3 to come in at this time, directly into the thread 2 back, at this time the thread 3 waitStatus is 0, the shouldParkAfterFailedAcquire method when the thread 2 waitStatus precursor node is set to 1.

The SIGNAL(-1) state in waitStatus means that the successor node needs to be woken up. In other words, this waitStatus actually represents the status of its successor node rather than its own state. As we know, when each node joins the queue, it changes the status of its predecessor node to SIGNAL, and then blocks, waiting to be woken up by its predecessor. There are two problems involved here: a thread unqueued and woke up.

The article quoted

Java concurrency ASQ source code analysis

Clear AbstractQueuedSynchronizer line source analysis