Writing in the front

Based on the analysis of Java and contract awarding Java. Util. Concurrent source, without need to know about AbstractQueuedSynchronizer (hereinafter abbreviated AQS) this abstract class, because it is the foundation of Java and contracting out the tools, Is the basis for implementing classes such as ReentrantLock, CountDownLatch, Semaphore, FutureTask, etc.

When I was researching AQS, I searched for a lot of materials. In the process, I came across this article by Daishen, which was very clear and full of content. Therefore, I decided to quote the content of this article and add my personal views and understandings where necessary, that is, to stand on the shoulder of Daishen to learn. Those who are interested can go directly to check out the original article, which is well worth a look.

Fair locks and unfair locks

ReentrantLock defaults to unfair locking unless you pass true in the constructor.

public ReentrantLock(a) {
    // Default unfair lock
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
Copy the code

Lock method for fair locking:

static final class FairSync extends Sync {
    final void lock(a) {
        acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 1. In contrast to unfair locks, there is one more judgment: whether there is a thread waiting
            if(! hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true; }}else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false; }}Copy the code

The lock method for an unfair lock:

static final class NonfairSync extends Sync {
    final void lock(a) {
        // 2. In contrast to a fair lock, CAS is performed directly first, and success is returned
        if (compareAndSetState(0.1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }protected final boolean tryAcquire(int acquires) {
        returnnonfairTryAcquire(acquires); }}/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // There is no judgment on blocking queues
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true; }}else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
Copy the code

Summary: There are only two differences between fair and unfair locks:

  • When an unfair lock is called, CAS will be called to grab the lock. If the lock is not occupied at this time, CAS will return the lock.
  • After a CAS failure, an unfair lock will enter the tryAcquire method just like a fair lock. In the tryAcquire method, if the lock is released (state == 0), the unjust lock will be directly seized by CAS. However, fair lock will determine whether there is a thread waiting in the queue, if there is, it will not grab the lock, obediently queue to the back.

There are only two differences between a fair lock and an unfair lock. If both CAS are unsuccessful, then the unfair lock is the same as the fair lock.

Relatively speaking, unfair locks have better performance because of their higher throughput. Of course, unfair locking makes the timing of lock acquisition more uncertain and can lead to chronically hungry threads in the blocking queue.

Condition

Let’s take a look at the Condition usage scenario. Condition is often used in producer-consumer scenarios. See Doug Lea for an example:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // Condition depends on lock to generate
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    / / production
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // Queue is full, wait until not full to continue production
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // The queue is not empty
        } finally{ lock.unlock(); }}/ / consumption
    public Object take(a) throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // Wait until the queue is not empty to continue consuming
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // The queue is not full
            return x;
        } finally{ lock.unlock(); }}}Copy the code

We can see that condition must hold the lock before it is used. Wait (), notify(), or notifyAll() is required to hold the monitor lock on an Object. ArrayBlockingQueue implements producer-consumer in this way, so use this example only as a learning example. You can use ArrayBlockingQueue directly in production

We often use obj.wait(), obj.notify(), or obj.Notifyall () to implement similar functions, but they are based on object monitor locks. While the Condition is based on the already in here, and already is dependent on AbstractQueuedSynchronizer.

Before moving on, the reader should have a general idea in mind. Condition is dependent on ReentrantLock and must acquire the lock in order to operate either with an await call or with a signal wake up call.

Each ReentrantLock instance can produce multiple ConditionObject instances by calling newCondition multiple times:

final ConditionObject newCondition(a) {
    // instantiate ConditionObject
    return new ConditionObject();
}
Copy the code

We first look at we focus on the Condition of the implementation of the class AbstractQueuedSynchronizer ConditionObject in class.

public class ConditionObject implements Condition.java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // The first node of the conditional queue
          // the serialization is not involved in the serialization process
        private transient Node firstWaiter;
        // The last node of the conditional queue
        private transientNode lastWaiter; .Copy the code

When WE introduced AQS last time, we had a blocking queue that held the queue of threads waiting to acquire locks. So let’s introduce another concept here, called the condition queue, and I’ve drawn a simple diagram to illustrate this.

