Analysis of AQS and its derivative concurrency tools

AQS overview

AQS (AbstractQueuedSynchronizer), introduced in Java 1.5, is a framework for building synchronizers. Internally it uses a FIFO CLH queue to coordinate threads that either exclude one another or cooperate. The framework has two modes, EXCLUSIVE and SHARED, and it fixes the core wait/wake-up rules in template methods, which makes it convenient to build custom concurrency tools on top of it. ReentrantLock, Semaphore, CountDownLatch and other classes in java.util.concurrent are all built on AQS.

Framework workflow

Basic data structure

  • A doubly linked FIFO queue
  • A volatile state variable that stores the synchronization state: the reentrancy count of a lock, the number of permits, and so on.

Core API

This is only a list of the most important methods. It can be skipped for now: read the method-by-method flow analysis first, then come back to the API.

Core template method

In practice, resource (state) changes can be made directly by calling template methods

  • acquire(): acquires in exclusive mode; its counterpart acquireInterruptibly() responds to interrupts while acquiring
  • release(): releases in exclusive mode
  • acquireShared(): acquires in shared mode; there is also an interruptible version, acquireSharedInterruptibly()
  • releaseShared(): releases in shared mode
Subclass extension methods
  • tryXXX, where XXX is Acquire/Release and so on, in shared and exclusive variants for both acquiring and releasing. Subclasses implement these to define the specific acquire/release rules.
  • doXXX, where XXX is Acquire/Release/AcquireShared and so on. These are fixed AQS implementations, called back when acquisition fails or release succeeds, and they contain the core wait/wake-up logic.
Auxiliary methods
  • addWaiter(): appends a node to the tail of the wait queue when lock acquisition fails
  • shouldParkAfterFailedAcquire(): the core check for whether the thread needs to wait; it sets the predecessor node's waitStatus to SIGNAL, after which the caller loops to retry or park
  • parkAndCheckInterrupt(): calls LockSupport#park() to wait
  • hasQueuedPredecessors(): reports whether another thread is queued ahead of the current thread
  • cancelAcquire(): cancels an acquisition attempt
  • acquireQueued(): the core wait logic
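To illustrate how the template methods and the subclass hooks fit together, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its method names are hypothetical and chosen only for this example; the AbstractQueuedSynchronizer calls are the real JDK API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class, not part of the JDK: a non-reentrant mutex.
class SimpleMutex {
    // Convention used here: state 0 = unlocked, state 1 = locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Subclass hook: succeed only if state can be moved 0 -> 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS may wake a successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }     // template method: may enqueue and park
    void unlock() { sync.release(1); }   // template method: may wake head.next
    boolean tryLock() { return sync.tryAcquire(1); }
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.tryLock()); // false: this mutex is not reentrant
        mutex.unlock();
    }
}
```

The subclass only decides whether state may change; queueing, parking and waking are all inherited from acquire() and release().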

Source code analysis of the core exclusive-mode flow

acquire(): resource acquisition method
public final void acquire(int arg) {
  /*
   * 1. tryAcquire(): acquire the lock in exclusive mode, ignoring interrupts
   *    - FairLock: acquires only if no thread is waiting ahead, or if the current thread re-enters
   *    - NonfairLock: tries to acquire directly if no thread holds the lock, or if the current thread re-enters
   * 2. If tryAcquire() fails:
   *    - addWaiter() appends a node to the tail of the wait queue; head is a dummy node that stores no data
   *    - acquireQueued() spins/waits until the lock is acquired
   */
  if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt(); // interrupted while waiting: restore the interrupt flag
}
tryAcquire(): implemented by subclasses with the specific acquisition rules. The following uses ReentrantLock (fair version) as an example:
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 1. No thread holds the lock: check whether any thread is already waiting, then try to take the lock by CAS
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 2. The current thread already holds the lock: increase the count (reentrancy)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
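The reentrant branch can be observed from the outside through ReentrantLock#getHoldCount(), which reports how many times the current thread holds the lock. The following is a small sketch; the class name ReentrancyDemo and the method holdCounts() are made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: records the hold count after each lock/unlock step.
class ReentrancyDemo {
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true); // true = fair lock
        int[] seen = new int[4];
        lock.lock();                     // first acquisition: state 0 -> 1
        seen[0] = lock.getHoldCount();
        lock.lock();                     // reentrant acquisition: state 1 -> 2
        seen[1] = lock.getHoldCount();
        lock.unlock();                   // state 2 -> 1, lock still held
        seen[2] = lock.getHoldCount();
        lock.unlock();                   // state 1 -> 0, lock released
        seen[3] = lock.getHoldCount();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // [1, 2, 1, 0]
    }
}
```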

This is relatively simple and breaks down into two cases:

  1. c == 0: no thread holds the lock. The fair lock first calls hasQueuedPredecessors() to check whether a node is already waiting and only then attempts the CAS, while NonfairLock attempts the CAS directly.
    // Finds the non-cancelled node (waitStatus <= 0) nearest to head.
    // If that node exists and does not belong to the current thread, another thread is waiting ahead of it.
    public final boolean hasQueuedPredecessors() {
        Node h, s;
        if ((h = head) != null) {
            if ((s = h.next) == null || s.waitStatus > 0) {
                s = null; // traverse in case of concurrent cancellation
                for (Node p = tail; p != h && p != null; p = p.prev) {
                    if (p.waitStatus <= 0)
                        s = p;
                }
            }
            if (s != null && s.thread != Thread.currentThread())
                return true;
        }
        return false;
    }
  2. current == getExclusiveOwnerThread(): the current thread already holds the lock (the reentrant case), so state is simply increased by acquires.
On acquisition failure: acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  • addWaiter(): constructs a node and appends it to the tail, initializing the head node first if the queue has not been initialized yet
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                // Link prev first, then CAS the node in as the new tail
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                // Initialize the head node
                initializeSyncQueue();
            }
        }
    }
  • acquireQueued(): spins/waits until the lock is acquired and returns the interrupt flag
    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                // 1. Get the predecessor of the current node
                final Node p = node.predecessor();
                // 2. If the predecessor is head, try to acquire the lock
                if (p == head && tryAcquire(arg)) {
                    // 3. Become the new head and return the interrupt flag
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                // 4. Either prev is head but the lock was taken by another thread (NonfairLock), or prev is not head.
                //    shouldParkAfterFailedAcquire() returns true if prev is already SIGNAL; otherwise it skips
                //    cancelled predecessors or sets prev to SIGNAL, returns false, and the loop retries.
                if (shouldParkAfterFailedAcquire(p, node))
                    // 5. Park until woken by the predecessor's release
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
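The enqueue-and-park path can be made visible with ReentrantLock#getQueueLength(), which estimates how many threads sit in the synchronization queue. The sketch below holds the lock, starts a few contenders, and polls until all of them are queued. QueueLengthDemo and queuedWhileHeld() are hypothetical names used only here.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: counts threads parked in the AQS queue of a held lock.
class QueueLengthDemo {
    static int queuedWhileHeld(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // tryAcquire fails, so the thread enqueues and parks
                    lock.unlock();
                });
                threads[i].start();
            }
            // getQueueLength() is only an estimate, so poll until every contender is queued
            while (lock.getQueueLength() < waiters) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();         // release() wakes head.next, and the queue drains in turn
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileHeld(3));
    }
}
```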
release(): resource release method
  1. Try to release the lock (implemented by the subclass)
  2. If the lock is released successfully, wake head.next (the next node)
    public final boolean release(int arg) {
        // 1. Release the lock: tryRelease() subtracts arg from state if the current thread is the owner, and succeeds once state reaches 0
        if (tryRelease(arg)) {
            Node h = head;
            // h == null means no thread is waiting. h.waitStatus == 0 happens only when another thread
            // has failed to acquire the lock but has not yet parked in acquireQueued(), so no wake-up is needed
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 2. Wake up the next thread
            return true;
        }
        return false;
    }
ReentrantLock#tryRelease() implementation
        // Returns true only when state drops to 0 (the lock is fully released); otherwise returns false
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
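Two consequences of this tryRelease() can be checked directly: a thread that does not own the lock gets an IllegalMonitorStateException, and a partial release leaves the lock held. The sketch below assumes nothing beyond the public ReentrantLock API; the class and method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: exercises the owner check and the partial-release rule.
class UnlockOwnerDemo {
    // True if unlocking without owning the lock throws IllegalMonitorStateException.
    static boolean unlockWithoutOwnershipThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the current thread is not the owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // True if the lock is still held after one unlock() of two lock() calls.
    static boolean stillLockedAfterPartialRelease() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                   // state 2 -> 1: tryRelease returns false
        boolean locked = lock.isLocked();
        lock.unlock();                   // state 1 -> 0: tryRelease returns true
        return locked && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(unlockWithoutOwnershipThrows());   // true
        System.out.println(stillLockedAfterPartialRelease()); // true
    }
}
```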
unparkSuccessor(): wakes the thread of node.next. If node.next is invalid, that is, null or cancelled (waitStatus > 0), it scans backward from tail toward node and unparks the valid node nearest to node.

private void unparkSuccessor(Node node) {
        // Reset the current node's waitStatus to 0
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        // Why traverse backward from tail? addWaiter() is not atomic: prev is linked before the CAS on tail
        // and next is assigned only afterwards, so in rare cases a node is reachable only through prev pointers
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null) // s is the valid node nearest to the front of the queue
            LockSupport.unpark(s.thread);
    }
await(): the wait implementation of Condition

In my view, the core idea of AQS resembles that of a monitor. A monitor also has multiple condition variables, each with its own wait queue. Calling await() adds the thread to the condition's wait queue and releases the lock. Later, unlock() wakes the head.next thread in the synchronization queue to complete the wake-up.

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 1. Append the node to ConditionObject#lastWaiter
            Node node = addConditionWaiter();
            // 2. Fully release the lock and wake up other threads
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 3. Block while the node is not in the synchronization queue (it never is on the first check)
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 4. After being woken, re-acquire the lock with the saved state
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
signal

signal() moves the current ConditionObject#firstWaiter into the AQS synchronization queue through enq(); the thread is then woken by a later unlock().

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 1. Get firstWaiter
            Node first = firstWaiter;
            if (first != null)
                // 2. doSignal
                doSignal(first);
        }

        private void doSignal(Node first) {
          	// Remove firstWaiter from the queue
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
              	//help GC
                first.nextWaiter = null;
                // Transfer the node to the synchronization queue
            } while (!transferForSignal(first) && (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        // If the CAS on waitStatus fails, false is returned
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        // Important: append the node to the tail of the synchronization queue; enq() returns its predecessor (the previous tail)
        Node p = enq(node);
        // If the predecessor is cancelled, or the CAS to SIGNAL fails, wake the node's thread directly;
        // otherwise the thread is woken when the lock is released by unlock()
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
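From the caller's side, the await()/signal() pair described above is used under the lock and inside a loop that rechecks the condition. The following is a minimal sketch of a one-message hand-off; the class ConditionDemo and its fields are hypothetical names for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: one thread waits on a Condition until another publishes a message.
class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean published = false;
    private String message;

    String awaitMessage() {
        lock.lock();
        try {
            while (!published) {
                ready.await(); // joins the condition queue and releases the lock
            }
            return message;    // the lock has been re-acquired at this point
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    void publish(String value) {
        lock.lock();
        try {
            message = value;
            published = true;
            ready.signal();    // moves the first waiter to the synchronization queue
        } finally {
            lock.unlock();     // the waiter is actually woken by this release
        }
    }

    static String run() {
        ConditionDemo demo = new ConditionDemo();
        new Thread(() -> demo.publish("hello")).start();
        return demo.awaitMessage();
    }

    public static void main(String[] args) {
        System.out.println(run()); // hello
    }
}
```

The while loop matters: await() may return after the flag has been set by another thread or, in principle, spuriously, so the condition is always rechecked while holding the lock.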

Source code analysis of the core shared-mode flow

acquireShared()
    public final void acquireShared(int arg) {
        // tryAcquireShared() tries to obtain permits; its return value falls into three cases:
        //   positive: permits were obtained and some permits remain
        //   zero:     permits were obtained but none remain
        //   negative: the acquisition failed
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

Semaphore#tryAcquireShared(): the fair Semaphore implementation

protected int tryAcquireShared(int acquires) {
  for (;;) {
    // If another thread is waiting in the synchronization queue, return -1 (failure)
    if (hasQueuedPredecessors())
      return -1;
    int available = getState();
    int remaining = available - acquires;
    // If remaining < 0, return it without changing state (failure);
    // otherwise update state by CAS and return the remaining permits
    if (remaining < 0 ||
        compareAndSetState(available, remaining))
      return remaining;
  }
}
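The permit arithmetic (remaining = available - acquires) can be traced with a single-threaded sketch. Note one assumption about the API used here: the untimed Semaphore#tryAcquire() does not consult the queue even on a fair semaphore, so it only demonstrates the counting, not the fairness check. PermitAccountingDemo and trace() are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class: traces the permit count through acquire and release.
class PermitAccountingDemo {
    static int[] trace() {
        Semaphore semaphore = new Semaphore(2, true); // 2 permits, fair
        int[] out = new int[4];
        // Each entry is the number of permits left, or -1 if the acquisition failed
        out[0] = semaphore.tryAcquire() ? semaphore.availablePermits() : -1; // 2 -> 1
        out[1] = semaphore.tryAcquire() ? semaphore.availablePermits() : -1; // 1 -> 0
        out[2] = semaphore.tryAcquire() ? semaphore.availablePermits() : -1; // no permit left
        semaphore.release();                                                 // 0 -> 1
        out[3] = semaphore.availablePermits();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(trace())); // [1, 0, -1, 1]
    }
}
```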

doAcquireShared(): called when the permits could not be obtained

private void doAcquireShared(int arg) {
  // 1. Append the node to the tail
  final Node node = addWaiter(Node.SHARED);
  boolean interrupted = false;
  try {
    for (;;) {
      // 2. Get the predecessor; if it is head, try to acquire again
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        // 3. r >= 0 means the permits were obtained
        if (r >= 0) {
          // 4. Set the current node as head, and wake node.next when r > 0
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          return;
        }
      }
      // 5. Block if the predecessor is not head or the acquisition failed
      if (shouldParkAfterFailedAcquire(p, node))
        interrupted |= parkAndCheckInterrupt();
    }
  } catch (Throwable t) {
    // 6. On an exception, cancel the current acquisition and rethrow
    cancelAcquire(node);
    throw t;
  } finally {
    // 7. Restore the interrupt flag if the thread was interrupted
    if (interrupted)
      selfInterrupt();
  }
}

Summary: whether in exclusive or shared mode, the steps are the same:

  1. The subclass's tryXXX performs the actual acquisition (of permits, or whatever else the state field represents)
  2. If the acquisition fails, the thread is added to the blocking queue and waits
  3. On release, unparkSuccessor wakes the next waiter
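The three steps above also hold for a custom shared-mode synchronizer. The sketch below is a one-shot gate in the spirit of the latch example in the AQS documentation: every waiter blocks until open() is called, then all of them pass. OneShotGate and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a gate that stays closed until open() is called once.
class OneShotGate {
    // Convention used here: state 0 = closed, state 1 = open.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Step 1: positive = pass (and let others pass), negative = fail
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // Step 3: tells AQS to wake queued waiters
        }
    }

    private final Sync sync = new Sync();

    void awaitOpen() { sync.acquireShared(1); } // Step 2: queues and parks while closed
    void open() { sync.releaseShared(1); }

    static int passedAfterOpen(int waiters) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.awaitOpen();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        gate.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(passedAfterOpen(3)); // 3
    }
}
```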

Core Component implementation

ReentrantLock

ReentrantLock implements exclusive mode, with state representing the reentrancy count. Mutual exclusion is achieved through AQS's acquire().

// The tryAcquire implementation is a simple CAS modification of the state variable
protected final boolean tryAcquire(int acquires) {
  final Thread current = Thread.currentThread();
  int c = getState();
  if (c == 0) {
    // 1. Fair lock: check whether any node is waiting in the synchronization queue; if none is, try to take the lock by CAS
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
    }
  }
  // 2. Reentrant case: the current thread already holds the lock, so just increase state
  else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
  }
  return false;
}
// tryRelease: when state drops to 0 the lock is released and true is returned; otherwise the lock is still held and false is returned
protected final boolean tryRelease(int releases) {
  int c = getState() - releases;
  if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
  boolean free = false;
  if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
  }
  setState(c);
  return free;
}

Semaphore

A semaphore allows several threads to enter a critical section at the same time, so it is implemented in shared mode.

//tryAcquireShared
protected int tryAcquireShared(int acquires) {
  for (;;) {
    // If the synchronization queue has nodes waiting, failure -1 is returned
    if (hasQueuedPredecessors())
      return -1;
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||
        compareAndSetState(available, remaining))
      // If remaining < 0, return it without changing state (failure); otherwise update state by CAS and return it
      return remaining;
  }
}
// tryReleaseShared: adds the permits back by CAS and returns true once the CAS succeeds
protected final boolean tryReleaseShared(int releases) {
  for (;;) {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
      throw new Error("Maximum permit count exceeded");
    if (compareAndSetState(current, next))
      return true;
  }
}
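As a usage sketch, the following measures how many workers are inside the guarded section at once; with N permits the peak should never exceed N. The class SemaphoreDemo, the method maxConcurrent() and the short parkNanos() pause are assumptions made for this example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: records the peak number of threads holding a permit.
class SemaphoreDemo {
    static int maxConcurrent(int permits, int workers) {
        Semaphore semaphore = new Semaphore(permits, true);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();      // shared-mode acquire
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    LockSupport.parkNanos(1_000_000L);   // simulate a little work
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();                 // shared-mode release
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrent(2, 6)); // never more than 2
    }
}
```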

CountDownLatch

CountDownLatch is implemented in shared mode and lets one or more threads wait for other threads to finish.

countDown() decrements state through the overridden tryReleaseShared; await() blocks through tryAcquireShared and is woken when countDown() brings state down to 0.

// Extends AQS
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // 1. Used by await(): returns 1 if state is 0; otherwise returns -1, so the caller is queued and blocks
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // 2. Used by countDown(): decrements state; when state reaches 0, returns true so the waiting threads are woken
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
}
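A short usage sketch: the calling thread waits on the latch until every worker has called countDown(). LatchDemo and runWorkers() are hypothetical names for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: waits for n workers to finish using a CountDownLatch.
class LatchDemo {
    static int runWorkers(int n) {
        CountDownLatch done = new CountDownLatch(n); // state starts at n
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                finished.incrementAndGet();
                done.countDown();                    // state - 1; wakes waiters at 0
            }).start();
        }
        try {
            done.await();                            // blocks while state != 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(5)); // 5
    }
}
```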

CyclicBarrier

CyclicBarrier (literally a "cyclic fence") can be seen as an enhanced CountDownLatch: it is typically used to wait until multiple threads have all arrived and then run some logic, and it resets its count automatically. Underneath it is built on ReentrantLock + Condition, and the core logic behind await() is dowait().

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //1
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();
					
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 2. Decrement the count; if it reaches 0, run the Runnable passed in and reset the count
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                  	//2.1 resets the count and signalAll() wakes up all waiting threads
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 3. If the count is not 0, wait on the Condition
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                // Handle related exceptions and timeouts
                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
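The automatic reset can be seen by letting the same threads pass the barrier several times: the barrier action runs once per trip. This is a sketch with hypothetical names (BarrierDemo, tripsAfterRounds).

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: counts how often the barrier action runs over several rounds.
class BarrierDemo {
    static int tripsAfterRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs in the last arriving thread, once per trip
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the count resets automatically after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(tripsAfterRounds(3, 2)); // 2
    }
}
```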

ReentrantReadWriteLock

A read/write lock in which the high 16 bits of state hold the read lock count and the low 16 bits hold the write lock count. The write lock works in exclusive mode, and the read lock works in shared mode.

Concurrent reads do not block each other, while read/write and write/write combinations exclude each other. The implementation is similar to ReentrantLock, except that the read and write counts share the single state field.
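The split of one int into two counters is plain bit arithmetic. The sketch below packs and unpacks the two halves; the class name and the constant and helper names are chosen for illustration and are assumptions of this example rather than a quotation of the JDK source.

```java
// Hypothetical demo class: packs a read count and a write count into one int.
class RwStateDemo {
    static final int SHARED_SHIFT = 16;                          // reads live in the high 16 bits
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;            // adding this = one more read hold
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;   // low 16 bits

    static int sharedCount(int state) { return state >>> SHARED_SHIFT; }
    static int exclusiveCount(int state) { return state & EXCLUSIVE_MASK; }

    public static void main(String[] args) {
        int afterThreeReads = 3 * SHARED_UNIT; // three read holds, no write hold
        int afterTwoWrites = 2;                // one writer that re-entered once
        System.out.println(sharedCount(afterThreeReads) + " " + exclusiveCount(afterThreeReads)); // 3 0
        System.out.println(sharedCount(afterTwoWrites) + " " + exclusiveCount(afterTwoWrites));   // 0 2
    }
}
```

Because each half has only 16 bits, each count is limited to 65535 under this layout.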