preface

Before the advent of Lock, synchronized was used to implement the wait and notify mechanism in combination with Object’s wait and notify. The Condition interface provides a similar approach, used in conjunction with Lock.

introduce

The Object monitor methods: Wait, notify, and notifyAll, which are used together with Synchronized. The Condition method must obtain a lock before executing the monitor method: Await, signal, signalAll, used with Lock, must obtain Lock Lock before Condition is executed.

Source code analysis:

The basic structure

Through UML class diagram can be seen, Condtion is an interface, its implementation in AbrtractQueuedSynchronizer inner class ConditionObject. ContionObject = “await”; ContionObject = “signal”;

ContionObject parameters

  /** First node of condition queue. */
  private transient Node firstWaiter; // Wait for the first node of the queue
  /** Last node of condition queue. */
  private transient Node lastWaiter; // Wait for the queue tail
Copy the code

Await method

Await causes the current thread to hang until another thread interrupts, or the thread that acquired the lock calls signal. A thread suspended on an await method will wake up only if:

  1. Interrupted by another thread
  2. The other thread that acquires the lock calls Signal, and the node of the current thread is the first node
  3. Other threads that acquire locks call signalAll
  4. False wake up

Before this method can return, the thread must regain the lock. Now look at the implementation logic within AQS:

public final void await(a) throws InterruptedException {
    if (Thread.interrupted()) // Check whether there is an interruption
        throw new InterruptedException();
    Node node = addConditionWaiter(); // Add the node to the end of the conditional queue
    int savedState = fullyRelease(node); // Release the lock completely and save the reentrant count
    int interruptMode = 0;
    while(! isOnSyncQueue(node)) {// Spin determines whether the node is added to the synchronization queue
        LockSupport.park(this); // If the queue is not synchronized, it is suspended
        / * 1 not suspend for the suspension of checkInterruptWhileWaiting check return 0 2 before invoking signal/siganlall interrupted, return THROW_IE (1), Interrupt (1) if THROW_IE is returned, InterruptedException will be thrown. If REINTERRUPT is returned it will be followed by an interrupt signal * */
        if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
            break;
    }
    AcquireQueued returns true if no interrupt is generated in the while and acquireQueued is generated in the acquireQueued queue. Still the interruptMode = REINTERRUPT * /
    if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;/* If thread interrupts THROW_IE or fullyRelease fails, node.nextwaiter is null. If node.nextWaiter is not null, you need to clean up these nodes */
    if(node.nextWaiter ! =null) // clean up if cancelled
        unlinkCancelledWaiters(); // Clear the nodes that have been canceled
    if(interruptMode ! =0)
        reportInterruptAfterWait(interruptMode); Throws InterruptedException if interruptMode is -1 and resets the interrupt identifier if 1
}
Copy the code

Await method executes logic:

  1. Add a Node of type Node.CONDITION to the end of the CONDITION queue
  2. Release the lock completely and save its state
  3. Check if the queue is in sync, if not, suspend the queue
  4. After waking up from the hang, check to see if an interruption has occurred
  5. Out of the while loop, in the synchronization queue, ready to acquire the lock

When introducing the await execution process, two queues are mentioned:

  1. A queue of ContinObject, also called a conditional queue
  2. AQS queue, synchronization queue

addConditionWaiter

private Node addConditionWaiter(a) {
      Node t = lastWaiter; // Queue end
      // If lastWaiter is cancelled, clean out.
      if(t ! =null&& t.waitStatus ! = Node.CONDITION) {// If the tail node has been cancelled, all cancelled nodes are cleared
          unlinkCancelledWaiters(); // Clear all canceled nodes
          t = lastWaiter; // Get the tail again
      }
      Node node = new Node(Thread.currentThread(), Node.CONDITION); // Create a Node of type Node.CONDITION
      if (t == null) // If it is the first time
          firstWaiter = node;
      else
          t.nextWaiter = node; // Attach to the end of the current node
      lastWaiter = node; // Make lastWaiter point to yourself
      return node;
  }
Copy the code

As you can see from the addConditionWaiter method, only a Node of type Node.CONDITION is created. It can also be seen from the code:

  1. Conditional queues are used only for Threads, waitStatus, and nextWaiter in nodes
  2. Conditional queues are one-way queues

Comparison between AQS queue and Contion queue:

AQS queue:Contion queue:WaitStatus describes the possible states:

  1. The default is 0
  2. If the value is greater than 0, the node times out or is interrupted and needs to be removed from the queue
  3. If the value is -1, the successor node is waiting to be woken up
  4. If it’s minus 2, it’s in a conditional queue
  5. If the value is -3, the wakeup state needs to be propagated downward, which is used when sharing locks

fullyRelease(Node node)

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState(); // Get the status
        if (release(savedState)) { // Release locks all at once, no matter how many times you re-enter them
            failed = false;
            return savedState;
        } else {
            throw newIllegalMonitorStateException(); }}finally {
        if (failed)
            node.waitStatus = Node.CANCELLED; // Set the current node to cancel when an exception occurs during lock release}}Copy the code

As we know from the above analysis, when the thread that has obtained the lock calls the await method, a Node is created, added to the end of the conditional queue, and the holding thread is released. If it is not in the sync queue, it will be park until another thread signals.

isOnSyncQueue

final boolean isOnSyncQueue(Node node) {
  if (node.waitStatus == Node.CONDITION || node.prev == null) // If the Node state is Node.CONDITION or its precursor is NULL, it must be in the conditional queue
      return false;
  if(node.next ! =null) // node.next is not null, so it must be in a synchronous queue. Otherwise, it must be in a synchronous queue
      return true;
  return findNodeFromTail(node); // loop through the tail
 }
 private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false; t = t.prev; }}Copy the code

If the thread returns from park, it checks for interrupts, as analyzed above, and acquireQueued:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // Record whether an error occurred
    try {
        boolean interrupted = false; // Whether an interrupt is performed
        for (;;) {
            final Node p = node.predecessor(); // Get the precursor
            if (p == head && tryAcquire(arg)) { // If you are head and try to acquire the lock successfully, tryAcquire template method has concrete subclass implementation
                setHead(node); SetHead is thread-safe and will succeed if only one thread succeeds at a time
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; }}finally {
        if (failed)
            cancelAcquire(node); // Cancel the fetch}}Copy the code

When the node enters the synchronization queue, it attempts to acquire the resource and sets the value of savedState, which represents the number of reentrant times released when the lock was originally released.

The entire execution process:

public final void signal(a) {
    if(! isHeldExclusively())// Determine if the current thread is the thread that acquired the exclusive lock
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; // Wake up from the first node
    if(first ! =null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null) // If the first node has no successors
            lastWaiter = null; // The tail node points to null
        first.nextWaiter = null; // Release the current node
    } while(! transferForSignal(first) &&// If the current first node is cancelled and the new first node is not null, the loop continues to wake up(first = firstWaiter) ! =null);
}
final boolean transferForSignal(Node node) {
      
      if(! compareAndSetWaitStatus(node, Node.CONDITION,0)) // If the CAS fails, the node is canceled
          return false;

     
      Node p = enq(node); // Join the synchronization queue and return to the precursor node
      int ws = p.waitStatus; // The status of the precursor node
      /** * If the state of the precursor is greater than 0 (cancel), or the state of the precursor fails to be set (node. SIGNAL is set to tell the precursor to wake me up before releasing the lock), the current Node is woken up to ensure the robustness of the synchronization queue */
      if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return true;
  }
Copy the code

Reference: segmentfault.com/a/119000003…