The blocking queue here would be better called sync queue, but I’ll stick with it for the sake of the previous article. Remember the two concepts here, blocking queues and conditional queues.



Basically, if you understand this diagram, you will know how condition is processed. So, I’ll explain the diagram briefly and then explain the code implementation in detail.

  • Both conditional and blocking queue nodes are instances of Node, because the conditional queue nodes need to be moved to the blocking queue.
  • We know that an instance of ReentrantLock can generate multiple Condition instances by calling newCondition() multiple times, which corresponds to condition1 and condition2. Notice that ConditionObject has only two attributes: firstWaiter and lastWaiter;
  • Each condition has an associated condition queue. If thread 1 calls condition1.await(), it will wrap the current thread 1 as Node and join the condition queue. The condition queue is a one-way linked list.
  • Condition1.signal () triggers a wake up call and moves the “firstWaiter” in condition1 to the end of the blocking queue to await the lock and await the return of the await method.

2->3->4 above describes the simplest process without considering interrupts, signalAll, and await methods with timeout parameters, but understanding this is the main purpose of this section. At the same time, it is very intuitive to see which operations are thread-safe and which are not. Once you understand this diagram, the following code analysis should be simple. Let’s take a look at the wait method:

First, this method is interruptible. The other method is awaitUninterruptibly().
// This method blocks until the signal method is called or interrupted
public final void await(a) throws InterruptedException {
    // As usual, since the method is responding to interrupts, it determines the interrupt state at the beginning
    if (Thread.interrupted())
        throw new InterruptedException();

    // Add to the condition queue
    Node node = addConditionWaiter();

    // Release the lock, return the state value before release the lock
    // await() before the current thread must hold the lock, which must be released
    int savedState = fullyRelease(node);

    int interruptMode = 0;
    // There are two ways to exit the loop
    IsOnSyncQueue (node) returns true, that is, the current node has been moved to the blocking queue
    // 2. checkInterruptWhileWaiting(node) ! = 0 goes to break and exits the loop, representing a thread interrupt
    while(! isOnSyncQueue(node)) { LockSupport.park(this);
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            break;
    }
    // After being woken up, it enters the blocking queue, waiting to acquire the lock
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null) // clean up if cancelled
        unlinkCancelledWaiters();
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
}
Copy the code

In fact, I have generally said the whole await process, let’s step by step to the above several points with the source code.

Adds a node to a conditional queue

AddConditionWaiter () adds the current node to the conditional queue, which is thread-safe.

// Enqueue the node corresponding to the current thread and insert it at the end of the queue
private Node addConditionWaiter(a) {
    Node t = lastWaiter;
    // If the last node of the conditional queue is cancelled, it is cleared
    // If waitStatus is not equal to node. CONDITION, this Node is unqueued.
    if(t ! =null&& t.waitStatus ! = Node.CONDITION) {// This method iterates through the conditional queue and clears all cancelled nodes from the queue
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // When node is initialized, specify waitStatus as Node.condition
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    // t is now lastWaiter, the end of the queue
    // If queue is empty
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
Copy the code

The above piece of code is simple enough to put the current thread at the end of the conditional queue. In the addWaiter method, there is an unlinkCancelledWaiters() method, which is used to clear nodes from the queue that have been unwaited for. This method is called once if a cancel operation occurs while await (this will be said later), or if the last node is found to be cancelled while the node is being queued.

// The wait queue is a one-way linked list, traversing the list to clear out the nodes whose wait has been cancelled
// This is a list operation
private void unlinkCancelledWaiters(a) {
    Node t = firstWaiter;
    Node trail = null;
    while(t ! =null) {
        Node next = t.nextWaiter;
        // If the Node is not in node. CONDITION, the Node is cancelled
        if(t.waitStatus ! = Node.CONDITION) { t.nextWaiter =null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        elsetrail = t; t = next; }}Copy the code

Fully release the exclusive lock

After the node is queued, int savedState = fullyRelease(node) is called; Fully release: ReentrantLock fully release: ReentrantLock fully release: ReentrantLock fully release: ReentrantLock fully release

So let’s think about savedState here. Condition1.await () = state (2) condition1.await() = state (2) condition1.await() = state (0) So that other threads can hold the lock. When it wakes up, it needs to hold 2 locks again to continue.

// savedState represents the state value before release
// For the simplest operation: lock. Lock (), then condition1.await().
// Then state changes from 1 to 0 by this method, the lock is released, and this method returns 1
// Accordingly, if the lock reenters n times, savedState == n
/ / if the method fails, the node will be set to "cancel", and throws an exception IllegalMonitorStateException
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // The current state is used as a release parameter, that is, the lock is completely released and state is set to 0
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw newIllegalMonitorStateException(); }}finally {
        if(failed) node.waitStatus = Node.CANCELLED; }}Copy the code

