AQS conditional queue Condition

Earlier articles on the AQS source code covered acquiring and releasing locks in exclusive mode and shared mode, all of which happens in the synchronization queue. AQS also contains conditional queues, as shown below:

Today we focus exclusively on the conditional queue (Condition). You may not use it often in day-to-day work, but it appears in the source code of several concurrent containers, such as LinkedBlockingQueue and ArrayBlockingQueue, so it is worth understanding before reading those classes.

Note: each conditional queue is a separate linked list, delimited by its own firstWaiter and lastWaiter pointers!

Introduction to conditional queues in AQS

Here is an example from everyday life. We queue for the toilet and finally get in and lock the door, only to discover that we forgot to bring paper. Annoying as it is, the only option is to go back out and fetch some (that is, enter the conditional queue and wait). Before leaving we must also release the lock so that the people queuing behind us can get in. Once the paper is ready (the condition is satisfied), we rejoin the synchronization queue and line up again.

Let’s look at what methods Condition contains:

   // Waits until signalled; responds to interruption
   void await() throws InterruptedException;

   // Waits until signalled; does not respond to interruption
   void awaitUninterruptibly();

   // Timed wait (relative time, in nanoseconds); returns an estimate of the time remaining
   long awaitNanos(long nanosTimeout) throws InterruptedException;

   // Timed wait (relative time, with a unit); returns false if the waiting time elapsed
   boolean await(long time, TimeUnit unit) throws InterruptedException;

   // Timed wait until an absolute deadline; returns false if the deadline passed
   boolean awaitUntil(Date deadline) throws InterruptedException;

   // Wakes up the first node in the conditional queue
   void signal();

   // Wakes up all nodes in the conditional queue
   void signalAll();

The two most important methods are await() and signal(): await() puts the current thread into the conditional queue, and signal() wakes up a thread waiting in the conditional queue.
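The timed variants are easiest to understand through the return value of awaitNanos(). The following is a minimal sketch, assuming a hypothetical TimedFlag class (not part of the JDK) that waits for a boolean flag with a timeout given in milliseconds. It feeds the remaining time returned by awaitNanos() back into the next wait, so that repeated wake-ups do not extend the total timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper class for illustration; not part of the JDK.
class TimedFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition raised = lock.newCondition();
    private boolean flag; // guarded by lock

    /** Sets the flag and wakes every thread waiting for it. */
    void raise() {
        lock.lock();
        try {
            flag = true;
            raised.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to timeoutMillis; returns true if the flag was set in time. */
    boolean awaitRaised(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // the timeout elapsed
                }
                // awaitNanos returns an estimate of the time still remaining
                nanos = raised.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

Calling awaitRaised(50) on a fresh TimedFlag should return false after roughly 50 ms, while calling it after raise() returns true immediately.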

Let’s look again at the way Condition is used:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class UseCondition {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void method1() {
        lock.lock(); // acquire outside try so that unlock() only runs if lock() succeeded
        try {
            System.out.println("Current thread: " + Thread.currentThread().getName() + " enters the waiting state");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("Current thread: " + Thread.currentThread().getName() + " releases the lock");
            condition.await(); // like Object.wait(); demo only, real code re-checks a condition in a loop
            System.out.println("Current thread: " + Thread.currentThread().getName() + " continues");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void method2() {
        lock.lock();
        try {
            System.out.println("Current thread: " + Thread.currentThread().getName() + " enters...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("Current thread: " + Thread.currentThread().getName() + " sends a wake-up signal...");
            condition.signal(); // like Object.notify()
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        UseCondition uc = new UseCondition();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() { uc.method1(); }
        }, "t1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() { uc.method2(); }
        }, "t2");
        // Assumes t1 acquires the lock first; otherwise the signal is lost and t1 waits forever
        t1.start();
        t2.start();
    }
}

Output result:

Current thread: t1 enters the waiting state
Current thread: t1 releases the lock
Current thread: t2 enters...
Current thread: t2 sends a wake-up signal...
Current thread: t1 continues

Two threads are started: a waiter (t1) and a signaler (t2). The waiter runs first and, because its condition is not yet met, calls condition.await(), which suspends the thread and releases the lock. The signaler then acquires the lock, changes the condition, calls condition.signal() to notify a waiting thread, and releases the lock. The waiter reacquires the lock and, since the signaler has changed the condition, finds it satisfied and continues executing.
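In practice the "condition" is an explicit piece of state that the waiter re-checks in a loop, so the code stays correct even if the signaler runs first or the waiter wakes up early. The following is a minimal sketch of that pattern; the GuardedWait class and its method names are hypothetical and borrow from the toilet paper analogy.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class; the names are illustrative only.
class GuardedWait {
    private final Lock lock = new ReentrantLock();
    private final Condition paperReady = lock.newCondition();
    private boolean hasPaper; // the condition predicate, guarded by lock

    /** Blocks until hasPaper is true, however the threads are scheduled. */
    void waitForPaper() throws InterruptedException {
        lock.lock();
        try {
            while (!hasPaper) {      // re-check after every wake-up
                paperReady.await();  // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    /** Changes the state first, then wakes one waiter. */
    void deliverPaper() {
        lock.lock();
        try {
            hasPaper = true;
            paperReady.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Runs one waiter and one signaler; returns true if the waiter finished. */
    static boolean demo() {
        GuardedWait g = new GuardedWait();
        Thread waiter = new Thread(() -> {
            try {
                g.waitForPaper();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.start();
        g.deliverPaper(); // safe even if this runs before the waiter takes the lock
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }
}
```

Because the waiter checks hasPaper before calling await(), a signal that was sent before the waiter arrived is not lost: the waiter simply never waits.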

Source code analysis: the await() method

Now that we have a general idea of how Condition is used, let's walk through the await() method:

        public final void await() throws InterruptedException {
            // Throw an exception if the current thread has already been interrupted
            if (Thread.interrupted())
                throw new InterruptedException();
            // Add a node for the current thread to the conditional queue
            Node node = addConditionWaiter();
            // Release all lock state held by this thread (the lock may have been acquired reentrantly)
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // Wait until the node is on the synchronization queue. isOnSyncQueue() returns false
            // while waitStatus is still CONDITION (-2, set in addConditionWaiter()) or while
            // node.prev == null. The synchronization queue has a head node, so every node on it
            // has a predecessor, while the conditional queue does not
            while (!isOnSyncQueue(node)) {
                // Suspend the current thread
                LockSupport.park(this);
                // After waking up, check whether the thread was interrupted
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // Reaching here means the node has been added to the synchronization queue,
            // either by signal() or because of an interrupt.
            // acquireQueued() is the same method used to acquire a lock in exclusive mode,
            // which shows that conditional queues can only be used with exclusive locks
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // The exclusive lock has been reacquired at this point.
            // Remove cancelled nodes from the conditional queue
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

Let's read await() with three questions in mind: 1. How is the node added to the conditional queue? 2. How is the acquired lock released? 3. How does a thread exit await()? We start with the first question: how is the node added to the queue?

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If the last node has been cancelled, remove all cancelled nodes from the queue
            if (t != null && t.waitStatus != Node.CONDITION) {
                // Unlink the cancelled nodes
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // Wrap the current thread in a Node whose waitStatus is CONDITION (-2)
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                // lastWaiter is null, so the queue is empty and firstWaiter points to the new node
                firstWaiter = node;
            else
                // Otherwise link the new node after the current last node
                t.nextWaiter = node;
            // The new node becomes the last node
            lastWaiter = node;
            return node;
        }

The addConditionWaiter() method simply appends the node to the conditional queue. The unlinkCancelledWaiters() method removes cancelled nodes from that queue, which is ordinary linked-list node removal; it is long and not central to the discussion, so it is not reproduced here. That answers the first question. Now let's look at the second.
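For readers who want to see the list manipulation anyway, here is a simplified, single-threaded model of a conditional queue. It is an illustrative sketch rather than the JDK source: the ToyConditionQueue class, its Node type and the names() helper are assumptions made for this example, and only the field names firstWaiter, lastWaiter, nextWaiter and waitStatus mirror AQS.

```java
// Simplified, single-threaded model of a conditional queue.
// Illustrative sketch only; this is not the JDK implementation.
class ToyConditionQueue {
    static final int CONDITION = -2;
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;

        Node(String name) {
            this.name = name;
        }
    }

    Node firstWaiter;
    Node lastWaiter;

    /** Appends a node at the tail, in the spirit of addConditionWaiter(). */
    Node add(String name) {
        Node node = new Node(name);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    /** Walks the list once and unlinks every node that is no longer CONDITION. */
    void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null; // last node seen that is still waiting
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != CONDITION) {
                t.nextWaiter = null;
                if (trail == null) {
                    firstWaiter = next;
                } else {
                    trail.nextWaiter = next;
                }
                if (next == null) {
                    lastWaiter = trail;
                }
            } else {
                trail = t;
            }
            t = next;
        }
    }

    /** Returns the names of the queued nodes, head first, separated by commas. */
    String names() {
        StringBuilder sb = new StringBuilder();
        for (Node n = firstWaiter; n != null; n = n.nextWaiter) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(n.name);
        }
        return sb.toString();
    }
}
```

The trail pointer is what keeps the list singly linked: since nodes in the conditional queue have no prev link, the traversal has to remember the last surviving node in order to splice around a cancelled one.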

When we introduced the conditional queue with the toilet example at the beginning of the article, we said that someone who forgot to bring paper goes to the conditional queue to wait, but must first release the lock they hold so that others get a chance to use the toilet.

The fullyRelease(node) method releases the lock held by the current thread:

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // Read the full synchronization state; the lock may be held reentrantly, so release all of it
            int savedState = getState();
            // Release the lock; release() was explained in the previous article and is not repeated here
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                // Releasing failed: throw an exception
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                // If the release failed, mark the node as cancelled. This is why
                // addConditionWaiter() only needs to check whether the last node is cancelled
                node.waitStatus = Node.CANCELLED;
        }
    }

This method is straightforward. The key point is that it releases every hold the current thread has on the lock; otherwise no other thread could acquire it.
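The effect of releasing all holds at once can be observed from outside with a ReentrantLock. The following is a small sketch, assuming a hypothetical FullReleaseDemo class: the waiter locks twice before waiting, the main thread can still acquire the lock while the waiter is parked, and the waiter's hold count is restored once await returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
class FullReleaseDemo {
    /** Returns the waiter's hold count observed right after its wait ends. */
    static int demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);
        int[] holdsAfter = new int[1];
        boolean[] signalled = new boolean[1]; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquire: the hold count is now 2
            try {
                aboutToWait.countDown();
                while (!signalled[0]) {
                    cond.awaitUninterruptibly(); // gives up both holds at once
                }
                holdsAfter[0] = lock.getHoldCount(); // both holds are back
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        try {
            aboutToWait.await();
            lock.lock(); // succeeds only once the waiter has released every hold
            try {
                signalled[0] = true;
                cond.signal();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return holdsAfter[0];
    }
}
```

If the wait released only one of the two holds, the main thread's lock.lock() call would block forever; the saved state is what lets the waiter get back to a hold count of 2 afterwards.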

At this point the node has joined the conditional queue and the lock has been released; the next step is to suspend the thread. The isOnSyncQueue() method is not expanded here, since the comments above describe it well enough.

LockSupport.park(this) suspends the current thread. The interesting part is the code that runs after the thread wakes up. It can be woken either by a call to signal() or by an interrupt, so the first thing it does on waking is check how it was woken:

    while (!isOnSyncQueue(node)) {
        // Suspend the current thread
        LockSupport.park(this);
        // After waking up, check whether the thread was interrupted
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }


    private int checkInterruptWhileWaiting(Node node) {
        // Interrupted before being signalled: THROW_IE
        // Interrupted after being signalled: REINTERRUPT
        // Not interrupted while waiting: 0
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
    }

    // Change the node's status and add it to the synchronization queue
    final boolean transferAfterCancelledWait(Node node) {
        // If this CAS succeeds, the interrupt arrived before signal() was called.
        // The relevant part of signal() is quoted here to make this easier to follow:

        /*
         * private void doSignal(Node node) {
         *     transferForSignal(node);
         * }
         *
         * final boolean transferForSignal(Node node) {
         *     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
         *         return false;
         *     }
         * }
         */
        // So true means the node was queued because of the interrupt,
        // and false means it was queued by signal()
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            // Join the synchronization queue; enq() is covered in https://juejin.cn/post/6964571893340831780 and is not expanded here
            enq(node);
            return true;
        }
        // The CAS failed, so signal() has already claimed the node. signal() itself adds the node
        // to the synchronization queue, so we only need to spin until that has happened
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

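If the thread was interrupted, checkInterruptWhileWaiting() returns either THROW_IE or REINTERRUPT. Back inside the while loop of await(), interruptMode is then not equal to 0, so the loop exits through break. If the thread was not interrupted, the method returns 0 and the loop ends as soon as isOnSyncQueue(node) is true.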

            while (!isOnSyncQueue(node)) {
                // Suspend the current thread
                LockSupport.park(this);
                // After waking up, check whether the thread was interrupted
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // Reaching here means the node has been added to the synchronization queue,
            // either by signal() or because of an interrupt.
            // acquireQueued() is the same method used to acquire a lock in exclusive mode,
            // which shows that conditional queues can only be used with exclusive locks
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // The exclusive lock has been reacquired at this point.
            // Remove cancelled nodes from the conditional queue
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                // Respond to the interrupt according to the interrupt mode
                reportInterruptAfterWait(interruptMode);
        }

        // Throw an exception or restore the thread's interrupt status, depending on when the interrupt arrived
        private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
            // Throw an exception if the interrupt mode is THROW_IE
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                // Equivalent to Thread.currentThread().interrupt()
                selfInterrupt();
        }

The line if (acquireQueued(node, savedState) && interruptMode != THROW_IE) calls acquireQueued(node, savedState), which was explained in the earlier AQS source code articles.

That is, once the node has left the conditional queue, the thread reacquires the lock in exclusive mode, and only then is reportInterruptAfterWait called to respond to any interrupt. If the interrupt arrived before signal(), interruptMode is THROW_IE and an InterruptedException is thrown after the lock has been reacquired. If the interrupt arrived after signal(), or while the lock was being reacquired, interruptMode is REINTERRUPT and the thread's interrupt flag is set again. If there was no interrupt at all, interruptMode is 0 and await() simply returns.
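Both interrupt outcomes can be observed from application code. The following is a minimal sketch, assuming a hypothetical InterruptModeDemo class whose result strings are made up for this example. The main thread holds the lock while it optionally calls signal() and then interrupts the waiter, so the order of the two events is fixed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the result strings are made up for this example.
class InterruptModeDemo {
    /**
     * Interrupts a thread blocked in await(). If signalFirst is true, signal()
     * is called before the interrupt; otherwise only the interrupt is sent.
     */
    static String demo(boolean signalFirst) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        CountDownLatch aboutToWait = new CountDownLatch(1);
        String[] result = new String[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                aboutToWait.countDown();
                cond.await();
                // Normal return: an interrupt, if any, is only left as a flag
                result[0] = Thread.currentThread().isInterrupted()
                        ? "returned, interrupt flag set"
                        : "returned, no interrupt";
            } catch (InterruptedException e) {
                result[0] = "InterruptedException";
            } finally {
                lock.unlock(); // the lock is held again on both paths
            }
        });
        waiter.start();
        try {
            aboutToWait.await();
            lock.lock(); // acquired only after the waiter is in the conditional queue
            try {
                if (signalFirst) {
                    cond.signal(); // the node moves to the synchronization queue
                }
                waiter.interrupt();
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }
}
```

With demo(false) the interrupt wins the race for the node, which corresponds to THROW_IE, so the waiter sees an InterruptedException. With demo(true) the node has already been transferred by signal(), which corresponds to REINTERRUPT, so await() returns normally with the interrupt flag set.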

We are done with the await() method here! Let’s look at the signal() method:

Source code analysis: the signal() method

The signal() method is simpler, so let’s look at it briefly:

        public final void signal() {
            // Throw an exception if the current thread does not hold the exclusive lock
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // Take the first node in the conditional queue; everything below operates on it
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        // Wake up one waiter
        private void doSignal(Node first) {
            do {
                // Advance firstWaiter to the node after first; if that is null,
                // first was the only node in the queue
                if ((firstWaiter = first.nextWaiter) == null)
                    // The conditional queue is now empty, so lastWaiter is null as well
                    lastWaiter = null;
                // Detach the old head from the conditional queue
                first.nextWaiter = null;
                // Move the node to the synchronization queue; if transferForSignal fails
                // (the node was cancelled), try the next node
            } while (!transferForSignal(first) && (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        // CAS the node's waitStatus from CONDITION (-2) to 0. This is the counterpart of the
        // interrupt path above: if transferAfterCancelledWait changed the status first, this CAS fails
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append the node to the tail of the synchronization queue; enq() returns its predecessor
        Node p = enq(node);
        int ws = p.waitStatus;
        // If the predecessor is cancelled, or its status cannot be set to SIGNAL, wake the node's
        // thread right away. The node is already on the synchronization queue, so the woken thread
        // continues in acquireQueued(), which either acquires the lock or suspends it correctly
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
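Because doSignal() always starts from firstWaiter, waiters are transferred in the order in which they began waiting. The following is a small sketch of how this could be observed, assuming a hypothetical SignalOrderDemo class with a polling helper named awaitQueueLength; ReentrantLock.getWaitQueueLength(Condition) is the real API used to make sure each thread is queued before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; thread names and the polling helper are illustrative.
class SignalOrderDemo {
    /** Queues three waiters in a known order and returns the order they wake in. */
    static String demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<String> woken = new ArrayList<>(); // guarded by lock
        Thread[] threads = new Thread[3];
        try {
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        cond.awaitUninterruptibly();
                        woken.add(Thread.currentThread().getName());
                    } finally {
                        lock.unlock();
                    }
                }, "t" + (i + 1));
                threads[i].start();
                // Start the next thread only once this one is in the conditional queue
                awaitQueueLength(lock, cond, i + 1);
            }
            lock.lock();
            try {
                for (int i = 0; i < threads.length; i++) {
                    cond.signal(); // each call transfers the current head of the queue
                }
            } finally {
                lock.unlock();
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.join(",", woken);
    }

    /** Polls until at least n threads are waiting on cond. */
    private static void awaitQueueLength(ReentrantLock lock, Condition cond, int n)
            throws InterruptedException {
        while (true) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(cond) >= n) {
                    return;
                }
            } finally {
                lock.unlock();
            }
            Thread.sleep(1);
        }
    }
}
```

In this setup no other thread competes for the lock after the signals are sent, so the three waiters should reacquire it in synchronization queue order and demo() should return the names in the order t1, t2, t3.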

Conclusion

Conditional queues are a key building block of blocking queue implementations, which is why the concurrent containers were mentioned at the beginning, so it is worth understanding how they work. Two points to keep in mind:

  • A conditional queue is always bound to a specific lock
  • The conditional queue and the synchronization queue are two different queues. A node in the conditional queue is woken by a condition signal, while a node in the synchronization queue is woken when the lock is released
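Both points show up in the classic bounded buffer, which is the basic idea behind blocking queues such as ArrayBlockingQueue. The following is a minimal sketch, not the JDK implementation: the BoundedBuffer class and its demo() method are assumptions made for this example. Two conditions, notFull and notEmpty, are created from the same lock, so there are two conditional queues sharing one synchronization queue.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Minimal bounded buffer for illustration; not the JDK implementation.
class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Object[] items;
    private int putIndex, takeIndex, count;

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T item = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal(); // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    /** Pushes 1..5 through a buffer of capacity 2 and returns what the consumer saw. */
    static String demo() {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder sb = new StringBuilder();
        try {
            for (int i = 0; i < 5; i++) {
                sb.append(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sb.toString();
    }
}
```

Using separate conditions means a put() only wakes consumers and a take() only wakes producers, which a single wait set could not express without waking everyone.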

Here’s a picture to sum up the article: