1. The meaning of existence

AQS (AbstractQueuedSynchronizer) is the basis of numerous lock in JAVA and concurrent tool, its underlying use optimistic locking, heavy use of CAS operation, and the conflict, retry using spin way, in order to realize the lightweight and efficiently to obtain the lock.

Provides a framework for implementing blocking locks and associated synchronizers (Semaphore, etc.) that rely on first-in, first-out (FIFO) wait queues. This class is intended to provide a useful basis for most synchronizers that rely on a single atomic int value to represent state. Subclasses must define protected methods that change this state and define what that state means for getting or releasing this object. Given this, other methods in this class will perform all queuing and blocking mechanisms. Subclasses can maintain other status fields, but only keep track of atomically updated int values using the methods getState,setState, and compareAndSetState operations, as opposed to synchronization.

This class supports either or both default exclusive mode and share mode. When fetching is exclusive, attempts by other threads to fetch will not succeed. A shared pattern captured by multiple threads may (but does not necessarily) succeed. The class does not “understand” these differences, and when the shared mode is acquired successfully, the next waiting thread (if one exists) must also determine whether it is also available. Threads waiting in different modes share the same FIFO queue. Typically, implementation subclasses support only one of these patterns, but can come into play in ReadWriteLock for example. Subclasses that support exclusive or shared schemas only need not define methods that support unused schemas.


2. Core knowledge points

2.1 the state

private volatile int state; // Synchronization status
Copy the code

State is the heart of the tool, and often the tool is all about setting and modifying state, and many methods operate depending on what state is currently in place. Because states are shared globally, they are typically set to volatile to make changes visible.

2.2 the CLH queue

Queues in AQS are a virtual bidirectional queue (FIFO) variant of CLH, which allocates locks by encapsulating each thread requesting a shared resource into a node. Queues adopt the idea of pessimistic locking, indicating that the resource, state or condition currently waiting may not be satisfied in a short time. Therefore, it wraps the current thread into some type of data structure and throws it into a wait queue. When certain conditions are met, it takes it out of the wait queue.

2.3 the CAS operation

The CAS operation is the lightest concurrent processing. Generally, the CAS operation is used to modify the status, because the status may be modified by multiple threads at the same time. The CAS operation ensures that only one thread can successfully modify the status at the same time, thus ensuring thread safety. CAS uses the idea of optimistic locking, so it is often accompanied by spin, and if it is found that the CAS cannot be successfully executed at the moment, it is retried until it succeeds.


3. Core implementation principle

3.1 as the basis of synchronizer

To use this class as the basis for a synchronizer, check or modify the synchronization state using getState, setState, or compareAndSetState to redefine the following methods (if applicable) :

  1. tryAcquire

In exclusive mode, arG is the number of times the lock is acquired. If the lock is acquired successfully, return True; if the lock is not acquired, return False.

  1. tryRelease

In exclusive mode, arg is the number of times the lock is released, and returns True on success or False on failure.

  1. tryAcquireShared

In shared mode, arG indicates the number of times to obtain locks, and attempts to obtain resources. Negative numbers indicate failure; 0 indicates success, but no available resources are available. A positive number indicates success and free resources.

  1. tryReleaseShared

In shared mode, arg is the number of times the lock is released, and the attempt to release the resource is returned True if it is allowed to wake up subsequent waiting nodes after release, False otherwise.

  1. isHeldExclusively

Whether the thread is monopolizing resources. You only need to implement it if you use Condition.

By default, each of these methods can trigger UnsupportedOperationException. The implementation of these methods must be internally thread-safe and should generally be short and non-blocking. Defining these methods is the only supported way to use this class. All other methods are declared final because they cannot change independently.

3.2 Synchronization Status State

AQS maintains a field named state, which means synchronization state, and is modified by volatile to show the current lock status of critical resources.

private volatile int state;
Copy the code

There are several ways to access the state field: Return the current value of the synchronization state. This operation has the memory semantics of a volatile read

protected final int getState(a) {
    return state;
}
Copy the code

Sets the value of synchronization status. This operation has the memory semantics of a volatile write.

protected final void setState(int newState) {
    state = newState;
}
Copy the code

If the current state value is equal to the expected value, the synchronization state is set atomically to the given update value. This operation has the memory semantics of volatile reads and writes

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
Copy the code