Condition1.await () can be called without holding a lock, but release(savedState) must return false because it does not hold a lock. Go to the exception branch, then go to the finally block and set Node.waitStatus = Node.cancelled. The joined node will be “CANCELLED” by its successor.

Waiting to enter a blocking queue

So once you release the lock, then you have this, and then you spin, and if you find that you’re not in the blocking queue, then you hang up and wait to be transferred to the blocking queue.

int interruptMode = 0;
// If it is not a blocking queue, it is a blocking queue
while(! isOnSyncQueue(node)) {// The thread is suspended
    LockSupport.park(this);

    // You can wait to see when it is unpark
    if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
        break;
}
Copy the code

IsOnSyncQueue (Node Node) is used to determine whether the Node has been moved to the blocking queue:

// The Node is initialized with waitStatus = node. CONDITION
// I mentioned earlier that we need to move the node from the conditional queue to the blocking queue.
// This method is to determine whether the node has moved to the blocking queue
final boolean isOnSyncQueue(Node node) {

    // The node's waitStatus is set to 0 when moved over, which will be discussed later when we say signal
    // If waitStatus is still Node.CONDITION (-2), it must still be in the CONDITION queue
    // If the node prev is still null, it is definitely not in a blocking queue.
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If node already has a successor node next, it must be blocking the queue
    if(node.next ! =null) 
        return true;

    // The following method starts at the end of the blocking queue and works backwards. If it finds an equal number, it is in the blocking queue, otherwise it is not

    // Node.prev ()! = null to infer that node is blocking on the queue? The answer is: no.
    // Set node.prev to tail.
    // Then the CAS operation sets itself as the new tail, but this time the CAS may fail.

    return findNodeFromTail(node);
}

// Traverse through the end of the blocking queue, returning true if found
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false; t = t.prev; }}Copy the code

Returning to the previous loop, isOnSyncQueue(node) returns false, then locksupport.park (this); Here the thread hangs.

Signal wakes up the thread and moves to the blocking queue

For your understanding, here we look at the wake up operation first, because we just went to locksupport.park (this); The thread is suspended, waiting to wake up. The wake up operation is usually performed by another thread, as in the producer-consumer pattern, if the thread is suspended for consumption, then when the producer produces something, it calls signal to wake up the waiting thread to consume it.

// Wake up the longest waiting thread
// Move the node corresponding to this thread from the conditional queue to the blocking queue
public final void signal(a) {
    // The thread calling signal must hold the current exclusive lock
    if(! isHeldExclusively())throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if(first ! =null)
        doSignal(first);
}

// Walk back from the conditional queue to find the first node to transfer
// Because as we said earlier, some threads will be unqueued, but may still be in the queue
private void doSignal(Node first) {
    do {
          // Set firstWaiter to the first after the first node because the first node is leaving soon
        // Set lastWaiter to null if there is no node waiting after first is removed
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Because first is about to be moved to the blocking queue, the link to the conditional queue is broken here
        first.nextWaiter = null;
    } while(! transferForSignal(first) && (first = firstWaiter) ! =null);
      // In this while loop, if the first transition fails, the first node after first is selected for the transition, and so on
}

