What is AQS

AQS stands for AbstractQueuedSynchronizer, an abstract, queue-based synchronizer. It provides a framework for implementing blocking locks and related synchronizers that rely on a FIFO wait queue. ReentrantLock, Semaphore, CountDownLatch and other concurrency classes are built directly on AQS, and CyclicBarrier relies on it indirectly through ReentrantLock and Condition. The usual pattern is to write a subclass of AQS that overrides the hook methods its template methods call, and to use that subclass as an inner class of the synchronization component. If thread-related techniques were martial arts manuals, AQS would be the sweeping monk: the unassuming master who holds the essence of all of them.

AQS schematic

 

AQS maintains a volatile int state (representing the shared resource) and a FIFO wait queue of threads (a thread enters the queue when it is blocked while contending for the resource). The volatile modifier is the key here, because it makes writes to state visible to other threads; its full semantics are not covered here. There are three methods for accessing state:

  • getState()
  • setState()
  • compareAndSetState()
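
As a rough illustration of how the three accessors fit together, the sketch below uses them in a CAS retry loop. StateCounter and StateAccessDemo are hypothetical names made up for this example; only getState(), setState() and compareAndSetState() come from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class StateAccessDemo {

    // Hypothetical helper for illustration only; not part of the JDK.
    static class StateCounter extends AbstractQueuedSynchronizer {

        /** Atomically adds 1 to state with a CAS retry loop and returns the new value. */
        int increment() {
            for (;;) {
                int current = getState();                        // volatile read
                if (compareAndSetState(current, current + 1)) {  // atomic compare-and-swap
                    return current + 1;
                }
                // Another thread changed state in between: re-read and retry.
            }
        }

        int current() {
            return getState();
        }

        void reset() {
            setState(0); // plain volatile write, no comparison with the old value
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StateCounter counter = new StateCounter();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counter.current()); // prints 4000
    }
}
```

setState() is enough when only one thread can be writing (for example, the current lock owner); compareAndSetState() is the one to reach for when several threads may race on the same update.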

AQS defines two resource-sharing modes: Exclusive (only one thread can hold the resource at a time, as in ReentrantLock) and Shared (multiple threads can hold it at the same time, as in Semaphore and CountDownLatch).

Different custom synchronizers contend for the shared resource in different ways. A custom synchronizer only has to implement how the shared state is acquired and released; AQS already handles the maintenance of the thread wait queue (enqueueing threads that fail to acquire the resource, waking up queued threads, and so on). A custom synchronizer does this mainly by overriding the following methods:

  • isHeldExclusively(): whether the current thread holds the resource exclusively. It only needs to be implemented if Conditions are used.
  • tryAcquire(int): exclusive mode. Attempts to acquire the resource; returns true on success and false on failure.
  • tryRelease(int): exclusive mode. Attempts to release the resource; returns true if the resource is now fully released, so that waiting threads may try to acquire it, and false otherwise.
  • tryAcquireShared(int): shared mode. Attempts to acquire the resource. A negative value means failure; 0 means success with no resources left for later acquirers; a positive value means success with resources still available.
  • tryReleaseShared(int): shared mode. Attempts to release the resource; returns true if the release allows waiting nodes to be woken up, and false otherwise.
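
To make the exclusive-mode hooks above more concrete, here is a minimal sketch of a non-reentrant mutex, loosely modelled on the example in the AbstractQueuedSynchronizer Javadoc. The names MutexDemo, Mutex and isLocked() are assumptions chosen for this illustration; state 0 is taken to mean unlocked and 1 locked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MutexDemo {

    // Hypothetical non-reentrant lock: state 0 = unlocked, 1 = locked.
    static class Mutex {

        private static class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }

            @Override
            protected boolean tryAcquire(int acquires) {
                if (compareAndSetState(0, 1)) {          // only one thread can win this CAS
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;                            // AQS will enqueue and park the caller
            }

            @Override
            protected boolean tryRelease(int releases) {
                if (!isHeldExclusively()) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);                             // owner-only write, so no CAS is needed
                return true;                             // fully released: wake up a waiter
            }

            boolean isLocked() {
                return getState() != 0;
            }
        }

        private final Sync sync = new Sync();

        void lock()        { sync.acquire(1); }          // template method, calls tryAcquire
        boolean tryLock()  { return sync.tryAcquire(1); }
        void unlock()      { sync.release(1); }          // template method, calls tryRelease
        boolean isLocked() { return sync.isLocked(); }
    }

    public static void main(String[] args) throws InterruptedException {
        Mutex mutex = new Mutex();
        int[] counter = {0};
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;                    // protected by the mutex
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counter[0]); // prints 40000
    }
}
```

Note that the subclass contains no queueing code at all: acquire() and release() in AQS take care of parking and waking threads, which is the division of labour described above.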

Take ReentrantLock as an example. state is initialized to 0, meaning unlocked. When thread A calls lock(), tryAcquire() is invoked to take the lock exclusively and state is incremented to 1. From then on, tryAcquire() fails for every other thread until thread A calls unlock() and state returns to 0. Before releasing the lock, thread A can acquire it again (state keeps accumulating), which is what reentrant means. The lock must be released as many times as it was acquired, however, so that state gets back to 0.
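
The accumulation of state can be observed from the outside through ReentrantLock.getHoldCount(), which reports how many times the current thread holds the lock. The following is a small sketch; the class and method names (ReentrantHoldCountDemo, holdCounts) are made up for the example, and production code would normally wrap unlock() in a finally block.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantHoldCountDemo {

    /** Records the hold count after each lock()/unlock() call on a fresh lock. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[4];

        lock.lock();                      // first acquisition: state 0 -> 1
        seen[0] = lock.getHoldCount();
        lock.lock();                      // re-entry by the same thread: state 1 -> 2
        seen[1] = lock.getHoldCount();
        lock.unlock();                    // state 2 -> 1, lock is still held
        seen[2] = lock.getHoldCount();
        lock.unlock();                    // state 1 -> 0, lock is free again
        seen[3] = lock.getHoldCount();

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts())); // prints [1, 2, 1, 0]
    }
}
```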

Take CountDownLatch as another example. A task is split across N child threads and state is initialized to N (N must match the number of countDown() calls, here one per thread). The N child threads run in parallel; each calls countDown() once when it finishes, which decrements state by 1 using CAS. Once all child threads have finished (state = 0), the calling thread is unparked and returns from await() to carry on with the remaining work.
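
A short usage sketch of this pattern with the real java.util.concurrent.CountDownLatch is shown below. LatchDemo and runWorkers are hypothetical names for the example, and the "work" done by each child thread is reduced to bumping a counter.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {

    /** Starts n child threads, waits for all of them, and returns how many finished. */
    static int runWorkers(int n) {
        CountDownLatch latch = new CountDownLatch(n);   // state starts at n
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                finished.incrementAndGet();             // stand-in for the real work
                latch.countDown();                      // state decreases by 1
            }).start();
        }

        try {
            latch.await();                              // blocks until state reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(5)); // prints 5
    }
}
```

If fewer than n countDown() calls are ever made, await() never returns, which is why the count has to match the number of calls.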

In general, a custom synchronizer is either exclusive or shared, so it only needs to implement one pair of methods: tryAcquire/tryRelease or tryAcquireShared/tryReleaseShared. AQS also supports synchronizers that use both modes at once, such as ReentrantReadWriteLock.
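
The two modes of ReentrantReadWriteLock can be seen side by side in a small probe: while one thread holds the read lock (shared mode), a second thread can still take the read lock but cannot take the write lock (exclusive mode). This is only a sketch; ReadWriteDemo and probeWhileReadLocked are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteDemo {

    /**
     * While the calling thread holds the read lock, a second thread tries
     * the read lock and then the write lock. Returns {readOk, writeOk}.
     */
    static boolean[] probeWhileReadLocked() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];

        rw.readLock().lock();                           // shared-mode acquisition
        try {
            Thread probe = new Thread(() -> {
                result[0] = rw.readLock().tryLock();    // shared: other readers may enter
                if (result[0]) {
                    rw.readLock().unlock();
                }
                result[1] = rw.writeLock().tryLock();   // exclusive: blocked by the reader
                if (result[1]) {
                    rw.writeLock().unlock();
                }
            });
            probe.start();
            probe.join();                               // join makes the results visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while probing", e);
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probeWhileReadLocked())); // prints [true, false]
    }
}
```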

Simulate a CountDownLatch directly with AQS

```java
package com.zhang.myjuc.a8.aqs;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * MyCountDownLatch: a simple thread-coordination tool built on AQS,
 * equivalent to a one-shot CountDownLatch.
 *
 * @author zhangxiaoxiang
 * @date 2020/08/26
 */
public class MyCountDownLatch {

    private final Sync sync = new Sync();

    /**
     * Releases in shared mode. If tryReleaseShared returns true,
     * one or more waiting threads are unblocked.
     */
    public void signal() {
        sync.releaseShared(0);
    }

    /**
     * Acquires in shared mode, ignoring interrupts. tryAcquireShared is called
     * at least once and the method returns if it succeeds. Otherwise the thread
     * is queued and may block and unblock repeatedly, calling tryAcquireShared
     * until it succeeds.
     */
    public void await() {
        sync.acquireShared(0);
    }

    /**
     * Private inner synchronizer: state 0 means the latch is closed, 1 means open.
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * Attempts to acquire in shared mode. This method should query whether
         * the state of the object allows it to be acquired in shared mode,
         * and if so, acquire it.
         *
         * @param arg unused
         * @return 1 if the latch is open, -1 (failure) otherwise
         */
        @Override
        protected int tryAcquireShared(int arg) {
            return (getState() == 1) ? 1 : -1;
        }

        /**
         * Attempts to set the state to reflect a release in shared mode.
         * This method is always called by the thread performing the release.
         *
         * @param arg unused
         * @return true, so that waiting threads are woken up
         */
        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyCountDownLatch myCountDownLatch = new MyCountDownLatch();
        // Ten threads block on the closed latch.
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " waits on the latch");
                myCountDownLatch.await();
                System.out.println("latch open, " + Thread.currentThread().getName() + " continues");
            }).start();
        }
        Thread.sleep(5000);
        // Open the latch: all waiting threads are released.
        myCountDownLatch.signal();
        // A thread that arrives after the latch is open passes straight through.
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " waits on the latch");
            myCountDownLatch.await();
            System.out.println("latch open, " + Thread.currentThread().getName() + " continues");
        }).start();
    }
}
```

The results