These methods are Final, which means they cannot be overridden in subclasses. We can realize the exclusive mode and shared mode of multithreading by modifying the synchronization State represented by the State field. The value of State represents the State of the lock. State 0 means that the lock is not occupied, and State greater than 0 means that there is already a thread holding the lock. The reason I’m saying greater than 0 instead of equal to 1 is because there could be reentrant situations. You can think of the state variable as the number of threads currently holding the lock.

public abstract class AbstractOwnableSynchronizer
    protected AbstractOwnableSynchronizer(a) {
      
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread(a) {
        returnexclusiveOwnerThread; }}Copy the code

The value of the exclusiveOwnerThread attribute acquires the lock process for the thread exclusive mode that currently holds the lock:

Lock acquisition process in shared mode:

3.3 Data Structure

  1. The most basic data structure in AQS is Node, which is the Node in the CLH variant queue.
static final class Node {
    // indicates that threads are waiting for locks in shared mode
    static final Node SHARED = new Node();
    // indicates that the thread is waiting exclusively for the lock
    static final Node EXCLUSIVE = null;
    // A value of 1 indicates that the thread's request for the lock has been canceled
    static final int CANCELLED =  1;
    // -1 indicates that the thread is ready, waiting for the resource to be released
    static final int SIGNAL    = -1;
    // is -2, indicating that the node is in the wait queue and the node thread is waiting to wake up
    static final int CONDITION = -2;
    // If the value is -3, this field is used only when the current thread is SHARED
    static final int PROPAGATE = -3;
    // The current node status in the queue
    volatile int waitStatus;
    // The precursor node
    volatile Node prev;
    // Subsequent nodes
    volatile Node next;
    // Thread of the current node
    volatile Thread thread;
    // points to the next node in CONDITIONNode nextWaiter; . }Copy the code
  1. A virtual bidirectional queue (FIFO) variant of CLH in AQS, which allocates locks by encapsulating each thread requesting a shared resource into a node.
// Queue head node
private transient volatile Node head;
// End of queue
private transient volatile Node tail;
Copy the code

A queue in AQS is a FIFO queue. Its head node is always a dummy node, which does not represent any thread. Therefore, the thread attribute of the node to which head points is always null. But we won’t create them during the build process, because without contention, it would be a waste of time. Instead, the node is constructed and the head and tail Pointers are set on the first contention. Only all nodes from the secondary node represent all threads waiting for locks. That is, if the current thread does not grab the lock and is wrapped as a Node and thrown into the queue, even if the queue is empty, it will queue second and we will create a new virtual Node in front of it.

4. Obtain the lock implementation

4.1 ReentrantLock Exclusive internal lock structure

Constructor source code

// Create unfair locks by default
public ReentrantLock(a) {
    sync = new NonfairSync();
}
// Create a fair lock by passing the value true
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
Copy the code

There are three inner classes in ReentrantLock:

  1. One is the abstract Sync AbstractQueuedSynchronizer is realized
  2. NonfairSync inherits Sync
  3. FairSync inherits Sync

4.2 Implementation of unfair lock

ReentrantLock A method to obtain a lock

public void lock(a) {
    sync.lock();
}
Copy the code

Unfair lock implementation of ReentrantLock

static final class NonfairSync extends Sync {
    final void lock(a) {
    // Succeeded in setting state from 0 to 1
        if (compareAndSetState(0.1))
        // Sets the current thread to an exclusive thread
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        returnnonfairTryAcquire(acquires); }}Copy the code

CompareAndSetState (0, 1)

protected final boolean compareAndSetState(int expect, int update) {
    // Use the unsafe.compareAndSwapInt method to set the value
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
Copy the code

StateOffset is the offset of the state attribute maintained by AQS

setExclusiveOwnerThread(Thread.currentThread());

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
Copy the code

acquire(1); Acquire (int arg) in AQS is called

public final void acquire(int arg) {
    if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

TryAcquire (ARG) This method is protected and implemented by subclasses

We need to look at the nonfairTryAcquire method implemented in NonfairSync, which calls the nonfairTryAcquire method

static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        returnnonfairTryAcquire(acquires); }}Copy the code

NonfairTryAcquire (int acquires) method implementation

final boolean nonfairTryAcquire(int acquires) {
    // Get the current thread
    final Thread current = Thread.currentThread();
    // Get the current state value
    int c = getState(); 
    if (c == 0) {
      // See if the setting succeeds
            if (compareAndSetState(0, acquires)) {
           // Sets the current thread to an exclusive thread
            setExclusiveOwnerThread(current);
            return true; }}// Returns the last thread set by setExclusiveOwnerThread; If never set, null is returned
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // Set the value of state
        setState(nextc);
        return true;
    }
    return false;
}
Copy the code