// Move the node from conditional queue to blocking queue
// True indicates successful transfer
// False indicates that the node has been canceled before signal
final boolean transferForSignal(Node node) {

    // If the CAS fails, the node's waitStatus is no longer Node.CONDITION, and the node is cancelled.
    // Now that it has been canceled, there is no need to move it
    // Otherwise, set waitStatus to 0
    if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
        return false;

    Enq (node): spins to the end of the blocking queue
    // Note that the return value p is node's precursor node in the blocking queue
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws > 0 indicates that node unlocks the wait lock on the precursor node in the blocking queue, and directly wakes up the corresponding node thread. What happens when you wake up? I'll explain later
    // If ws <= 0, compareAndSetWaitStatus will be called. If ws <= 0, compareAndSetWaitStatus will be called.
    if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL))// If the precursor node cancels or the CAS fails, the thread will wake up here
        LockSupport.unpark(node.thread);
    return true;
}
Copy the code

Under normal circumstances, the ws > 0 | |! CompareAndSetWaitStatus (p, ws, Node.SIGNAL) ws <= 0 and compareAndSetWaitStatus(p, ws, Node.SIGNAL) returns true. So you don’t wake up the node thread in the if block. This method then returns true, which means that the signal method ends and the node enters the blocking queue. If a blocking occurs, the precursor node in the queue cancels the wait, or the CAS fails, simply wake up the thread and let it proceed to the next step.

Check the interrupt status after waking up

After signal in the previous step, our thread is moved from the conditional queue to the blocking queue, and is ready to acquire the lock. Once the lock is retrieved, continue to execute. Wait for the thread to recover from suspension and continue reading

int interruptMode = 0;
while(! isOnSyncQueue(node)) {// The thread is suspended
    LockSupport.park(this);

    if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
        break;
}
Copy the code

InterruptMode is explained first. InterruptMode can be REINTERRUPT (1), THROW_IE (-1), or 0

  • REINTERRUPT: When an await is returned, the interrupt status needs to be reset
  • THROW_IE: To return an await, InterruptedException is thrown
  • 0: indicates that no interrupt has occurred during await

There are three situations that will make locksupport.park (this); This return continues:

  • Regular path. Signal -> Transfer node to blocking queue -> Lock acquired (unpark)
  • The thread is interrupted. While in park, another thread interrupts this thread
  • Signal when we said that the precursor node after the transfer was cancelled or the CAS operation to the precursor node failed
  • Wake up. Object.wait() also has this problem

Thread wakes up after the first step is to call checkInterruptWhileWaiting (node) this method, this method is used to determine whether interruption happened during thread hanging, if interrupt comes up is interrupted before signal invocation, or signal after the interrupt.

// 1. If the signal has been interrupted before, return THROW_IE
// 2. If signal is followed by interrupt, return REINTERRUPT
// 3. No interrupt occurred, return 0
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
Copy the code

Thread.interrupted() : If the current Thread is interrupted, this method returns true and resets the interrupted status to false, hence the use of reinterrupts.

See how to determine whether the interrupt occurred before or after signal:

// This method is called only if the thread is interrupted
// Move the unwaited node to the blocking queue if necessary
// Returns true: If this thread is canceled before signal,
final boolean transferAfterCancelledWait(Node node) {
    // Use CAS to set the node status to 0
    // If this CAS step succeeds, the interrupt occurred before signal, because if signal had occurred first, waitStatus would have been set to 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // Put the node into the blocking queue
        // We can see that even if an interrupt occurs, it is still transferred to the blocking queue
        enq(node);
        return true;
    }

    // The CAS failed, and the signal method must have set waitStatus to 0
    // The signal method moves the node to the blocking queue, but it may not be done yet, and spins to wait for it to complete
    // Of course, this is rare: after a signal call, an interrupt occurs before the transfer is complete
    while(! isOnSyncQueue(node)) Thread.yield();return false;
}
Copy the code

Again, even if an interrupt occurs, the node is still moved to the blocking queue. At this point, you probably know how the while loop exits. Either an interruption or a successful transfer. This describes a scenario where there is a thread that is behind the conditional queue, but because it is interrupted, it will wake up, and then it realizes that it is not the one that was signalled, but it will voluntarily enter the blocking queue.

