The java.util.concurrent (JUC) package, written by Doug Lea, provides a set of basic tools that make concurrent programs easier to write, including Lock, Semaphore, CountDownLatch, CyclicBarrier, and more. These classes are all built, directly or indirectly, on one tool that controls concurrent access by multiple threads: AbstractQueuedSynchronizer (AQS).

The data structure of AQS is shown in the figure below. It maintains a doubly linked FIFO queue, and each thread trying to acquire the lock waits in the queue as a node.
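
To make the queue structure concrete, here is a minimal sketch of a doubly linked FIFO queue whose tail is updated with CAS. It is not the JDK code: the class SketchWaitQueue, its Node fields, and the eagerly created dummy head are simplifications I am assuming for illustration (AQS creates its dummy head lazily and stores a Thread in each node).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the AQS wait queue; not the JDK implementation.
class SketchWaitQueue {
    static final class Node {
        final String owner;   // stands in for the waiting thread
        volatile Node prev;   // link toward the head
        volatile Node next;   // link toward the tail

        Node(String owner) { this.owner = owner; }
    }

    private final Node head = new Node("dummy");   // dummy head node
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    // Append at the tail: set prev first, CAS the tail, then publish next.
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;
            if (tail.compareAndSet(t, node)) {
                t.next = node;
                return;
            }
        }
    }

    // Walk from head to tail and collect the owners in FIFO order.
    List<String> ownersInOrder() {
        List<String> owners = new ArrayList<>();
        for (Node n = head.next; n != null; n = n.next)
            owners.add(n.owner);
        return owners;
    }

    public static void main(String[] args) {
        SketchWaitQueue q = new SketchWaitQueue();
        q.enqueue(new Node("t1"));
        q.enqueue(new Node("t2"));
        q.enqueue(new Node("t3"));
        System.out.println(q.ownersInOrder());   // prints [t1, t2, t3]
    }
}
```

Setting prev before the CAS means a node that is visible as the tail can always be reached by walking backward, even if its predecessor's next link has not been written yet.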

Source code analysis

Before analyzing the source code, you need to know some basics.

First, there are two types of locks: exclusive and shared. As the names imply, an exclusive lock can be held by only one thread at a time, while a shared lock can be held by up to N threads at the same time. Acquisition also comes in two flavors, depending on whether it responds to interrupt requests while waiting for the lock: interruptible and uninterruptible.
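
The difference between the two lock types can be observed with two standard JUC classes. The sketch below uses ReentrantLock for the exclusive case and Semaphore with two permits for the shared case; the class ExclusiveVsSharedDemo and its helper method are names made up for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class ExclusiveVsSharedDemo {
    // Runs lock.tryLock() on a fresh thread and reports whether it succeeded.
    static boolean tryLockFromOtherThread(ReentrantLock lock) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            result[0] = lock.tryLock();
            if (result[0])
                lock.unlock();
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        ReentrantLock exclusive = new ReentrantLock();
        exclusive.lock();
        try {
            // Exclusive: a second thread cannot acquire while this thread holds the lock.
            System.out.println("other thread acquired: " + tryLockFromOtherThread(exclusive));
        } finally {
            exclusive.unlock();
        }

        // Shared: up to N = 2 holders at once, the third attempt fails.
        Semaphore shared = new Semaphore(2);
        System.out.println("first permit:  " + shared.tryAcquire());
        System.out.println("second permit: " + shared.tryAcquire());
        System.out.println("third permit:  " + shared.tryAcquire());
    }
}
```

The helper uses a second thread because ReentrantLock is reentrant: the owning thread itself can always lock again.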

Second, each node has a wait status (waitStatus), which is initially 0.

// The node gave up acquiring the lock because of a timeout or an interrupt
static final int CANCELLED =  1;
// The successor of this node is (or will soon be) suspended and must be woken when this node releases the lock or is cancelled
static final int SIGNAL    = -1;
// The node is in a Condition queue
static final int CONDITION = -2;
// For shared locks: the next shared acquisition should propagate unconditionally
static final int PROPAGATE = -3;
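
One consequence of these values is that CANCELLED is the only positive one, so the queue code can test for cancellation with a plain ws > 0. The sketch below illustrates that convention with a hypothetical mini Node class (WaitStatusSketch and its methods are assumptions for this example, not part of AQS).

```java
// Hypothetical mini model of a queue node; only the fields needed for the illustration.
class WaitStatusSketch {
    static final int CANCELLED = 1, SIGNAL = -1, CONDITION = -2, PROPAGATE = -3;

    static final class Node {
        int waitStatus;   // 0 unless set otherwise
        Node prev;

        Node(int waitStatus, Node prev) {
            this.waitStatus = waitStatus;
            this.prev = prev;
        }
    }

    // Only CANCELLED is positive, so "ws > 0" is the whole cancellation test.
    static boolean isCancelled(Node n) {
        return n.waitStatus > 0;
    }

    // Walk backward past cancelled predecessors until a live one is found.
    static Node firstLivePredecessor(Node node) {
        Node pred = node.prev;
        while (pred != null && isCancelled(pred))
            pred = pred.prev;
        return pred;
    }

    public static void main(String[] args) {
        Node head = new Node(SIGNAL, null);
        Node cancelled1 = new Node(CANCELLED, head);
        Node cancelled2 = new Node(CANCELLED, cancelled1);
        Node me = new Node(0, cancelled2);
        System.out.println(firstLivePredecessor(me) == head);   // prints true
    }
}
```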

To make the source code easier to follow, I explain it by adding comments directly to the code (comments that were already in English are from the original source). Where one method calls another, you can jump to the explanation of the called method and follow the flow step by step. Alternatively, read the comments to grasp the steps of the whole method first, and then look at each method it calls.

Exclusive lock

Exclusive lock acquisition that does not respond to interrupts

/**
 * tryAcquire() is the lock-acquisition hook that the implementing class must override
 * (the default implementation throws UnsupportedOperationException).
 * Returning true means the lock was acquired.
 */
public final void acquire(int arg) {
    // If tryAcquire() fails, join the synchronization queue via addWaiter() and keep trying in acquireQueued()
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // acquireQueued() returns true if an interrupt was detected; this method does not respond to it while waiting
        // Because Thread.interrupted() was used for the check, the interrupt flag was cleared and must be restored
        selfInterrupt();
}

/**
 * Wrap the current thread in a Node and append it to the tail of the synchronization queue.
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // Try once to append the node to the tail of the queue with a single CAS
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // Reached when pred == null (the queue is not initialized yet) or the CAS failed
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // Retry the CAS in a loop until the node is appended to the tail of the queue
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * A thread that failed to acquire the lock and was added to the synchronization queue
 * keeps trying to acquire the lock here.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // Because the queue is FIFO, a node may compete for the lock only when its predecessor is the head (the head is the node that has acquired the lock)
            if (p == head && tryAcquire(arg)) {
                // tryAcquire() returned true, so the lock was acquired
                // Make this node the head; setHead() needs no CAS because only the thread that just acquired the lock gets here, so there is no contention
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // If the acquisition failed, check whether the thread should park; if it parks, check on wake-up whether it was interrupted
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // As a refresher, SIGNAL means the successor is (or will soon be) parked and must be woken when this node releases the lock or is cancelled
    // If the predecessor is already in the SIGNAL state, the current thread can safely park and wait for the predecessor to wake it
    if (ws == Node.SIGNAL)
        return true;
    // ws > 0 means the predecessor was cancelled (CANCELLED == 1), so cancelled nodes must be skipped
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // Set the predecessor's status to SIGNAL via CAS, but still return false this time
        // If the lock is still unavailable on the caller's next loop iteration, this method is entered again, hits the first check, and returns true
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // Suspend the thread
    LockSupport.park(this);
    return Thread.interrupted();
}
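
To see how a subclass plugs into acquire(), here is a minimal sketch of a non-reentrant mutex, modeled on the example in the AbstractQueuedSynchronizer Javadoc. The names SketchMutex and countWithMutex are my own; the state encoding (0 = unlocked, 1 = locked) is a choice made for this sketch, since AQS leaves the meaning of state to the subclass.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A minimal non-reentrant mutex; state 0 = unlocked, 1 = locked (sketch only).
class SketchMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // One CAS attempt; on failure acquire() takes care of queuing and parking.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;   // true lets release() wake the successor
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }   // uninterruptible acquisition
    void unlock() { sync.release(1); }

    // Increments a shared counter from several threads under the mutex.
    static int countWithMutex(int threads, int perThread) {
        SketchMutex mutex = new SketchMutex();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 1000));   // prints 4000
    }
}
```

The subclass only decides whether the state change succeeds; queuing, parking, and waking all come from the acquire() and release() code walked through in this article.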

Exclusive lock acquisition that responds to interrupts

/** Method entry */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
 * The only difference from the version that does not respond to interrupts is that,
 * when an interrupt is detected, this method throws InterruptedException instead of
 * recording it and returning true.
 */
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    // ...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
    // ...
}
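
From the caller's side, this path is reached through methods such as ReentrantLock.lockInterruptibly(), which delegates to acquireInterruptibly(1). The sketch below (class and method names are hypothetical) interrupts a thread that is waiting for a held lock and checks that it gives up with InterruptedException.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleAcquireDemo {
    // Returns true if the waiting thread gave up with InterruptedException.
    static boolean waiterWasInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = new boolean[1];
        lock.lock();   // the calling thread holds the lock for the whole demo
        try {
            Thread waiter = new Thread(() -> {
                try {
                    // Queues behind the holder, or throws at once if already interrupted.
                    lock.lockInterruptibly();
                    lock.unlock();   // not expected to be reached in this demo
                } catch (InterruptedException e) {
                    interrupted[0] = true;
                }
            });
            waiter.start();
            waiter.interrupt();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) {
        System.out.println("waiter interrupted: " + waiterWasInterrupted());
    }
}
```

With lock() in place of lockInterruptibly(), the waiter would stay queued until the lock is released and would only find its interrupt flag set afterwards.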

Exclusive lock acquisition with a timeout that responds to interrupts

/** Method entry */
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

/**
 * Basically the same as before, except that it returns false on timeout and parks the
 * thread with the timed LockSupport.parkNanos().
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // Return false if timeout occurs
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // Note: the thread parks only while the remaining time exceeds spinForTimeoutThreshold (1000 nanoseconds); below that threshold it spins instead of parking
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
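
A caller usually reaches this path through ReentrantLock.tryLock(long, TimeUnit), which delegates to tryAcquireNanos(). In the sketch below (hypothetical class and method names), a second thread waits a bounded time for a lock that is never released during the wait, so the timed attempt reports false.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedAcquireDemo {
    // Tries to take a lock held by another thread, waiting at most timeoutMillis.
    static boolean tryLockWhileHeldElsewhere(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();   // held by the calling thread until the waiter has finished
        try {
            Thread waiter = new Thread(() -> {
                try {
                    acquired[0] = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (acquired[0])
                        lock.unlock();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            try {
                waiter.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println("acquired within 50 ms: " + tryLockWhileHeldElsewhere(50));
    }
}
```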

Exclusive lock release

/**
 * tryRelease(), like tryAcquire(), is a hook that subclasses must implement.
 * release() itself only wakes the successor; the successor dequeues the old head
 * when it acquires the lock.
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // If h.waitStatus == 0, the head is not in the SIGNAL state, so there is no node to wake
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
        
    // If the successor is missing or cancelled, scan backward from the tail for the non-cancelled node closest to this one
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If there is a non-cancelled successor, wake it
    if (s != null)
        LockSupport.unpark(s.thread);
}
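
Note that release() does nothing further when tryRelease() returns false. ReentrantLock uses this for reentrancy: its tryRelease() reports true only once the hold count has dropped to zero. The following sketch (ReleaseDemo and its helper are names assumed for the example) shows that unlocking once out of two holds does not free the lock for other threads.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseDemo {
    // Runs lock.tryLock() on a fresh thread and reports whether it succeeded.
    static boolean otherThreadCanLock(ReentrantLock lock) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            result[0] = lock.tryLock();
            if (result[0])
                lock.unlock();
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    // Returns { lockable after the first unlock, lockable after the second unlock }.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();     // reentrant: hold count is now 2
        lock.unlock();   // hold count 1: the lock is still owned
        boolean afterFirst = otherThreadCanLock(lock);
        lock.unlock();   // hold count 0: fully released
        boolean afterSecond = otherThreadCanLock(lock);
        return new boolean[] { afterFirst, afterSecond };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]);   // prints false true
    }
}
```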

Shared lock

Shared lock acquisition that does not respond to interrupts

The key implementation differences between shared and exclusive locks are as follows:

When a queued thread's node acquires the lock in shared mode, it wakes the shared node waiting behind it and passes the wake-up along. That node does the same in turn, so the wake-up propagates through the consecutive shared nodes behind it.

/**
 * tryAcquireShared() is a hook that subclasses must implement:
 * a value less than 0 means the acquisition failed;
 * 0 means the current thread acquired the lock but subsequent threads cannot, so nothing needs to propagate;
 * a value greater than 0 means subsequent threads may also acquire the lock, so the wake-up must propagate.
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // r >= 0 means the lock was acquired
                if (r >= 0) {
                    // Unlike exclusive mode, after setting the head the wake-up must be propagated to the nodes behind
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    
    // If propagate > 0, or the old or new head has waitStatus < 0 (SIGNAL or PROPAGATE), the successor needs to be woken
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // If the successor is an exclusive node, it is not woken
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // The queue must hold at least 2 nodes; otherwise there is nothing to propagate to
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // Unlike the exclusive case, this method is called from both setHeadAndPropagate() and releaseShared()
                // One thread may be propagating after acquiring the lock while another is releasing it, so the status change must go through CAS
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // ws == 0 means the head has no pending signal request (for example, its successor has not set SIGNAL yet), so CAS the status to PROPAGATE to mark that the next tryAcquireShared() must propagate
            // If the CAS fails, a new successor has changed the status to SIGNAL and is about to park, so loop again
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // If the head changed, another queued thread has acquired the lock, so check again
        if (h == head)                   // loop if head changed
            break;
    }
}
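
The propagation can be observed with a small shared-mode synchronizer. The sketch below is a one-shot gate in the spirit of the BooleanLatch example from the AbstractQueuedSynchronizer Javadoc; SketchGate, open(), and passedAfterOpen() are names I am assuming for illustration. A single releaseShared() call lets every queued waiter through.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// One-shot gate: state 0 = closed, 1 = open (sketch only).
class SketchGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignore) {
            // Positive when open: this thread passes and later waiters may pass too.
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;   // true makes releaseShared() call doReleaseShared()
        }
    }

    private final Sync sync = new Sync();

    void await() { sync.acquireShared(1); }
    void open()  { sync.releaseShared(1); }

    // Queues `waiters` threads on a closed gate, opens it once, and counts who got through.
    static int passedAfterOpen(int waiters) {
        SketchGate gate = new SketchGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        // Wait until every waiter is in the synchronization queue before opening.
        while (gate.sync.getQueueLength() < waiters)
            Thread.yield();
        gate.open();   // a single release
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(passedAfterOpen(3));   // prints 3
    }
}
```

With an exclusive-mode synchronizer, one release would wake only one waiter; here each woken waiter passes the wake-up on in setHeadAndPropagate().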

Shared lock release

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // After a successful release, propagate the wake-up to the waiting nodes
        doReleaseShared();
        return true;
    }
    return false;
}
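
As an illustration of what the two shared-mode hooks might look like together, here is a sketch of a tiny permit counter. It is loosely in the style of Semaphore but is not its implementation; SketchPermits and its methods are hypothetical, and state is assumed to hold the number of free permits.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A tiny permit counter; state holds the number of free permits (sketch only).
class SketchPermits {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: report failure. Otherwise the CAS must succeed before reporting success.
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases))
                    return true;   // always let releaseShared() wake waiters
            }
        }
    }

    private final Sync sync;

    SketchPermits(int permits) {
        sync = new Sync(permits);
    }

    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }   // non-blocking
    void acquire()       { sync.acquireShared(1); }                  // blocking
    void release()       { sync.releaseShared(1); }

    public static void main(String[] args) {
        SketchPermits permits = new SketchPermits(2);
        System.out.println(permits.tryAcquire());   // prints true
        System.out.println(permits.tryAcquire());   // prints true
        System.out.println(permits.tryAcquire());   // prints false
        permits.release();
        System.out.println(permits.tryAcquire());   // prints true
    }
}
```

Returning the remaining count from tryAcquireShared() fits the contract directly: negative means failure, zero means success with nothing left for others, and positive means later waiters may succeed as well.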

Condition

Condition provides a mechanism for communication between threads, in the same way that wait() and notify() do with synchronized, and a single lock can have multiple conditions.
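
A classic use of several conditions on one lock is a bounded buffer, where producers wait on "not full" and consumers wait on "not empty". The sketch below follows that well-known pattern; the class SketchBoundedBuffer and the transferSum() helper are names chosen for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SketchBoundedBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();    // producers wait here
    private final Condition notEmpty = lock.newCondition();   // consumers wait here
    private final Deque<Integer> items = new ArrayDeque<>();
    private final int capacity;

    SketchBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(int x) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();      // releases the lock while waiting
            items.addLast(x);
            notEmpty.signal();        // wake one waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();
            int x = items.removeFirst();
            notFull.signal();         // wake one waiting producer, if any
            return x;
        } finally {
            lock.unlock();
        }
    }

    // Pushes 1..n through a capacity-2 buffer and returns the sum the consumer saw.
    static int transferSum(int n) {
        SketchBoundedBuffer buffer = new SketchBoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++)
                    buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++)
                sum += buffer.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(10));   // prints 55
    }
}
```

With wait() and notify() there is a single wait set per monitor, so producers and consumers would have to share it and wake each other with notifyAll().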

Condition is an interface; for AQS-based locks, Lock.newCondition() returns an instance of the AQS inner class ConditionObject. Its two core methods are await() and signal().

When await() is called, the thread joins a wait queue. Like the synchronization queue, it is a FIFO queue and uses the same node data structure, but it uses only the one-way links. The data structure it maintains is shown in the figure below:

/**
 * A thread that calls await() must already hold the lock, so none of the following
 * operations need extra CAS operations to handle thread contention.
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Join the wait queue
    Node node = addConditionWaiter();
    // fullyRelease() calls release(state), the exclusive release method, which releases every reentrant hold at once; state records the reentrancy count
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Until signal() moves the node to the synchronization queue, stay parked unless interrupted
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // Leave the loop if interrupted; the check returns 0, THROW_IE, or REINTERRUPT
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // However the loop ended, the node is now on the synchronization queue (put there by signal() or by the interrupt check)
    // ----------------------------------------------------------
    // Recall that acquireQueued() returns true if the thread was interrupted while reacquiring the lock
    // If that happens and interruptMode != THROW_IE, it also counts as an interrupt after signal(), so set REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // True only for a thread interrupted before signal(), whose node is still linked in the wait queue
    if (node.nextWaiter != null)
        // Remove nodes whose status is not CONDITION from the wait queue
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // Throw the exception or restore the interrupt flag
        reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If the last waiter has been cancelled, clean it out
    if (t != null && t.waitStatus != Node.CONDITION) {
        // This method walks the list from head to tail and unlinks every node that is not in the CONDITION state
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
 * Returns 0 if not interrupted.
 * Returns THROW_IE if interrupted before signal(), meaning an exception must be thrown.
 * Returns REINTERRUPT if interrupted after signal(), meaning no exception is thrown and only the interrupt flag is restored.
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    // If CAS succeeds, it is not yet added to the synchronization queue by signal
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // Since there is no signal, we need to join the synchronization queue before we can compete for the lock later
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    // signal() has already claimed the node but may not have finished adding it to the synchronization queue, so wait for that
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

/**
 * Signal the first thread in the wait queue.
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Unlink the node from the wait queue
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * Empty the wait queue and move the waiting nodes to the synchronization queue in order.
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}


final boolean transferForSignal(Node node) {
    // If the CAS fails, the node has been cancelled
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // Add the node to the synchronization queue; note that enq() returns the node's predecessor p
    Node p = enq(node);
    int ws = p.waitStatus;
    // The node's thread is still parked, so the synchronization queue protocol requires setting the predecessor's status to SIGNAL
    // If the predecessor is cancelled, or the CAS to SIGNAL fails, wake the thread and let it try to acquire the lock itself; it may park again, which is harmless
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
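
The move from the wait queue to the synchronization queue can be observed from outside with the monitoring methods of ReentrantLock. In the sketch below (ConditionTransferDemo and observe() are hypothetical names), one thread awaits a condition; the main thread then checks getWaitQueueLength() before and after signal() and uses hasQueuedThread() to confirm that the waiter is now queued for the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionTransferDemo {
    // Returns { condition waiters before signal, condition waiters after signal,
    //           1 if the waiter is in the lock's synchronization queue after signal, else 0 }.
    static int[] observe() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        CountDownLatch holdingLock = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                holdingLock.countDown();        // counted down while still holding the lock
                cond.awaitUninterruptibly();    // join the wait queue, release the lock, park
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        int[] seen = new int[3];
        try {
            holdingLock.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.lock();   // succeeds only after the waiter has released the lock inside await
        try {
            seen[0] = lock.getWaitQueueLength(cond);
            cond.signal();
            seen[1] = lock.getWaitQueueLength(cond);
            seen[2] = lock.hasQueuedThread(waiter) ? 1 : 0;
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = observe();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2]);   // prints 1 0 1
    }
}
```

The waiter only returns from its await call after the main thread unlocks, because signal() merely queues it for the lock and it still has to reacquire the lock through acquireQueued().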

The image in this article is from The Art of Concurrent Programming in Java