Preface
AQS (AbstractQueuedSynchronizer) is a synchronizer framework that Java has provided since JDK 1.5. The java.util.concurrent package (JUC) greatly improved concurrency performance, and AQS is the core of JUC. It is the basic framework for building locks (such as ReentrantLock) and other synchronization tools (such as CountDownLatch). AQS itself is an abstract class that defines the code structure for acquiring and releasing locks, so to create a lock you extend AQS and implement the corresponding methods.
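As a minimal sketch of "extend AQS and implement the corresponding methods", the non-reentrant mutex below follows the pattern used in the AQS Javadoc. The class name SimpleMutex and its method names are illustrative and not part of the JDK; the convention that state 0 means unlocked and 1 means locked is an assumption of this sketch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
final class SimpleMutex {
    // Convention chosen for this sketch: state == 0 is unlocked, state == 1 is locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // CAS 0 -> 1; only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // queues and parks on contention
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }           // wakes the next queued thread, if any
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.isLocked()); // true
        System.out.println(mutex.tryLock());  // false: the mutex is not reentrant
        mutex.unlock();
        System.out.println(mutex.isLocked()); // false
    }
}
```

All queueing, parking, and wake-up logic comes from AQS; the subclass only decides what "acquired" and "released" mean in terms of state.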
The composition of AQS
AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer, which records which thread currently holds the lock.
public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    static final class Node { ... }
    // Head of the synchronization queue
    private transient volatile Node head;
    // Tail of the synchronization queue
    private transient volatile Node tail;
    // Synchronization state; whether the lock can be obtained is decided from this value
    private volatile int state;
    // Condition queue
    public class ConditionObject implements Condition, java.io.Serializable { ... }
}
State property
AbstractQueuedSynchronizer defines a volatile int member variable, state, to represent the synchronization state; changes to state are made with CAS. For example, in CountDownLatch, state is the count passed to the constructor; in Semaphore, state is the number of permits remaining; in ReentrantLock, it is the lock's reentrancy count.
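The AQS state field itself is not public, but each of these classes exposes a public accessor that reflects it. The short sketch below reads those values back; the class name StateDemo and its helper methods are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: observes the AQS state through public accessors.
final class StateDemo {
    // CountDownLatch: state is the remaining count.
    static long latchState() {
        CountDownLatch latch = new CountDownLatch(3);
        latch.countDown();
        return latch.getCount();
    }

    // Semaphore: state is the number of permits still available.
    static int semaphoreState() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquireUninterruptibly(2);
        return semaphore.availablePermits();
    }

    // ReentrantLock: state is the reentrancy (hold) count of the owner.
    static int lockState() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(latchState());     // 2
        System.out.println(semaphoreState()); // 3
        System.out.println(lockState());      // 2
    }
}
```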
Synchronous queue
The synchronization queue is a FIFO queue of Node objects:
static final class Node {
    // Marker: the node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker: the node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;
    // The thread's request to acquire the lock has been cancelled
    static final int CANCELLED = 1;
    // The successor's thread is (or soon will be) blocked and must be unparked when this node releases or cancels
    static final int SIGNAL = -1;
    // The node is waiting in a condition queue; it is transferred to the synchronization queue when signalled
    static final int CONDITION = -2;
    // Shared mode only: the next release should propagate to the following nodes
    static final int PROPAGATE = -3;
    // Status of the current node, which controls how the node is handled.
    // A normal synchronization node starts at 0; a condition node starts at CONDITION (-2)
    volatile int waitStatus;
    // Predecessor of the current node
    volatile Node prev;
    // Successor of the current node
    volatile Node next;
    // Thread of the current node
    volatile Thread thread;
    // Next node waiting on the condition, or the SHARED marker for shared mode
    Node nextWaiter;
    // ... (remaining members omitted)
}
AbstractQueuedSynchronizer defines two resource-sharing modes: SHARED and EXCLUSIVE. The synchronization queue is a doubly linked queue. In exclusive mode, when several threads request the lock, only one thread can hold it at a time; the threads that fail to obtain it block and wait in the synchronization queue. Internally, AQS maintains a variant of the CLH queue to manage waiting threads: a thread that fails to obtain the lock is wrapped, together with its wait status, in a Node, appended to the tail of the synchronization queue, and blocked. When the synchronization state is released, the first waiting node after the head is woken up.
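The queue itself is private, but ReentrantLock exposes a few monitoring methods backed by it. The sketch below, with the made-up class name QueueDemo, holds a lock in one thread and reads getQueueLength() once the other threads have been enqueued. Note that the Javadoc describes getQueueLength() as an estimate; the value is stable here only because no thread can leave the queue while the lock is held.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: observes threads waiting in the AQS sync queue.
final class QueueDemo {
    // Starts `waiters` threads that all block on a held lock and
    // returns how many of them were seen in the sync queue.
    static int queuedWaiters(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;

        lock.lock(); // the caller owns the lock, so every worker must queue
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // fails tryAcquire, is enqueued and parked
                    lock.unlock();
                });
                threads[i].start();
            }
            // Wait until every worker has been appended to the queue.
            while (lock.getQueueLength() < waiters) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the first queued worker
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(queuedWaiters(3)); // 3
    }
}
```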
How to obtain a lock
acquire/acquireShared
AbstractQueuedSynchronizer uses the template method pattern: acquire() defines the fixed steps for obtaining the lock and delegates the actual attempt to the subclass.
- acquire(): the exclusive-mode implementation
public final void acquire(int arg) {
    // Try tryAcquire first and return if it succeeds; otherwise enqueue the thread and wait in acquireQueued
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
- acquireShared(): the shared-mode implementation
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
- tryAcquire() and tryAcquireShared(): AQS does not provide a working implementation of these methods (the defaults throw UnsupportedOperationException); subclasses implement them.
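For the shared-mode hooks, a minimal sketch is a permit pool in the style of a non-fair semaphore. The class name PermitPool and its methods are hypothetical. The contract it relies on is the documented one: tryAcquireShared returns a negative value on failure and a non-negative value on success, and tryReleaseShared returns true when waiting threads may be able to proceed.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a tiny permit pool using AQS shared mode.
final class PermitPool {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // state = permits currently available
        }

        @Override
        protected int tryAcquireShared(int wanted) {
            for (;;) {
                int available = getState();
                int remaining = available - wanted;
                // Negative: report failure so AQS queues the thread.
                // Otherwise: claim the permits with CAS, retrying on contention.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returned) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + returned)) {
                    return true; // let AQS wake queued waiters
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    PermitPool(int permits) {
        this.sync = new Sync(permits);
    }

    void acquire(int n)       { sync.acquireShared(n); }               // blocks if too few permits
    boolean tryAcquire(int n) { return sync.tryAcquireShared(n) >= 0; } // never blocks
    void release(int n)       { sync.releaseShared(n); }
    int available()           { return sync.permits(); }

    public static void main(String[] args) {
        PermitPool pool = new PermitPool(3);
        pool.acquire(2);
        System.out.println(pool.available());   // 1
        System.out.println(pool.tryAcquire(2)); // false
        pool.release(2);
        System.out.println(pool.available());   // 3
    }
}
```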
The source analysis below uses exclusive mode as the example.
- If tryAcquire() fails, addWaiter is called to wrap the current thread in a Node and append it to the tail of the sync queue:
private Node addWaiter(Node mode) {
    // Wrap the current thread in a Node; mode is the node mode (exclusive/shared)
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try once to CAS the node onto the tail of the queue
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The queue is empty or the CAS failed: enq initializes the queue if needed and retries until the node is appended
    enq(node);
    return node;
}
- After enqueueing, acquireQueued is invoked: the node retries the lock whenever its predecessor is the head, and otherwise blocks until it is woken up to try again:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // Lock acquired: make this node the new head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Make sure the predecessor's status is SIGNAL, then park
            if (shouldParkAfterFailedAcquire(p, node) &&
                // parkAndCheckInterrupt blocks the current thread and reports whether it was interrupted
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // Acquisition ended abnormally (for example, tryAcquire threw): cancel the node
            cancelAcquire(node);
    }
}

// pred is the predecessor, node is the current node. Sets the predecessor's status to SIGNAL.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // The predecessor's waitStatus is already SIGNAL, so it is safe to park
    if (ws == Node.SIGNAL)
        return true;
    // The predecessor has been cancelled
    if (ws > 0) {
        // Skip back over cancelled nodes and link the current node to the nearest live predecessor
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // Otherwise CAS the predecessor's status to SIGNAL; the caller retries the lock once more before parking
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
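One visible consequence of the interrupted flag returned by acquireQueued is that a plain lock() call does not abort when the waiting thread is interrupted; the interrupt is remembered and re-asserted (selfInterrupt) after the lock is obtained. The sketch below demonstrates this through ReentrantLock; the class name InterruptDemo is made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: an interrupt during lock() is deferred, not lost.
final class InterruptDemo {
    // Returns whether the waiter saw its interrupt flag set after acquiring the lock.
    static boolean interruptSeenAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] seen = new boolean[1]; // written by the waiter, read after join()

        Thread waiter = new Thread(() -> {
            lock.lock(); // not interruptible: keeps waiting in the queue
            try {
                seen[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter is actually in the sync queue.
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt(); // wakes the parked thread; it records the interrupt and keeps waiting
        } finally {
            lock.unlock();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptSeenAfterLock()); // true
    }
}
```

Callers that want the wait itself to be abandoned on interrupt would use lockInterruptibly() instead, which throws InterruptedException.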
How to release locks
In exclusive mode, release starts from the head of the queue and looks at its successor. If the successor is null or cancelled, it scans backward from the tail to find the non-cancelled node closest to the head, and then wakes up that node's thread.
// The method that unlock() is built on
public final boolean release(int arg) {
    // tryRelease is left to the subclass, which usually subtracts arg from the current synchronizer state. If it returns true, the lock has been released.
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // Wake up the first waiting node after the head
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // waitStatus of the head node
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    // If the successor is null or cancelled, scan backward from the tail for the non-cancelled node closest to the head
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Keep the last match seen, i.e. the frontmost node with waitStatus <= 0
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If such a node exists, wake up its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}
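Because each release wakes only the first live node behind the head, queued threads obtain the lock in the order they were enqueued as long as no new thread arrives to compete. The sketch below checks this with ReentrantLock; the class name WakeOrderDemo is hypothetical, and the workers are enqueued one at a time so that the queue order is known.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: queued threads are woken in FIFO order.
final class WakeOrderDemo {
    static List<Integer> acquisitionOrder(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        List<Integer> order = new ArrayList<>(); // only modified while holding the lock
        Thread[] threads = new Thread[waiters];

        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock(); // release() wakes the next queued thread
                    }
                });
                threads[i].start();
                // Enqueue one worker at a time so the queue order is 0, 1, 2, ...
                while (!lock.hasQueuedThread(threads[i])) {
                    Thread.yield();
                }
            }
        } finally {
            lock.unlock(); // wakes worker 0, the head's successor
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(3)); // [0, 1, 2]
    }
}
```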
Implementation of AQS in CountDownLatch
CountDownLatch is a flexible latch implementation that allows one or more threads to wait for a set of events to occur. The latch state consists of a counter initialized to a positive number, the number of events to wait for. The countDown method decrements the counter to indicate that an event has occurred, and the await method waits for the counter to reach zero, which means that all awaited events have occurred. If the counter is non-zero, await blocks until the counter reaches zero, the waiting thread is interrupted, or the wait times out.
Source code analysis
CountDownLatch's inner Sync class overrides AQS's tryAcquireShared and tryReleaseShared methods to obtain and release resources in shared mode.
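To make the two overridden hooks concrete, here is a simplified latch modeled on that design. MiniLatch is a hypothetical class written for this article, not the JDK source: acquisition succeeds only when state is 0, and each release decrements state with a CAS loop and reports true only on the transition to 0.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a simplified count-down latch on AQS shared mode.
final class MiniLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count); // state = events still outstanding
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Succeed (>= 0) only once the count has reached zero.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false; // already open, nothing to signal
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0; // true only on the transition to zero
                }
            }
        }
    }

    private final Sync sync;

    MiniLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    int getCount() {
        return sync.count();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniLatch latch = new MiniLatch(2);
        latch.countDown();
        System.out.println(latch.getCount()); // 1
        latch.countDown();
        latch.await();                        // returns at once: the count is 0
        System.out.println(latch.getCount()); // 0
    }
}
```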
- The constructor
The count passed in when creating the CountDownLatch becomes the AQS state value.
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
- await(): calls AQS's acquireSharedInterruptibly, which falls through to doAcquireSharedInterruptibly to queue and block the thread if the count has not yet reached zero.
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
- countDown(): calls releaseShared(1).
A typical use splits a task across count threads that run in parallel, with state initialized to count. Each call to countDown() invokes tryReleaseShared once, which decrements state by 1 using CAS. Once all worker threads have finished (that is, state = 0), the thread blocked in await() is unparked, returns from await(), and continues with the remaining work.
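The flow described above can be sketched as follows. The class name LatchUsageDemo and the counter that stands in for real work are assumptions of this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a coordinating thread waits for N workers.
final class LatchUsageDemo {
    // Starts `workers` threads, waits for all of them, and returns how many finished.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers); // state = workers
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                completed.incrementAndGet(); // stand-in for the real work
                done.countDown();            // state decremented by 1 via CAS
            }).start();
        }

        try {
            done.await(); // blocks until state reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return completed.get(); // every worker has finished by now
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4)); // 4
    }
}
```

Each worker increments the counter before calling countDown(), so once await() returns the coordinating thread is guaranteed to see all of the increments.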