Get an exclusive lock

After the while loop comes out, here’s the code:

if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;Copy the code

Since the while comes out, we determine that the node has entered the blocking queue and is ready to acquire the lock. AcquireQueued (node, savedState) acquireQueued(node, savedState) acquireQueued(node, savedState) acquireQueued(node, savedState) This means that the current thread has acquired the lock, and state == savedState. Note that the blocking queue is entered regardless of whether an interrupt has occurred, and the return value of the acquireQueued(Node, savedState) is to indicate whether the thread has been interrupted. If true is returned, it is interruptMode! = THROW_IE indicates that interrupts occurred before signal. Set interruptMode to REINTERRUPT for future interrupts. Moving on:

if(node.nextWaiter ! =null) // clean up if cancelled
    unlinkCancelledWaiters();
if(interruptMode ! =0)
    reportInterruptAfterWait(interruptMode);
Copy the code

In the spirit of being meticulous, here’s Node.nextwaiter! = null how to satisfy. NextWaiter = null to disconnect the node from the conditional queue. However, in the case of determining an interrupt, did it occur before or after signal? NextWaiter = null Node.nextwaiter = null Node.nextwaiter = null node.nextwaiter = null node.nextwaiter = null As we said earlier, the unlinkCancelledWaiters method is also called when a node cancels, so here it is.

Handling interrupt state

At this point, we can finally say what interruptMode is used for.

  • 0: do nothing, not interrupted;
  • The THROW_IE: await method throws InterruptedException because it represents an interruption during await();
  • REINTERRUPT: Re-interrupts the current thread, since it represents an interrupt that has occurred after signal() and has not been interrupted during await()
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
Copy the code

This part of the interrupt state, you should understand it, if not, read it several times.

Await with timeout mechanism

ConditionObject is completely parsed after the previous 7 steps. Let’s briefly analyze the await method with the timeout mechanism.

public final long awaitNanos(long nanosTimeout) 
                  throws InterruptedException
public final boolean awaitUntil(Date deadline)
                throws InterruptedException
public final boolean await(long time, TimeUnit unit)
                throws InterruptedException
Copy the code

All three methods are similar, so let’s pick one out:

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    // Wait so many nanoseconds
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // Current time + Wait time = Expiration time
    final long deadline = System.nanoTime() + nanosTimeout;
    // Use to return await whether timeout occurs
    boolean timedout = false;
    int interruptMode = 0;
    while(! isOnSyncQueue(node)) {// Time is up
        if (nanosTimeout <= 0L) {
            // The wait is cancelled because of break. Cancel your waiting must call transferAfterCancelledWait (node) this method
            // If this method returns true, the node is successfully moved to the blocking queue within this method
            // If false is returned, signal has already occurred, and the signal method has migrated the node. So there's no timeout
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // The value of spinForTimeoutThreshold is 1000 nanoseconds, or 1 millisecond
        That is, if it is less than 1 millisecond, do not choose parkNanos, the spin performance is better
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            break;
        // Get the remaining time
        nanosTimeout = deadline - System.nanoTime();
    }
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
        unlinkCancelledWaiters();
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode);
    return! timedout; }Copy the code

The idea of timeout is very simple. ‘await’ with no timeout parameter is park and wait for someone to wake up. Now it is time to call the parkNanos method to sleep for the specified time, wake up and determine if signal has been called, if it has not timed out, otherwise it has timed out. If you time out, do it yourself, move to the blocking queue, and grab the lock.

The await of InterruptedException is not thrown

The last section of Condition.

public final void awaitUninterruptibly(a) {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while(! isOnSyncQueue(node)) { LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}
Copy the code

Very simple, paste the code everyone will understand, I will not nonsense.

The cancellation of the AbstractQueuedSynchronizer exclusive lock queue

This article said is AbstractQueuedSynchronizer, just like Condition said too much, to get ideas. Next, I want to talk about how to eliminate competition for locks. As mentioned in the previous article, the most important method is this, which is where we will find the answer:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}Copy the code