AcquireQueued (addWaiter(Node.EXCLUSIVE), arg); Node.EXCLUSIVE is null, so mode is null, and nextWaiter in a Node is null.

static final class Node {
  Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread; }}private Node addWaiter(Node mode) {
    // Create a new node
    Node node = new Node(Thread.currentThread(), mode);
    // Assign the tail node of the current CLH queue to preD
    Node pred = tail;
    if(pred ! =null) { // If the tail node is not empty
        node.prev = pred; // Point the precursor node of the current node to the tail node of the CLH queue
        if (compareAndSetTail(pred, node)) { // CAS value
            pred.next = node; // The successor node of the rear node of the CLH queue points to the new node
            return node;
        }
    }
    enq(node);
    return node;
}
Copy the code

If the tail node of the CLH queue is null, the enq(node) method is executed

// Set the head node of the queue in CAS mode
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// Set the tail node of the queue in CAS mode
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// The node joins the queue
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // If the tail node is null
        if (t == null) {
        // initializes a node and sets it to the head node
            if (compareAndSetHead(new Node()))
                tail = head; // both ends point to the same empty node
        } else {
      // If the tail node is not empty, then the precursor node of the queued node passed in points to the tail node of the current queue
            node.prev = t;
      // Set the currently passed enqueue node to the end of the current queue
            if (compareAndSetTail(t, node)) { 
          // Point the successor node of the tail node of the current queue to the new node passed in
                 t.next = node;
                returnt; }}}}Copy the code

View the acquireQueued method implementation

// Get the precursor node of the current node
final Node predecessor(a) throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
// Check and update the status of nodes that cannot be retrieved. Returns true if the thread should block
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//SIGNAL is not the state of the current node, but the state of the next node of the current node. When waitStatus of a node is set to SIGNAL, its next node (its successor // node) is already suspended (or about to be suspended), so if its waitStatus attribute is SIGNAL when the current node releases the lock or gives up acquiring the lock, It does one more thing -- wake up its successor node.
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
     // Ws > 0 for current Node, Node.CANCELLED: The wait lock has been CANCELLED (due to timeout or interruption)
        // Since the precursor node is not waiting, keep looking until you find a node that is still waiting for the lock
        // Then we skip over the nodes that are not waiting for locks and queue directly behind the nodes that are waiting for locks
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
     // The state of the precursor node is neither SIGNAL nor CANCELLED
        // Use CAS to set the front-end Node ws to Node.SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// A convenient way to stop, and then check for interruption
private final boolean parkAndCheckInterrupt(a) {
    LockSupport.park(this);
    return Thread.interrupted();
}
// Cancel the fetch attempt in progress
private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! =null) {
            Node next = node.next;
            if(next ! =null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else{ unparkSuccessor(node); } node.next = node; }}// The addWaiter method has successfully added the node wrapped with the current Thread to the end of the queue
// This method will try again to acquire the lock
// After another attempt to acquire the lock fails, determine whether the current thread needs to be suspended
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
         // Get the precursor node of the current node
            final Node p = node.predecessor();
         // Continue trying to acquire the lock while the precursor node is the head node
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Suspend the current thread so that the CPU no longer schedules it
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

Why try to acquire the lock again when you failed earlier? The HEAD node is a virtual node. It does not represent any thread, or the thread that holds the lock. If the HEAD node is the HEAD node, This indicates that the current node is already at the top of the queue.

setHead(node); methods

// This method points head to the passed node and sets the thread and prev properties of the node to null
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
Copy the code

As you can see, the essence of this method is to discard the original head and point the head at the node that has acquired the lock. But it then sets the node’s thread attribute to null, in a sense causing the new head node to become a virtual node that does not represent any threads. The reason for this is that after the tryAcquire call, the exclusiveOwnerThread attribute already records the current thread that acquired the lock. There is no need to do this. This sort of removes the current thread from the queue, and is a disguised exit operation. ShouldParkAfterFailedAcquire (Node Mr Pred, Node Node) method

  1. If the waitStatus value for the precursor Node is Node.SIGNAL, true is returned
  2. If waitStatus for the precursor is Node.cancelled (WS > 0), skip those nodes, rediscover the precursor in normal waiting, and wait behind it, returning false
  3. Otherwise, change the status of the precursor Node to Node.SIGNAL and return false

Finally code in the acquireQueued method

