I. Introduction to AQS
When Doug Lea wrote the JUC (java.util.concurrent) package, he introduced java.util.concurrent.locks.AbstractQueuedSynchronizer, an abstract synchronizer built on a queue. It is usually called AQS.
What is AQS?
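Many locks and synchronization components in JUC (ReentrantLock, CountDownLatch, CyclicBarrier, FutureTask, etc.) are built on AQS, either directly or indirectly (CyclicBarrier goes through ReentrantLock, and FutureTask used AQS only in older JDK versions).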
Why do we need AQS?
Locks and synchronizers have a lot of work in common: tracking a state, queuing waiting threads, and blocking and waking them. AQS extracts that shared work into one base class that can be reused directly. Classes such as ReentrantLock and CountDownLatch are shielded from most of these details and only have to implement their own logic.
Synchronizer fundamentals
// acquire does the following:
while (the synchronization state cannot be acquired) {
    if (the current thread is not queued) {
        enqueue the current thread;
    }
    possibly block the current thread;
}
remove the current thread from the wait queue;

// release does the following:
update the synchronization state;
if (the synchronization state allows a blocked thread to acquire) {
    unblock one or more queued threads;
}
To do this, three basic components need to work together:
- Atomic management of the synchronization state (a shared flag modified atomically with CAS)
- Wait queue management
- Blocking and unblocking of threads (by calling the park() and unpark() methods of LockSupport, which is built on the Unsafe class)
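How these three components fit together can be sketched with a tiny first-in-first-out mutex, closely following the example in the LockSupport Javadoc. This is only an illustration of the idea, not how AQS is implemented: the class name FifoMutexSketch and its demo method are made up here, and a ConcurrentLinkedQueue stands in for the linked queue that AQS maintains itself.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration; not part of the JDK.
class FifoMutexSketch {
    // 1. Synchronization state, modified atomically with CAS
    private final AtomicBoolean locked = new AtomicBoolean(false);
    // 2. Wait queue of threads, in arrival order
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

    void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);
        // Keep blocking unless we are first in the queue and the CAS succeeds
        while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
            LockSupport.park(this);       // 3. block the current thread
            if (Thread.interrupted())     // remember (and clear) an interrupt seen while waiting
                wasInterrupted = true;
        }
        waiters.remove();                 // leave the wait queue
        if (wasInterrupted)
            current.interrupt();          // restore the interrupt status
    }

    void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek()); // 3. unblock the first waiter (no-op if there is none)
    }

    // Runs `threads` workers that each increment a shared counter `perThread` times under the lock.
    static int demo(int threads, int perThread) {
        FifoMutexSketch mutex = new FifoMutexSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("final counter: " + demo(4, 10_000));
    }
}
```

If the mutex works, no increment is lost, so the final counter equals threads × perThread.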
II. Source code analysis
AQS Key attributes
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// Head of the wait queue; it stands for the thread currently holding the lock
private transient volatile Node head;
// Each new node is inserted at the tail, forming a linked list
private transient volatile Node tail;
// The current lock state (as ReentrantLock uses it): 0 means the lock is free,
// a value greater than 0 means a thread holds the lock
private volatile int state;
// The thread currently holding the exclusive lock
// (a field that AbstractQueuedSynchronizer inherits from AbstractOwnableSynchronizer)
private transient Thread exclusiveOwnerThread;
Node Key Attributes
// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
static final class Node {
    // Marks a node that is waiting in shared mode
    static final Node SHARED = new Node();
    // Marks a node that is waiting in exclusive mode
    static final Node EXCLUSIVE = null;
    // waitStatus value: the thread has cancelled its attempt to acquire the lock
    static final int CANCELLED = 1;
    // waitStatus value: the successor's thread needs to be woken up (unparked)
    // when this node releases the lock or is cancelled
    static final int SIGNAL = -1;
    // waitStatus value: the node is in a condition queue and its thread is waiting on a condition
    static final int CONDITION = -2;
    // waitStatus value: the next acquireShared should propagate unconditionally
    // (only used in shared mode)
    static final int PROPAGATE = -3;
    // One of 1, -1, -2, -3, or 0 (the default value after initialization)
    // waitStatus > 0 means the thread has cancelled its wait
    volatile int waitStatus;
    // Reference to the predecessor node
    volatile Node prev;
    // Reference to the successor node
    volatile Node next;
    // The thread wrapped by this node
    volatile Thread thread;
    // Reference to the next node waiting on the condition (not discussed here)
    Node nextWaiter;
}
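How prev, next, and the tail reference cooperate when a node joins the queue can be sketched in a few lines. The following is a simplified model under stated assumptions, not the AQS source: QNode and TailCasQueueSketch are hypothetical names, an AtomicReference replaces the internal CAS on the tail field, and the dummy head is created eagerly instead of lazily.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of a CAS-linked wait queue; not the AQS source.
class TailCasQueueSketch {
    static final class QNode {
        volatile QNode prev;
        volatile QNode next;
    }

    private final QNode head = new QNode(); // dummy head, created eagerly in this sketch
    private final AtomicReference<QNode> tail = new AtomicReference<>(head);

    void enqueue(QNode node) {
        for (;;) {
            QNode t = tail.get();
            node.prev = t;                     // link backwards first
            if (tail.compareAndSet(t, node)) { // publish the node as the new tail
                t.next = node;                 // the forward link is written last
                return;
            }
            // The CAS lost a race with another thread: re-read tail and retry
        }
    }

    // The prev chain is complete for every node reachable from tail.
    int countFromTail() {
        int n = 0;
        for (QNode t = tail.get(); t != head; t = t.prev) n++;
        return n;
    }

    // The next chain is complete only once every enqueue() call has returned.
    int countFromHead() {
        int n = 0;
        for (QNode t = head.next; t != null; t = t.next) n++;
        return n;
    }

    // Enqueues threads * perThread nodes concurrently and returns {countFromTail, countFromHead}.
    static int[] demo(int threads, int perThread) {
        TailCasQueueSketch queue = new TailCasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) queue.enqueue(new QNode());
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { queue.countFromTail(), queue.countFromHead() };
    }
}
```

Because prev is written before the CAS and next after it, a traversal that starts at tail and follows prev never misses a queued node, while a forward traversal can briefly see a null next for a node that is already queued.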
AQS Key methods
AQS implements the queuing logic itself and leaves a few hook methods, such as tryAcquire() and tryRelease(), for subclasses to implement according to their needs.
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// Acquire the lock (modify the state flag), blocking until the lock is acquired
// Declared final: subclasses can call it but cannot override it
// Calls tryAcquire() first to try to acquire the lock
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// Release the lock
// Declared final: subclasses can call it but cannot override it
// Calls the subclass's tryRelease(); returns true if the lock is fully released, false otherwise
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// Try to acquire the lock (modify the state flag); returns immediately
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// Try to release the lock
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// Create a node for the current thread in the given mode and enqueue it
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; fall back to the full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// Enqueue the node, initializing the queue first if necessary
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
// Wake up the successor node
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk backwards from the tail to find the non-cancelled node closest to the head.
        // Under concurrency a next link may not be set yet (or may have been cleared by a
        // cancellation), so walking forwards could miss nodes; the prev links are reliable.
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread
        LockSupport.unpark(s.thread);
}
// Interrupt the current thread
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; }
Release: if (tryRelease(arg)) unblock the first queued thread;
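The division of labor between acquire()/release() and the tryAcquire()/tryRelease() hooks can be seen in a minimal non-reentrant mutex, modeled on the example in the AbstractQueuedSynchronizer Javadoc. SimpleMutexSketch and its demo method are names invented for this sketch. The subclass only decides whether the state can be changed; queuing, parking, and waking are inherited from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex where state 0 = free and 1 = held.
class SimpleMutexSketch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Succeeds only if the CAS moves state from 0 to 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // the volatile write publishes the release
            return true; // fully released: AQS may wake the first queued thread
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); } // calls tryAcquire(), queues and parks on failure
    void unlock()      { sync.release(1); } // calls tryRelease(), then unparks the successor
    boolean isLocked() { return sync.isLocked(); }

    // Runs `threads` workers that each increment a shared counter `perThread` times under the mutex.
    static int demo(int threads, int perThread) {
        SimpleMutexSketch mutex = new SimpleMutexSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

Unlike ReentrantLock, this sketch does not count holds, so a thread that calls lock() twice without unlocking blocks itself.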
Understanding AQS through ReentrantLock
Sync, an inner class of ReentrantLock, has two subclasses that implement the fair and unfair locking modes.
// java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    // Acquire the lock
    abstract void lock();
    // Releasing works the same way for fair and unfair locks, so it lives in the parent class Sync
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            // state reaches 0: the lock is fully released, so clear the exclusive owner thread
            free = true;
            setExclusiveOwnerThread(null);
        }
        // Write back the new state. If it is not 0, the lock is still held reentrantly
        // and state is the number of holds that remain to be released
        setState(c);
        return free;
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
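The reentrant counting in tryRelease() and nonfairTryAcquire() can be observed from the outside through ReentrantLock's public API. The short demo below is a sketch for illustration; ReentrancyDemo is a made-up class name, while getHoldCount(), isLocked(), and unlock() are standard ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only standard ReentrantLock methods are used.
class ReentrancyDemo {
    // Returns the hold counts seen after: lock, lock again, unlock, unlock.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[4];
        lock.lock();                    // state: 0 -> 1
        seen[0] = lock.getHoldCount();
        lock.lock();                    // same owner: state 1 -> 2
        seen[1] = lock.getHoldCount();
        lock.unlock();                  // state 2 -> 1, tryRelease() returns false
        seen[2] = lock.getHoldCount();
        lock.unlock();                  // state 1 -> 0, owner cleared, tryRelease() returns true
        seen[3] = lock.getHoldCount();
        return seen;
    }

    // After one of two unlocks the lock is still held.
    static boolean stillLockedAfterPartialRelease() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();
        boolean locked = lock.isLocked();
        lock.unlock();
        return locked;
    }

    // A thread that does not own the lock cannot release it.
    static boolean unlockByNonOwnerThrows() {
        try {
            new ReentrantLock().unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }
}
```

Each lock() by the owner adds 1 to state and each unlock() subtracts 1; only the unlock() that brings state back to 0 actually frees the lock for other threads.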
// java.util.concurrent.locks.ReentrantLock.NonfairSync
// Implementation of an unfair lock
static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        // This is the unfair lock, so use the unfair acquire implementation
        return nonfairTryAcquire(acquires);
    }
    // Acquire the lock
    final void lock() {
        // Try to set state from 0 to 1 with CAS
        if (compareAndSetState(0, 1))
            // If the CAS succeeds, the current thread becomes the exclusive owner
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // If the CAS fails, call acquire(): it calls tryAcquire() first and,
            // if that fails too, queues the thread through acquireQueued()
            acquire(1);
    }
}
// java.util.concurrent.locks.ReentrantLock.FairSync
// Fair lock implementation
static final class FairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Only try the CAS if no other thread has been waiting longer
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    // Acquire the lock
    final void lock() {
        // Call acquire(): it calls tryAcquire() first and,
        // if that fails, queues the thread through acquireQueued()
        acquire(1);
    }
}
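Which of the two Sync subclasses a lock uses, and the wait queue it builds under contention, can be checked through ReentrantLock's monitoring methods. The sketch below is illustrative: FairnessAndQueueDemo is a hypothetical class name, and getQueueLength() is documented as an estimate, although in this quiet scenario with exactly one waiter it should report 1.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only standard ReentrantLock methods are used.
class FairnessAndQueueDemo {
    // new ReentrantLock() is unfair by default; new ReentrantLock(true) is fair.
    static boolean[] fairnessFlags() {
        return new boolean[] { new ReentrantLock().isFair(), new ReentrantLock(true).isFair() };
    }

    // The calling thread holds the lock while a second thread blocks in lock();
    // returns the queue length observed once that thread is queued.
    static int queueLengthWhileContended(boolean fair) {
        ReentrantLock lock = new ReentrantLock(fair);
        Thread waiter = new Thread(() -> {
            lock.lock();   // fails tryAcquire(), so the thread is queued and parked
            lock.unlock();
        });
        int observed;
        lock.lock();
        try {
            waiter.start();
            // Poll until the waiter's node has been linked into the wait queue
            while (!lock.hasQueuedThread(waiter))
                Thread.yield();
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // release() unparks the queued waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }
}
```

Both modes queue a thread that fails to acquire in the same way. They differ only in whether a newly arriving thread may take a free lock ahead of threads that are already queued.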
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// Acquire the lock in a spin loop
// Returns true if the thread was interrupted while waiting, false otherwise
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // If the predecessor is the head, this node is first in line,
            // so call tryAcquire() to try to get the lock
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// Check and update the status of a node that failed to acquire the lock
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // The predecessor is already set to SIGNAL, so it will wake this node up
        // when it releases: it is safe to park, return true
        return true;
    if (ws > 0) {
        // waitStatus > 0 means the predecessor has cancelled its wait.
        // A parked thread is woken up by its predecessor, so skip over cancelled
        // nodes until a live predecessor is found that can do the wake-up
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // waitStatus is neither -1 nor 1 here, so it can only be 0, -2, or -3.
        // A new node does not set waitStatus when it joins the queue, so it starts at 0,
        // which means a node that is still the tail has waitStatus 0.
        // Use CAS to set the predecessor's waitStatus to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // Return false here; on the next iteration of the infinite loop in acquireQueued(),
    // shouldParkAfterFailedAcquire() will normally return true
    return false;
}
// Park the current thread, then report whether it was interrupted
private final boolean parkAndCheckInterrupt() {
    // Block the thread; LockSupport.park() is built on the Unsafe API
    LockSupport.park(this);
    // Returns the interrupt status and clears it
    return Thread.interrupted();
}
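One visible consequence of parkAndCheckInterrupt() clearing the interrupt flag and acquire() calling selfInterrupt() afterwards is that lock() never throws on interruption, yet the interrupt is not lost. The sketch below illustrates this with public API only; InterruptWhileQueuedDemo is a hypothetical class name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only standard JDK methods are used.
class InterruptWhileQueuedDemo {
    // Interrupts a thread while it is queued inside lock() and returns
    // the interrupt status that thread sees once it finally holds the lock.
    static boolean interruptStatusAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock(); // uninterruptible: keeps waiting even when interrupted
            try {
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            waiter.start();
            // Poll until the waiter's node is in the wait queue
            while (!lock.hasQueuedThread(waiter))
                Thread.yield();
            waiter.interrupt(); // wakes the parked thread; it fails to acquire and parks again
        } finally {
            lock.unlock();      // now the waiter can acquire the lock
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterAcquire.get();
    }
}
```

Code that needs to abandon the wait when interrupted would use lockInterruptibly() instead of lock().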
III. Summary
Starting from the source code, this article gave a brief introduction to AQS. Using acquire() and release() as entry points, it followed ReentrantLock through locking, queuing, parking, waking up, and releasing to explain the basic principles of AQS.