The previous two articles introduced some concepts of multithreading and thread pools. This article finally introduces AQS, a fundamental building block of JUC (java.util.concurrent).
AQS overview
AQS is short for AbstractQueuedSynchronizer, the queue synchronizer. It is the base framework for building locks and other synchronization components (ReentrantLock, CountDownLatch, Semaphore, etc.). It uses a volatile member variable of type int to represent the synchronization state, and a queue (a doubly linked list) of its static inner class Node to hold the threads waiting to acquire the synchronization state. AQS uses the template method pattern: subclasses manage the synchronization state by extending AQS and overriding some of its methods. Such subclasses are typically defined as static inner classes of the custom synchronization component.
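To make the pattern above concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and the convention that state 0 means unlocked and 1 means locked are assumptions made for this illustration; they are not taken from the JDK source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant mutex: state 0 = unlocked, 1 = locked. */
class SimpleMutex {
    // The synchronizer is a private static inner class of the component.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS 0 -> 1; only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }     // template method in AQS
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public void unlock()      { sync.release(1); }     // template method in AQS
    public boolean isLocked() { return sync.isHeldExclusively(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter[0]); // prints 20000
    }
}
```

The component only decides what the state means; queuing, blocking and waking are all inherited from acquire() and release().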
AQS implementation analysis
Synchronization state
```java
/** The synchronization state. */
private volatile int state;

/** Returns the current value of the synchronization state. */
protected final int getState() {
    return state;
}

/** Sets the value of the synchronization state. */
protected final void setState(int newState) {
    state = newState;
}

/**
 * Atomically sets the synchronization state to the given updated value
 * if the current state value equals the expected value.
 */
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
```
The state field is declared volatile, which makes every write visible to other threads, and compareAndSetState() uses a CAS operation to make read-modify-write updates atomic. Together these make state management in AQS thread-safe. (volatile alone does not make a variable thread-safe: it only guarantees visibility and atomic reads and writes of the single variable, so compound updates must go through CAS.) The meaning of the state value is defined by the subclass. For example, ReentrantLock uses it as a hold count in exclusive mode, while Semaphore and CountDownLatch use it as a count in shared mode.
Exclusive mode: only one thread can hold the synchronization state at a time. Shared mode: multiple threads can hold the synchronization state at the same time.
AQS provides template methods (such as acquire() and release()) that drive the synchronization process, and a synchronization component supplies its own logic by overriding a few hook methods that the template methods call. The key methods to override are:
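The read, compute and CAS retry loop behind this guarantee can be sketched as follows. AQS itself uses a volatile int with Unsafe, which application code should not call directly, so this sketch substitutes AtomicInteger; the class CasCounter is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical counter illustrating the read / compute / CAS retry loop. */
class CasCounter {
    // AtomicInteger stands in for AQS's volatile int state plus Unsafe CAS.
    private final AtomicInteger state = new AtomicInteger(0);

    /** Adds one atomically and returns the new value. */
    int increment() {
        for (;;) {
            int current = state.get();                // volatile read
            int next = current + 1;
            if (state.compareAndSet(current, next)) { // succeeds only if unchanged
                return next;
            }
            // Another thread updated the state first: re-read and retry.
        }
    }

    int get() {
        return state.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println(counter.get()); // prints 40000: no increment is lost
    }
}
```

With a plain volatile int and state++ instead of the CAS loop, two threads could read the same value and one increment would be lost.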
Method | Description |
---|---|
tryAcquire(int arg) | Acquires the synchronization state in exclusive mode |
tryAcquireShared(int arg) | Acquires the synchronization state in shared mode |
tryRelease(int arg) | Releases the synchronization state in exclusive mode |
tryReleaseShared(int arg) | Releases the synchronization state in shared mode |
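The shared-mode pair in the table can be illustrated with a one-shot latch, loosely modeled on the BooleanLatch example in the AQS class documentation. The class name OneShotLatch and its method names are assumptions for this sketch. tryAcquireShared returns a negative value on failure and a non-negative value on success.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical latch: state 0 = closed, non-zero = open. */
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Positive: success, and later shared acquires may succeed too.
            // Negative: failure, so the caller is queued and parked.
            return isOpen() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // let AQS wake the queued waiters
        }
    }

    private final Sync sync = new Sync();

    public boolean isOpen() { return sync.isOpen(); }
    public void open()      { sync.releaseShared(1); }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void awaitUninterruptibly() {
        sync.acquireShared(1);
    }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        Thread waiter = new Thread(() -> {
            latch.awaitUninterruptibly();
            System.out.println("released");
        });
        waiter.start();
        latch.open(); // every current and future waiter passes
        waiter.join();
    }
}
```

Unlike the exclusive methods, any number of threads can pass tryAcquireShared at the same time once the latch is open.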
Synchronization queue
AQS relies on the synchronization queue to manage the synchronization state. If the current thread fails to acquire the synchronization state, it is wrapped in a Node, appended to the synchronization queue (CAS makes the append thread-safe), and blocked. When the synchronization state is released, the thread in the first waiting node of the queue is woken up so that it can try to acquire the synchronization state again.
```java
private transient volatile Node head;
private transient volatile Node tail;

/** CAS sets the head node. Only called by the enq() method. */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/** CAS sets the tail node. */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

static final class Node {
    /** Marker indicating that a node is waiting in shared mode. */
    static final Node SHARED = new Node();
    /** Marker indicating that a node is waiting in exclusive mode. */
    static final Node EXCLUSIVE = null;

    /** waitStatus value: the thread has been cancelled. */
    static final int CANCELLED = 1;
    /** waitStatus value: the successor's thread needs to be unparked. */
    static final int SIGNAL = -1;
    /** waitStatus value: the thread is waiting on a condition. */
    static final int CONDITION = -2;
    /** waitStatus value: the next acquireShared should propagate unconditionally. */
    static final int PROPAGATE = -3;

    /** Wait status. */
    volatile int waitStatus;
    /** Predecessor node. */
    volatile Node prev;
    /** Successor node. */
    volatile Node next;
    /** The thread waiting to acquire the synchronization state. */
    volatile Thread thread;
    /** The next node waiting on the condition, or the SHARED constant. */
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }
}
```
Acquiring the synchronization state (exclusive mode)
Below is a brief analysis of the source code for acquiring the synchronization state. A picture is worth a thousand words, so here is a flow chart to give a general idea of the process.
Source code analysis
The acquire(int arg) method is used to acquire the synchronization state.
```java
/**
 * Acquires the synchronization state in exclusive mode, ignoring interrupts.
 * Calls tryAcquire at least once and returns on success. Otherwise the thread
 * is queued and may repeatedly block and unblock, calling tryAcquire until it
 * succeeds.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
```
The tryAcquire() method is implemented differently by each synchronization component; for example, the fair and unfair locks in ReentrantLock implement it slightly differently. This article does not analyze those implementations in detail. Its job is to attempt to acquire the synchronization state, returning true on success and false otherwise.
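As a rough illustration of how a fair and an unfair tryAcquire() can differ, the sketch below shows two non-reentrant synchronizers. The class names and the tryLock() helper are hypothetical, and ReentrantLock's real implementations also handle reentrancy, which is omitted here. The only difference is the call to hasQueuedPredecessors(), a public AQS method.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant synchronizers that differ only in fairness. */
class FairnessSketch {
    /** Unfair: a newly arriving thread may grab the state immediately. */
    static final class NonfairSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        boolean tryLock() {
            return tryAcquire(1);
        }
    }

    /** Fair: refuse to acquire while another thread has been queued longer. */
    static final class FairSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return !hasQueuedPredecessors() && compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }

        boolean tryLock() {
            return tryAcquire(1);
        }
    }

    public static void main(String[] args) {
        FairSync fair = new FairSync();
        System.out.println(fair.tryLock()); // prints true: free and nobody queued
        System.out.println(fair.tryLock()); // prints false: already held
        fair.release(1);
    }
}
```

The unfair variant usually gives higher throughput because a running thread can take the state without a wake-up, at the cost of possibly overtaking queued threads.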
```java
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; fall back to the full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // The queue is empty, so initialize it
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
```
The addWaiter(Node.EXCLUSIVE) method constructs a node in exclusive mode and tries once to append it to the tail of the wait queue with a CAS operation. If that attempt fails, or the queue has not been initialized yet, it falls back to enq(), which retries the CAS on the tail in an infinite loop until it succeeds.
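The enqueue logic described above can be reproduced outside AQS in a simplified form. The sketch below is an assumption-laden illustration, not the JDK code: it uses AtomicReference in place of Unsafe, and the names CasQueueSketch, enqueue and size are invented for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical lock-free tail append, mirroring the shape of enq(). */
class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends the node and returns its predecessor, like enq(). */
    Node enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: lazily install a dummy head, then retry.
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                    // link backwards first
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // forward link comes last
                    return t;
                }
                // Lost the race for the tail: loop and try again.
            }
        }
    }

    /** Counts real nodes by walking prev links from the tail to the head. */
    int size() {
        int n = 0;
        Node h = head.get();
        for (Node p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        CasQueueSketch queue = new CasQueueSketch();
        Node pred = queue.enqueue(new Node("t1"));
        System.out.println(pred.name);    // prints dummy
        queue.enqueue(new Node("t2"));
        System.out.println(queue.size()); // prints 2
    }
}
```

Because prev is set before the CAS and next only after it, a node that has become the tail is always reachable by walking backwards, even if its predecessor's next link is not set yet.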
```java
/**
 * Acquires in exclusive uninterruptible mode for a thread that is already in
 * the synchronization queue. Used by the condition wait methods as well as
 * by acquire.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // Try to acquire only if the predecessor is the head node
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // The acquire did not succeed: park the current thread if appropriate
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```
In acquireQueued(), the current thread loops indefinitely trying to acquire the synchronization state, but it only makes an attempt when its predecessor is the head node. There are two reasons for this. First, the head node is the node that currently holds the synchronization state, and when its thread releases the state it wakes its successor. Second, a waiting thread can also wake up early, for example because it was interrupted, so checking that the predecessor is the head node before each attempt guards against such premature notifications and preserves the FIFO order of the queue.
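The same idea, where a thread may acquire only when it is at the front of the queue and re-checks that condition after every wake-up, appears in a much smaller form in the FIFOMutex example of the LockSupport class documentation. The sketch below is adapted from that example; the class name FifoParkLock is an assumption, and it is a simplification rather than how AQS itself is written.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical FIFO lock, adapted from the LockSupport documentation. */
class FifoParkLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Park while not first in the queue or while the lock is taken.
        // The condition is re-checked after every wake-up, so an early
        // wake-up (for example from an interrupt) cannot jump the queue.
        while (waiters.peek() != current
                || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) { // remember and clear the interrupt
                wasInterrupted = true;
            }
        }

        waiters.remove();
        if (wasInterrupted) {
            current.interrupt(); // re-assert the interrupt, like selfInterrupt()
        }
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek()); // wake the thread at the front, if any
    }

    public static void main(String[] args) throws InterruptedException {
        FifoParkLock lock = new FifoParkLock();
        int[] counter = {0};
        Runnable task = () -> {
            for (int i = 0; i < 5_000; i++) {
                lock.lock();
                try {
                    counter[0]++;
                } finally {
                    lock.unlock();
                }
            }
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter[0]); // prints 10000
    }
}
```

The loop condition plays the role of `p == head && tryAcquire(arg)`, and remembering the interrupt instead of throwing matches the interrupted flag returned by acquireQueued().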
Releasing the synchronization state (exclusive mode)
```java
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // Wake the successor if the head node is not null and is not in
        // the initial state (waitStatus of 0)
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /* If the status is negative, try to clear it in anticipation of signalling. */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * The thread to unpark is normally held in the next node. If that node
     * is null or cancelled, traverse backwards from the tail to find the
     * actual non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
```
The release() method releases the synchronization state by calling tryRelease(arg), whose implementation depends on the component, and then wakes the successor of the head node so that it can try to acquire the synchronization state.
Introduction to Condition
The Condition interface defines two kinds of methods, for waiting and for notification. Before calling them, the current thread must hold the lock associated with the Condition object (just as calling wait() and notify() on an Object requires holding its monitor through synchronized). ConditionObject is an inner class of AQS that implements the Condition interface. It has two Node member variables, firstWaiter and lastWaiter, which point to the first and last nodes of its wait queue.
Waiting
```java
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add the current thread to the wait queue
    Node node = addConditionWaiter();
    // Release the synchronization state (the lock) completely
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Wait until the node has been moved to the synchronization queue
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // After waking up, compete for the lock again in the synchronization queue
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // Throw an exception or re-assert the interrupt if the wait was interrupted
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
```
Calling await() adds the current thread to the condition's wait queue and releases the synchronization state, which wakes the successor node in the synchronization queue; the current thread then enters the wait state. When the node is later signalled by signal(), it is moved to the synchronization queue and the thread tries to acquire the synchronization state again. If the thread was interrupted while waiting rather than signalled, await() throws InterruptedException once the state has been reacquired.
Notification
```java
public final void signal() {
    // Check that the current thread holds the lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // Wake up the first node in the wait queue
    if (first != null)
        doSignal(first);
}
```
To call this method, the current thread must first hold the synchronization state. The method moves the first node in the wait queue to the synchronization queue, where its thread is woken up (typically once the signalling thread releases the lock) and tries to acquire the synchronization state again.
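From the caller's side, the await() and signal() protocol described above is usually used as in the bounded buffer below, which follows the well-known example in the Condition interface documentation. The class and field names are chosen for this sketch; ReentrantLock.newCondition() returns a ConditionObject backed by the lock's AQS synchronizer.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical bounded buffer using two conditions on one lock. */
class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items;
    private int putIndex;
    private int takeIndex;
    private int count;

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(T item) throws InterruptedException {
        lock.lock(); // await() and signal() require holding the lock
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting
            }
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal(); // move one waiting taker to the sync queue
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T item = (T) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += buffer.take();
        }
        producer.join();
        System.out.println(sum); // prints 10
    }
}
```

The wait is placed in a while loop because a woken thread has to reacquire the lock first, and the condition it waited for may no longer hold by then.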
Conclusion
This article briefly analyzed AQS, an important building block of JUC. Acquiring and releasing the AQS synchronization state is analogous to entering and exiting a synchronized block. The AQS synchronization queue is analogous to the lock pool of synchronized, and the wait queue of a Condition is analogous to the object's wait pool. Condition's await() and signal() are analogous to Object's wait() and notify().
Reference: The Art of Concurrent Programming in Java