First, the node must have been enqueued by this method. I posted the parkAndCheckInterrupt() code:

private final boolean parkAndCheckInterrupt(a) {
    LockSupport.park(this);
    return Thread.interrupted();
}
Copy the code

If you put these two pieces of code together, it’s clear. If we want to unqueue one thread, we need to interrupt it in another thread. For example, if a thread calls lock() for a long time and does not return, I want to interrupt it. Once it is interrupted, the thread is removed from locksupport.park (this); Wake up, then thread.interrupted (); Returns true. One problem we found was that even if the interrupt woke up the thread, it simply set interrupted = true and continued the next loop. Also, because thread.interrupted (); Clears the interrupt state and returns false the second time parkAndCheckInterrupt is entered. So, see that in this method the interrupted is used only to record whether or not an interruption has occurred, and then used for the method to return the value, and does nothing else. So let’s see how the outer method handles the acquireQueued return false.

public final void acquire(int arg) {
    if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }static void selfInterrupt(a) {
    Thread.currentThread().interrupt();
}
Copy the code

So the lock() method handles interrupts as if you interrupt and I grab the lock anyway. It doesn’t really matter, except that after I grab the lock, I set the thread’s interrupt state without throwing any exceptions. Once the lock is acquired, the caller can either check to see if the interrupt occurred or ignore it.

The divider

Let me draw a dividing line. Did you feel cheated? I said a lot, but it had nothing to do with cancellation. Let’s look at another lock method for ReentrantLock:

public void lockInterruptibly(a) throws InterruptedException {
    sync.acquireInterruptibly(1);
}
Copy the code

Throws InterruptedException (InterruptedException).

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if(! tryAcquire(arg)) doAcquireInterruptibly(arg); }Copy the code

Keep going:

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // If there is an exception, end the method immediately and throw an exception.
                // Instead of just marking the return value of this method as representing the interrupted state
                // The exception is thrown directly, and the outer layer is not caught, all the way out to lockInterruptibly
                throw newInterruptedException(); }}finally {
        // Failed is true if you use InterruptedException
        if(failed) cancelAcquire(node); }}Copy the code

While we’re at it, let’s talk about cancelAcquire:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    // Find a suitable precursor. It basically says "please remove" all nodes in the queue before it that have been canceled.
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;
    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! =null) {
            Node next = node.next;
            if(next ! =null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC}}Copy the code

There is nothing to be said for this method, which should be CANCELLED until Node CANCELLED, as long as waitStatus is set to Node.CANCELLED, lots of cases are CANCELLED out of the blocking queue, either actively or passively.

InterruptedException InterruptedException

In the previous article, we touched on a lot of interrupts, so this is a summary. If you are completely familiar with interrupts, there is no need to read this section, which is written for beginners.

Thread the interrupt

Interrupt is not a command like kill -9 PID. It does not mean that we interrupt a thread and the thread stops running. Interrupts represent thread state. Each thread has an interrupt state associated with it. This is a Boolean value of true or false, starting with false. Interrupts in Java are not the same as operating system interrupts. For interrupt states, we need to focus on the following methods in the Thread class:

// Thread instance method, holding Thread instance reference to detect Thread interrupt status
public boolean isInterrupted(a) {}

// A static method in Thread that checks whether the calling Thread has been interrupted
// Note: When this method returns the interrupted status, it resets the interrupted status of this thread to false
// So, if we call this method twice in a row, the return value of the second call must be false
public static boolean interrupted(a) {}

// An instance method of the Thread class that sets the interrupt status of a Thread to true
public void interrupt(a) {}
Copy the code

If you interrupt a thread by setting its status to true, what the interrupted thread does with this status is up to the thread itself. Such as the following code:

while(! Thread.interrupted()) { doWork(); System.out.println("I'm done with one thing, ready to move on to the next, if no other thread interrupts me.");
}
Copy the code

This code will respond to interrupts, and it will determine the interrupt state as it works. However, other interrupts are relatively rare outside of the JDK source code, which is very particular.

