Introduction to AQS

AQS (AbstractQueuedSynchronizer) is built on CAS (compare-and-swap) operations. It maintains a FIFO wait queue and implements the blocking and wake-up mechanism that synchronizers rely on.

Inheritance hierarchy

  1. AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer, which is used to set and get the thread that currently holds exclusive access
  2. The wait queue of AbstractQueuedSynchronizer is a variant of the CLH (Craig, Landin and Hagersten) queue, implemented as a doubly linked list
  3. Its inner class Node is the node type of the CLH queue
  4. ConditionObject is the implementation class of Condition

What is a CLH queue?

CLH stands for Craig, Landin and Hagersten. The original CLH queue is a singly linked list. The queue in AQS is a CLH variant: a virtual doubly linked FIFO queue. AQS allocates the lock by wrapping each thread that requests the shared resource in a node and appending that node to the queue.
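For comparison with the AQS variant, the classic CLH lock can be sketched as follows. This is a minimal illustration, not code from the JDK: the names ClhSpinLock, QNode and demo are made up for this example, and the waiting loop calls Thread.yield() only so that the demo stays responsive on machines with few cores. Each thread spins on the flag of its predecessor's node, so the queue link is implicit and one-directional.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Minimal classic CLH spin lock (illustrative sketch, not the AQS variant). */
class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked; // true while the owner holds or waits for the lock
    }

    // The tail starts as a dummy node that is already "released"
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node); // atomically join the queue
        myPred.set(pred);
        while (pred.locked) {              // spin on the predecessor's flag
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;               // hand the lock to the successor
        myNode.set(myPred.get());          // reuse the predecessor's node next time
    }

    /** Increments a plain int from several threads under the lock and returns the total. */
    static int demo(int threads, int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

AQS keeps the same idea of queueing at the tail with an atomic operation, but it adds prev and next links and parks waiting threads instead of letting them spin.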

AQS properties

Attribute   Description
head        Head node of the wait queue
tail        Tail node of the wait queue
state       Synchronization state

AbstractQueuedSynchronizer#Node properties

Attribute    Description
waitStatus   Status of the current node in the queue
prev         Pointer to the predecessor node
next         Pointer to the successor node
thread       The thread held by this node
nextWaiter   Points to the next node waiting on a CONDITION

AQS lock modes

Mode        Description
SHARED      Shared mode: the lock can be held by multiple threads at once. For example, the read lock of a read/write lock can be held by several threads
EXCLUSIVE   Exclusive mode: the lock can be held by only one thread
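Shared mode can be illustrated with a small latch built on AQS. The sketch below is modelled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc; the wrapper method names are chosen for this example. In shared mode, tryAcquireShared returns a non-negative value on success, so once the latch is signalled any number of threads can pass.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Shared-mode sketch: once signalled, every waiting thread may pass. */
class BooleanLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignore) {
            // A non-negative result means success; a negative one means "queue and wait"
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);  // open the latch
            return true;  // let AQS wake the waiting shared nodes
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled() {
        return sync.isSignalled();
    }

    void signal() {
        sync.releaseShared(1);
    }

    void await() {
        sync.acquireShared(1); // uninterruptible shared acquire
    }
}
```

After signal() has been called, await() returns immediately for every caller, which is the behaviour the SHARED row describes.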

Meaning of the waitStatus values

Field       Value   Description
CANCELLED   1       The thread's request to acquire the lock was cancelled due to a timeout or an interrupt
(none)      0       Initial value of a newly created node
SIGNAL      -1      The thread of the successor node is (or will soon be) suspended, so this node must wake it up when it releases the lock or is cancelled
CONDITION   -2      The node is in a condition queue, waiting to be woken up
PROPAGATE   -3      Used only in shared mode: a shared release should be propagated to subsequent nodes

AQS#CLH enqueue

public class AbstractQueuedSynchronizer {
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); // Create a new node for the current thread
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // Take the current tail as the predecessor of the new node
        if (pred != null) { // The queue has already been initialized
            node.prev = pred; // Point the new node's prev at the old tail
            if (compareAndSetTail(pred, node)) { // CAS the tail to the new node
                pred.next = node; // Point the old tail's next at the new node
                return node; // Return the new node
            }
        }
        enq(node); // The queue is uninitialized (tail is null) or the CAS failed: fall back to the full enqueue loop
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) // Install an empty node as the head
                    tail = head; // The tail initially points at the same node
            } else {
                node.prev = t; // Point the new node's prev at the old tail
                if (compareAndSetTail(t, node)) { // CAS the tail to the new node
                    t.next = node; // Point the old tail's next at the new node
                    return t; // Return the old tail, i.e. the new node's predecessor
                }
            }
        }
    }
}

As the code above shows, the head node of AQS holds no waiting thread: its prev and thread fields are null, and its next field points to the node of the first suspended thread in the wait queue.
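The enqueue loop can be reproduced outside the JDK with AtomicReference. The following is a simplified sketch under stated assumptions: CasQueueSketch and its Node class are hypothetical names, and a String stands in for the waiting thread. It mirrors the structure of enq: lazily install an empty head, then CAS the tail and link the old tail to the new node.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model of the AQS enqueue loop (illustrative names). */
class CasQueueSketch {
    static final class Node {
        final String name;   // stands in for the waiting thread; null for the empty head
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends the node to the queue and returns its predecessor, like enq. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily initialize: only one thread wins the CAS on head
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                      // set prev before publishing the node
                if (tail.compareAndSet(t, node)) {  // publish the node as the new tail
                    t.next = node;                  // next is linked after the CAS
                    return t;
                }
            }
        }
    }
}
```

Because prev is set before the CAS and next only afterwards, a backward traversal from the tail always sees a complete chain, while next may briefly be null for a node that already has a successor.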

AQS enqueue diagram

Acquiring a lock

Exclusive lock

Acquire the lock - acquire(int arg)

  1. Acquires in exclusive mode and ignores interrupts: an interrupt does not abort the acquisition, it is only recorded and re-asserted after the lock has been obtained. The method returns once the lock is acquired; otherwise the thread is queued and suspended.
  2. Source code analysis
public class AbstractQueuedSynchronizer {
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // The first attempt to acquire the lock failed
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Enqueue the current thread and keep trying; returns true if interrupted while waiting
            selfInterrupt(); // Re-assert the interrupt recorded while waiting
    }
}

AbstractQueuedSynchronizer#acquire first tries to get the lock through tryAcquire(int arg) (the logic of this method is defined by the concrete subclass). On success it returns directly. On failure it takes two steps: it adds the thread to the wait queue through addWaiter(Node mode), and then calls acquireQueued(final Node node, int arg), which keeps trying to acquire the lock in a loop without responding to interrupts.
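To make the role of tryAcquire concrete, here is a minimal non-reentrant mutex in the style of the Mutex example from the AbstractQueuedSynchronizer Javadoc. SimpleMutex is a hypothetical name for this sketch, and the choice of state 0 for unlocked and 1 for locked is an assumption of the example, not something AQS prescribes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Non-reentrant mutex sketch: state 0 = unlocked, 1 = locked. */
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // A single CAS decides who owns the lock
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS may wake a successor
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1); // tryAcquire first, then queue and park on failure
    }

    boolean tryLock() {
        return sync.tryAcquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    boolean isLocked() {
        return sync.isLocked();
    }
}
```

The subclass only decides whether the state transition succeeds; queueing, parking and waking are all handled by acquire and release in AQS.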

public class AbstractQueuedSynchronizer {
    final boolean acquireQueued(final Node node, int arg) { // Node of the current thread; arg is passed through to tryAcquire
        boolean failed = true; // Lock acquisition failure flag
        try {
            boolean interrupted = false; // Interrupt flag
            for (;;) { // Loop until the lock is acquired
                final Node p = node.predecessor(); // Get the predecessor node
                if (p == head && tryAcquire(arg)) { // If the predecessor is the head, try to acquire the lock
                    setHead(node); // Make the current node the new head
                    p.next = null; // help GC
                    failed = false; // Acquisition succeeded
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // Decide whether the thread should be suspended
                        parkAndCheckInterrupt()) // Suspend the thread and check for an interrupt on wake-up
                    interrupted = true; // Record the interrupt
            }
        } finally {
            if (failed) // The lock was not acquired
                cancelAcquire(node); // Cancel the acquisition attempt
        }
    }
}

AbstractQueuedSynchronizer#acquireQueued acquires the lock uninterruptibly for a thread that is already in the queue. When the predecessor of the current node is the head node, the thread tries to acquire the lock and returns on success. Otherwise, or if the attempt fails, shouldParkAfterFailedAcquire decides whether the thread should be suspended. If so, parkAndCheckInterrupt suspends it and, once it wakes up, reports whether it was interrupted; if not, the loop runs again until the thread is suspended. Finally, when the thread of the predecessor node releases the lock, the thread wakes up, enters the last iteration of the loop, and returns holding the lock.

  3. Lock acquisition flow
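The effect of ignoring interrupts can be observed from the outside with ReentrantLock, whose lock() method is built on acquire. The sketch below uses a hypothetical class name, UninterruptibleAcquireDemo. It interrupts a queued thread and then checks that the thread still obtained the lock and that its interrupt flag was preserved.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Shows that lock() is not aborted by an interrupt but keeps the interrupt flag. */
class UninterruptibleAcquireDemo {
    /** Returns the worker's interrupt flag as observed right after lock() returned. */
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagAfterLock = new boolean[1];

        lock.lock(); // the calling thread holds the lock first
        Thread worker = new Thread(() -> {
            lock.lock(); // queued behind the caller; the interrupt does not abort this call
            try {
                flagAfterLock[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        worker.start();

        // Wait until the worker's node is in the wait queue
        while (!lock.hasQueuedThread(worker)) {
            Thread.yield();
        }
        worker.interrupt(); // interrupt while the worker is queued
        lock.unlock();      // now let the worker acquire the lock

        try {
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return flagAfterLock[0];
    }
}
```

The worker only proceeds after the caller unlocks, and it sees its interrupt flag set, which matches the "record now, re-assert later" behaviour of acquire and selfInterrupt.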

Acquire the lock - acquireInterruptibly(int arg)

  1. Acquires in exclusive mode and aborts if the thread is interrupted
  2. Source code analysis
public class AbstractQueuedSynchronizer {
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // Check (and clear) the interrupt flag
            throw new InterruptedException();
        if (!tryAcquire(arg)) // Try to acquire the lock
            doAcquireInterruptibly(arg); // Acquire in exclusive interruptible mode
    }
}

AbstractQueuedSynchronizer#acquireInterruptibly first checks for an interrupt. If the thread has been interrupted, it throws InterruptedException immediately (Thread.interrupted() also clears the interrupt flag) and never enters the lock acquisition process. Otherwise it calls tryAcquire, and if that fails it calls doAcquireInterruptibly to keep trying to acquire the lock.

public class AbstractQueuedSynchronizer {

    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE); // Add to the wait queue
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor(); // Get the predecessor node
                if (p == head && tryAcquire(arg)) { // The predecessor is the head: try to acquire the lock again
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // Decide whether the thread should be suspended
                        parkAndCheckInterrupt()) // Suspend the thread and check for an interrupt on wake-up
                    throw new InterruptedException(); // Abort with an interrupt exception
            }
        } finally {
            if (failed) // Failed to acquire the lock because the thread was interrupted
                cancelAcquire(node); // Cancel the acquisition attempt
        }
    }
}

The thread is added to the wait queue by addWaiter and then enters the loop. If the predecessor node is the head node, it tries to acquire the lock; otherwise it is suspended. If the thread is interrupted while suspended, InterruptedException is thrown and the finally block runs cancelAcquire.
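The interruptible path can be exercised through ReentrantLock.lockInterruptibly(), which is built on acquireInterruptibly. In this sketch the class name InterruptibleAcquireDemo is hypothetical. A queued thread is interrupted while the lock stays held, so it must leave with InterruptedException, and the cancelled node should no longer count as a waiter.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Shows that lockInterruptibly() aborts a queued thread when it is interrupted. */
class InterruptibleAcquireDemo {
    /** Returns true if the worker aborted with InterruptedException and left no waiter behind. */
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] aborted = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                lock.lockInterruptibly(); // queued while the caller holds the lock
                lock.unlock();            // not reached in this demo
            } catch (InterruptedException e) {
                aborted[0] = true;        // the acquisition was cancelled
            }
        });

        lock.lock(); // the caller keeps the lock for the whole demo
        try {
            worker.start();
            // Wait until the worker's node is in the wait queue
            while (!lock.hasQueuedThread(worker)) {
                Thread.yield();
            }
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return aborted[0] && lock.getQueueLength() == 0;
    }
}
```

getQueueLength() only gives an estimate in general, but here no other thread touches the lock after the worker has finished, so a result of 0 reflects the cancelled node having been cleared.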

public class AbstractQueuedSynchronizer {

    private void cancelAcquire(Node node) {
        if (node == null) // Ignore a null node
            return;

        node.thread = null;

        Node pred = node.prev; // Get the predecessor node
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev; // Skip cancelled predecessors

        Node predNext = pred.next; // Successor of the filtered predecessor; used as the expected value in the CAS calls below

        node.waitStatus = Node.CANCELLED; // Mark the current node as cancelled

        if (node == tail && compareAndSetTail(node, pred)) { // The current node is the tail: make the filtered predecessor the new tail
            compareAndSetNext(pred, predNext, null); // Set the next pointer of the new tail to null
        } else {
            int ws;
            if (pred != head && // The filtered predecessor is not the head node
                    ((ws = pred.waitStatus) == Node.SIGNAL || // Its waitStatus is already -1 (SIGNAL)
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // or it is <= 0 and is set to -1 by CAS
                    pred.thread != null) { // Its thread is not null
                Node next = node.next; // Get the successor of the current node
                if (next != null && next.waitStatus <= 0) // The successor is not null and its waitStatus is <= 0
                    compareAndSetNext(pred, predNext, next); // Point the filtered predecessor's next at the successor of the current node
            } else {
                unparkSuccessor(node); // Wake up the node's successor
            }

            node.next = node; // help GC
        }
    }
}

AbstractQueuedSynchronizer#cancelAcquire is slightly longer, but it can be divided into three parts. First, it skips the predecessors that have already been cancelled and points the current node's prev at the first predecessor that has not. For example, suppose threads 1, 2, 3, 4 and 5 are waiting and thread 4 is interrupted while acquiring the lock. If threads 2 and 3 are found to be cancelled during the filtering, the predecessor of thread 4's node is set to thread 1's node. Second, it checks whether the current node is the tail node. If so, it sets the tail to the filtered predecessor and sets the next pointer of the new tail to null. Third, if the current node is not the tail, it checks the following conditions:

  1. The filtered predecessor is not the head node
  2. The predecessor's waitStatus is -1 (SIGNAL), or it is <= 0 and is successfully set to -1 by CAS
  3. The predecessor's thread is not null

If all three hold, and the successor of the current node is not null and has waitStatus <= 0, the next pointer of the filtered predecessor is pointed at the successor of the current node. If any of the three conditions is not met, the successor of the current node is woken up.
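The first part, skipping cancelled predecessors, can be replayed on a plain linked list. The sketch below is a single-threaded model with hypothetical names (SkipCancelledSketch, buildQueue); it drops the CAS operations and keeps only the pointer logic, using the scenario of five waiting threads where threads 2 and 3 are cancelled.

```java
/** Single-threaded model of the predecessor-skipping step in cancelAcquire. */
class SkipCancelledSketch {
    static final int CANCELLED = 1;

    static final class Node {
        final int id;     // 0 for the head, otherwise the number of the waiting thread
        int waitStatus;   // > 0 means cancelled
        Node prev;

        Node(int id, Node prev) {
            this.id = id;
            this.prev = prev;
        }
    }

    /** Relinks node.prev to the nearest non-cancelled predecessor and returns it. */
    static Node skipCancelledPredecessors(Node node) {
        Node pred = node.prev;
        while (pred.waitStatus > 0) {
            node.prev = pred = pred.prev;
        }
        return pred;
    }

    /** Builds head <- 1 <- 2 <- ... <- count and returns the nodes indexed by id. */
    static Node[] buildQueue(int count) {
        Node[] nodes = new Node[count + 1];
        nodes[0] = new Node(0, null);
        for (int i = 1; i <= count; i++) {
            nodes[i] = new Node(i, nodes[i - 1]);
        }
        return nodes;
    }
}
```

With nodes 2 and 3 marked CANCELLED, calling skipCancelledPredecessors on node 4 leaves node 4's prev pointing at node 1. The loop always terminates because the head node is never cancelled.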

  3. Lock acquisition flow

Acquire the lock - tryAcquireNanos(int arg, long nanosTimeout)

  1. Tries to acquire the lock in exclusive mode within the given time. It fails on timeout, and the acquisition is aborted if the thread is interrupted
  2. Source code analysis
public class AbstractQueuedSynchronizer {
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted()) // Check (and clear) the interrupt flag
            throw new InterruptedException();
        return tryAcquire(arg) || // Try to acquire the lock first and return true on success
                doAcquireNanos(arg, nanosTimeout); // Otherwise keep trying until the timeout expires
    }
}

First, the method checks whether the thread has been interrupted. If so, it throws an exception, and the interrupt flag is cleared. Next, it calls tryAcquire and returns true on success. If that fails, it calls doAcquireNanos to try to acquire the lock within the given time, which returns true on success and false on timeout.

public class AbstractQueuedSynchronizer {
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L) // Return false if the wait time is less than or equal to 0
            return false;
        final long deadline = System.nanoTime() + nanosTimeout; // Deadline = current time + wait time
        final Node node = addWaiter(Node.EXCLUSIVE); // Add to the wait queue
        boolean failed = true; // Failure flag
        try {
            for (;;) {
                final Node p = node.predecessor(); // The predecessor node
                if (p == head && tryAcquire(arg)) { // The predecessor is the head: try to acquire the lock again
                    setHead(node); // Make the current node the head, which removes it from the wait queue
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime(); // Remaining wait time
                if (nanosTimeout <= 0L) // Timed out: return false
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) && // Decide whether the thread should be suspended
                        nanosTimeout > spinForTimeoutThreshold) // The remaining time is greater than the spin threshold
                    LockSupport.parkNanos(this, nanosTimeout); // Park for at most the remaining time
                if (Thread.interrupted()) // Check (and clear) the interrupt flag
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node); // Cancel the acquisition on timeout or interrupt
        }
    }
}

The deadline is computed before the loop starts. Then a node for the current thread is created and added to the wait queue, and the loop begins. If the predecessor node is the head, the thread tries to acquire the lock again; on success the current node becomes the new head (it leaves the wait queue) and the method returns true. On failure the remaining wait time is recomputed. If no time is left, the method returns false and the acquisition is cancelled in the finally block. Otherwise, if the thread should be suspended and the remaining time is greater than spinForTimeoutThreshold, the thread is parked for the remaining time; for shorter remaining times it keeps spinning. If the thread is woken up by an interrupt, InterruptedException is thrown and the acquisition is cancelled.
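The timed path is reachable through ReentrantLock.tryLock(long, TimeUnit), which is built on tryAcquireNanos. The following sketch uses a hypothetical helper class, TimedAcquireDemo, that attempts the timed acquisition from a second thread, because ReentrantLock is reentrant and the holder itself would always succeed.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Attempts a timed lock acquisition from a separate thread. */
class TimedAcquireDemo {
    /** Returns true if another thread obtained the lock within the timeout. */
    static boolean tryFromOtherThread(ReentrantLock lock, long timeoutMillis) {
        boolean[] acquired = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    acquired[0] = true;
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // not expected in this demo
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return acquired[0];
    }
}
```

While the calling thread holds the lock, tryFromOtherThread(lock, 50) returns false after roughly 50 ms; once the lock has been released, the same call returns true without waiting.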

  3. Lock acquisition flow

Release the lock - release(int arg)

  1. Releases in exclusive mode and wakes up the successor node
  2. Source code analysis
public class AbstractQueuedSynchronizer {
    public final boolean release(int arg) {
        if (tryRelease(arg)) { // Implemented by the subclass; returns true when the lock is fully released
            Node h = head; // Head node
            if (h != null && h.waitStatus != 0) // The head is not null and its waitStatus is not the initial value
                unparkSuccessor(h); // Wake up the successor of the head
            return true;
        }
        return false;
    }
}

tryRelease determines whether the lock has been fully released, so that a successor may be woken up. If it returns false, release returns false. Otherwise, if the head node is not null and its waitStatus is not 0, the successor of the head is woken up, and release returns true.

public class AbstractQueuedSynchronizer {
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus; // waitStatus of the node
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); // Reset waitStatus to 0

        Node s = node.next; // Successor node
        // A null successor means next is not linked (yet); waitStatus > 0 means the successor was cancelled
        if (s == null || s.waitStatus > 0) { // The successor is null or cancelled
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) // Traverse backwards from the tail
                if (t.waitStatus <= 0) // The node is not cancelled
                    s = t; // Remember it; the last one found is the closest to the current node
        }
        if (s != null)
            LockSupport.unpark(s.thread); // Wake up the thread of that node
    }
}

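If the node's waitStatus is < 0, it is set to 0. Then the successor node is fetched. If the successor is null or its waitStatus is > 0 (cancelled), the queue is traversed backwards from the tail to find the non-cancelled node closest to the current node, and the thread of that node is woken up.

  3. Wake-up flow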
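The search for the node to wake up can also be modelled without threads. The sketch below is a single-threaded illustration with hypothetical names (WakeTargetSketch, findWakeTarget, link); it returns the node that unparkSuccessor would unpark and leaves out the CAS on waitStatus and the unpark call itself.

```java
/** Single-threaded model of how unparkSuccessor picks the node to wake. */
class WakeTargetSketch {
    static final int CANCELLED = 1;

    static final class Node {
        final int id;
        int waitStatus; // > 0 means cancelled
        Node prev;
        Node next;

        Node(int id) {
            this.id = id;
        }
    }

    /** Links b behind a in both directions. */
    static void link(Node a, Node b) {
        a.next = b;
        b.prev = a;
    }

    /** Returns the nearest non-cancelled node after the given node, or null if there is none. */
    static Node findWakeTarget(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk backwards from the tail; the last match is the closest to node
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }
}
```

For a queue head, 1, 2, 3 in which node 1 is cancelled, the target for the head is node 2. The backward walk relies only on prev links, which are set before a node is published as the tail, so it also works when a next pointer has not been linked yet.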