private void cancelAcquire(Node node) {
  // Filter invalid nodes
	if (node == null)
		return;
  // Set this node to be unassociated with any thread, i.e. virtual node
	node.thread = null;
	Node pred = node.prev;
  // Skip the canceled node through the precursor node
	while (pred.waitStatus > 0)
		node.prev = pred = pred.prev;
  // Get the successor node of the filtered precursor node
	Node predNext = pred.next;
  // Set the current node state to CANCELLED
	node.waitStatus = Node.CANCELLED;
  // If the current node is the tail node, the first non-cancelled node from back to front is set to the tail node
  // If the update fails, enter else, and if the update succeeds, set tail's successor node to NULL
	if (node == tail && compareAndSetTail(node, pred)) {
		compareAndSetNext(pred, predNext, null);
	} else {
		int ws;
    // If the current node is not a successor of head,
    // 1. Check whether the driver of the current node is SIGNAL
    // 2. If not, set the precursor node to SINGAL to see if it succeeds
    // If either 1 or 2 is true, check whether the thread on the current node is null
    // If all the above conditions are met, the successor pointer of the precursor node of the current node points to the successor node of the current node
		if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! =null) {
			Node next = node.next;
			if(next ! =null && next.waitStatus <= 0)
				compareAndSetNext(pred, predNext, next);
		} else {
   // If the current node is a successor of the head, or the above conditions are not met, then wake up the successor of the current node
			unparkSuccessor(node);
		}
		node.next = node; // help GC}}Copy the code

4.3 Flowchart of obtaining unfair locks

Flowchart of the successful acquisition of an unfair lock

Flowchart of an unfair lock acquisition failure

5. Release lock implementation

5.1 Analysis of Lock Release Codes

Try to release the lock. If the current thread is the holder of the lock, the retention count is reduced. If the hold count is now zero, the lock is released. If the current thread is not the holder of the lock, it throws IllegalMonitorStateException.

# #ReentrantLock
public void unlock(a) {
    sync.release(1);
}
Copy the code

Is AbstractQueuedSynchronizer sync. Release (1) call the release method

# #AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if(h ! =null&& h.waitStatus ! =0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Copy the code

Analyze the tryRelease(ARG) method

TryRelease (ARG) This method is called in ReentrantLock

protected final boolean tryRelease(int releases) {
// Subtract the number of threads held by the current lock from the value to be released
    int c = getState() - releases; 
    Throw an exception if the current thread is not the thread that owns the lock
    if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
    boolean free = false;
    // If c = 0, state = 0, the current lock is not held by any thread
    // Set the current thread of ownership to null
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // Set state to 0
    setState(c);
    return free;
}
Copy the code

If the head node is not empty and waitStatus! = 0, wake up subsequent nodes if they exist. Why is the judgment condition here h! = null && h.waitStatus ! = 0?

Because if h == null, Head is not initialized. In the initial case, head == null, the first node to join the queue, head will initialize a virtual node. So head == null is the case if you haven’t entered the team yet.

  1. h ! = null && waitStatus == 0 indicates that the thread corresponding to the successor node is still running and does not need to be woken up
  2. h ! = null && waitStatus < 0 indicates that the successor node may be blocked and needs to be woken up
private void unparkSuccessor(Node node) {
// Get the header waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
// Get the next node of the current node
    Node s = node.next;
// If the next node is null or cancelled, the non-cancelled node at the beginning of the queue is found
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Find the first node in the queue whose waitStatus<0.
        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                s = t;
    }
  // If the next node of the current node is not empty and the state is <=0, the current node is awakened
    if(s ! =null)
        LockSupport.unpark(s.thread);
}
Copy the code

Why look back for the first non-cancelled node? Look at the addWaiter method

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if(pred ! =null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Copy the code

Node. prev = pred, compareAndSetTail(pred, node), pred. Next = node; Not yet. If the unparksucceeded method had been carried out at this time there would have been no way of looking forward, so one had to go from the back to the front. Another reason is that when CANCELLED nodes are generated, the Next pointer is disconnected first, while the Prev pointer is not. Therefore, the whole Node must be traversed from back to front. Therefore, if the search is conducted from front to back, all nodes may not be traversed due to the non-atomic joining operation and the operation of the Next pointer being interrupted by the CANCELLED node generation process in extreme cases. So, after the corresponding thread is woken up, the corresponding thread will continue to execute.

5.2 Flowchart for Releasing locks

6. Pay attention to

Due to the long length of the implementation of fair lock in the next blog, thank you for your attention and support! Have a problem hope everybody points out, common progress!!