Of course, interrupts are more than just thread states, otherwise there is no need to develop a concept. A thread is automatically aware when it is interrupted if it is in one of three conditions:

  • Join (), join(long), join(long, int), sleep(long), sleep(long), wait(long, int) Int). Throws InterruptedException. If a thread blocks on one of these methods (which, as we know, cause the current thread to block), if another thread interrupts the thread, it will immediately return from the methods, throwing InterruptedException and resetting the interrupt status to false.
  • I/O blocking operations in classes that implement the InterruptibleChannel interface, such as the Connect and Receive methods in DatagramChannel. If the thread block here, interrupt threads can lead to throw these methods ClosedByInterruptException and reset the interrupt status.
  • Select method in Selector, for reference to the NIO article I wrote. Once interrupted, the method returns immediately

The above three cases are the most special because they are automatically aware of the interruption (automatic, also based on the underlying implementation) and reset the interrupt state to false after doing the corresponding operation. Are there only three ways to automatically detect interruptions? No, if the thread is blocked in locksupport. park(Object obj) method, also called suspended, this interrupt will also cause the thread to wake up, but will not reset the interrupted state, so wake up to detect the interrupted state will be true.

InterruptedException overview

It is a special exception, not that the JVM handles it differently, but that it is used in a special way. In general, we see methods like wait() on objects, lockInterruptibly() on ReentrantLock, sleep() on threads, and so on, These methods all carry a throws InterruptedException and are commonly referred to as blocking methods.

One notable feature of blocking methods is that they take a long time (not absolutely, just that time is not in control), and that their method completion returns often depend on external conditions, such as wait depending on notify of another thread, lock depending on UNLOCK of another thread, and so on.

When we see a method with throws InterruptedException, we know that the method should be a blocking method and that we can often interrupt it if we want it to return sooner.

With the exception of a few special classes (Object, Thread, etc.), sensing interrupts and returning ahead of time is done by polling the interrupt state. When we need to write our own interruptible methods, we do so by determining the thread’s interruptible state at the appropriate time (usually at the beginning of the loop) and then doing something about it (usually the method directly returns or throws an exception). Of course, we should also note that if we take a long time to loop, it will take a long time to sense that the thread has broken.

Handling interrupts

Once an interrupt occurs and we receive this information, what do we do about the interrupt? This section will briefly examine this question. We often write code like this:

try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    // ignore
}
// go on 
Copy the code

When the sleep ends and we continue, we often don’t know if the piece of code actually slept for 10 seconds, or just slept for one second before being interrupted. The problem with this code is that we swallow the exception message. (For the sleep method, I believe that most of the time we don’t care if we interrupt, here’s an example). There are two methods of ReentrantLock:

public void lock(a) {
    sync.lock();
}

public void lockInterruptibly(a) throws InterruptedException {
    sync.acquireInterruptibly(1);
}
Copy the code

As we mentioned earlier, the lock() method does not respond to interrupts. If thread1 calls the lock() method and does not grab the lock for a long time, thread2 interrupts it. Thread1 does not respond to the request and continues to grab the lock without throwing away the “interrupted” message. We can look at the following code:

public final void acquire(int arg) {
    if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// We can see that there is no special processing here, which is to record the interrupt state.
        // This way, if the outer method needs to be checked, at least we don't lose this information
        selfInterrupt();// Thread.currentThread().interrupt();
}
Copy the code

For the lockInterruptibly() method, which has a throw InterruptedException, the signal tells us that if we want to cancel the thread lock grab, we simply interrupt the thread and it will return immediately. InterruptedException is thrown.

In parallel packets, there are many examples of this kind of handling of interrupts, providing two methods: responsive interrupt and non-responsive interrupt. In the case of non-responsive interrupt, logging the interrupt rather than losing this information. For example, the two methods in Condition look like this:

void await(a) throws InterruptedException;
void awaitUninterruptibly(a);
Copy the code

In general, if the method throws InterruptedException, the first sentence in the method body is:

public final void await(a) throws InterruptedException {
    if (Thread.interrupted())
        throw newInterruptedException(); . }Copy the code

The skillful use of interrupts helps us to write elegant code and to analyze other people’s source code.