I. Introduction to AQS

Doug Lea when writing the JUC introduced Java. Util. Concurrent. The locks. AbstractQueuedSynchronizer, of the abstract synchronizer is based on the queue, we generally called AQS.

What is a AQS?

Many implementations of locks and synchronization components (ReentrantLock, CountDownLatch, CyclicBarrier, FutureTask, etc.) in JUC are based on AQS.

Why do YOU need AQS?

A lot of locks, synchronizers, they do a lot of the same things. If you can extract a utility class, you can use it directly. For things like ReentrantLock and CountDownLatch, you can block out a lot of details. Just focus on their own business logic.

Synchronizer fundamentals

// acquire does the following:
while(Failed to obtain synchronization status) {if(The current thread is not queued) {the current thread is queued; } attempts to block the current thread; } The current thread is removed from the wait queue// release:Updating synchronization Statusif(the synchronization state is sufficient to allow a blocking thread to acquire) {unblock one or more waiting queues; }Copy the code

In order to do this, the following three basic steps need to work together:

  • Atomic management of synchronization state (useCASAtom modified shared flag bit)
  • Wait queue management
  • Thread blocking and unblocking (callLockSupportThe underlying layer is based onUnsafeOf the classpark() & unpark()Methods)

Second, source code analysis

AQS Key attributes

// java.util.concurrent.locks.AbstractQueuedSynchronizer
// the header encapsulates the thread currently holding the lock
private transient volatile Node head;

// Each new node comes in and is inserted at the end to form a linked list
private transient volatile Node tail;

// Indicates the status of the current lock. 0 indicates that the lock is not occupied. A value greater than 0 indicates that the current lock is held by a thread
private volatile int state;

// represents the thread currently holding the exclusive lock
/ / AbstractQueuedSynchronizer inherited from AbstractOwnableSynchronizer an attribute
private transient Thread exclusiveOwnerThread;
Copy the code

Node Key Attributes

// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
static final class Node {
  // indicates that the node is currently in shared mode (threads are waiting for locks in shared mode)
  static final Node SHARED = new Node();
  // indicates that the node is currently in exclusive mode (threads are waiting for locks in exclusive mode)
  static final Node EXCLUSIVE = null;

  // This thread cancels the lock contention
  static final int CANCELLED =  1;
  // indicates that the thread is ready, waiting for the resource to be released
  static final int SIGNAL    = -1;
  // indicates that the node is in a waiting queue, and the node thread is waiting to wake up.
  static final int CONDITION = -2;
  The next acquireShared field should be propagated unconditionally (this field is only used if the current thread is SHARED)
  static final int PROPAGATE = -3;

  // The value is 1, -1, -2, -3, or 0 (the default value for initialization).
  // If waitStatus > 0, the thread cancels the wait
  volatile int waitStatus;

  // A reference to the precursor node
  volatile Node prev;

  // A reference to a successor node
  volatile Node next;

  // The package is in the current node thread
  volatile Thread thread;

  // A reference to the next node waiting for the condition (not discussed yet)
  Node nextWaiter;
}
Copy the code

AQS Key methods

AQS sets methods for subclasses to implement according to their needs

// java.util.concurrent.locks.AbstractQueuedSynchronizer
// Get the lock (change the flag bit), you can wait until the lock is acquired
// final modifier. Subclasses can only be called but cannot be modified
// Use tryAcquire() to try to acquire the lock
public final void acquire(int arg) {
    if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }/ / releases the lock
// final modifier. Subclasses can only be called but cannot be modified
// Call the tryRelease() of the call class, release the lock, return true for full release, false otherwise
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if(h ! =null&& h.waitStatus ! =0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// Try to get the lock (modify flag bit), return immediately
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// Try to release the lock
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// Create and queue nodes for the current thread and the given pattern
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if(pred ! =null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/ / team
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                returnt; }}}}// Wake up the successor node
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
      	// Find the first node in the normal blocking state
      	// Some nodes are omitted to avoid high concurrency
        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                s = t;
    }
 	if(s ! =null)
      	/ / wake
        LockSupport.unpark(s.thread);
}

// Interrupt the current thread
static void selfInterrupt(a) {
    Thread.currentThread().interrupt();
}
Copy the code

Acquire: while (! tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; }

Release: if (tryRelease(arg)) unblock the first queued thread;

See AQS principle from ReentrantLock

Sync, the internal class of ReentrantLock, has both fair and unfair lock synchronization modes.

// java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    / / for the lock
    abstract void lock(a);
  
    // The lock release process does not distinguish between fair and unfair locks, so it is placed in the parent class Sync
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
          	If state is 0, no thread has acquired the lock. Set the current thread holding the exclusive lock to null
            free = true;
            setExclusiveOwnerThread(null);
        }
      	If state is not 0, the thread has not fully released the lock. Set the remaining unreleased times to state
        setState(c);
        return free;
    }
  
  	final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true; }}else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
              	throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false; }}Copy the code
// java.util.concurrent.locks.ReentrantLock.NonfairSync
// Implementation of an unfair lock
static final class NonfairSync extends Sync { 
    protected final boolean tryAcquire(int acquires) {
        // Since this is an unfair lock, it takes the implementation of an unfair lock
        return nonfairTryAcquire(acquires);
    }
  
    / / for the lock
    final void lock(a) {
      // Use CAS to set the State variable
      if (compareAndSetState(0.1))
          // if the CAS is successful, the current thread is set to the exclusive thread
          setExclusiveOwnerThread(Thread.currentThread());
      else
          // if CAS fails, acquire()
          // Set tryAcquire() to tryAcquire()
          AcquireQueued () ¶
          acquire(1); }}Copy the code
// java.util.concurrent.locks.ReentrantLock.FairSync
// Fair lock implementation
static final class FairSync extends Sync {
  	protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if(! hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true; }}else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
              throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  
    / / lock
    final void lock(a) {
      	Set tryAcquire() to tryAcquire()
        AcquireQueued () ¶
      	acquire(1); }}Copy the code
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// Get the lock in spin mode
Return true if interrupted while waiting, false otherwise
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            TryAcquire (); tryAcquire(); tryAcquire()
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
              	interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}// Check and update the status of failed nodes
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
      	// If the waiting status of the previous node is SIGNAL, the status of the previous node is normal. True is returned
      	return true;
    if (ws > 0) {
      	// If the wait state of the previous node is greater than 0, the wait status of the previous node is cancelled. Continue to search until the previous node is good
      	// Threads queuing into the blocking queue are suspended, and the wake up operation is done by the precursor node
        // We need to find a good precursor node, because we still need it to wake up
        do {
          	node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
      	// If the waitStatus of the precursor node is not equal to -1 or 1, it can only be 0, -2, -3
      	// When each new node joins the queue, waitStatus is not set, so it is initialized to 0
      	// If the tail node is the tail node, its waitStatus is 0
      	// Use CAS to set waitStatus of the precursor node to SIGNAL.
      	compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
  	/ / this returns false, the next time death cycle in shouldParkAfterFailedAcquire () will return true
    return false;
}

// Let the current thread park and check if it is interrupted
private final boolean parkAndCheckInterrupt(a) {
  	// Block the thread, and the underlying call to UNSAFE's API
    LockSupport.park(this);
    return Thread.interrupted();
}
Copy the code

Third, summary

This time, starting from the source code, a simple introduction of AQS, mainly from acquire() & Release () as the entrance, to ReentrantLock lock, queuing, waiting for suspension, wake up, release to analyze the basic principles of AQS.