Introduction to AQS
AQS (AbstractQueuedSynchronizer) is built on CAS and implements synchronization, blocking, and wake-up on top of a FIFO wait queue.
Inheritance hierarchy
- AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer, which is used to set and get the thread that currently owns exclusive access
- The wait queue of AbstractQueuedSynchronizer is a variant of the CLH (Craig, Landin, and Hagersten) queue, implemented as a doubly linked list
- Its inner class Node is the node type of the CLH queue
- ConditionObject is the implementation class of Condition
What is a CLH queue?
CLH stands for Craig, Landin, and Hagersten. The original CLH queue is a singly linked list. The queue in AQS is a CLH variant: a virtual doubly linked FIFO queue ("virtual" because there is no separate queue object, only the links between nodes). AQS allocates the lock by wrapping each thread that requests the shared resource in a node of this queue.
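For contrast with the AQS variant, here is a minimal sketch of the textbook CLH spin lock, in which each thread spins on a flag owned by its predecessor. This is not AQS code: the class names `ClhSpinLock` and `ClhSpinLockDemo` are hypothetical, and the spinning with `Thread.yield()` is a simplification chosen for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Textbook CLH spin lock: an implicit singly linked queue of waiting threads. */
final class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked; // true while the owner holds or waits for the lock
    }

    // The queue starts with a dummy node that is already released.
    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node); // atomically enqueue at the tail
        myPred.set(pred);
        while (pred.locked) {              // spin only on the predecessor's flag
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;      // lets the successor stop spinning
        myNode.set(myPred.get()); // reuse the predecessor's node for the next lock()
    }
}

final class ClhSpinLockDemo {
    private static int counter; // deliberately not atomic; protected by the lock

    /** Runs `threads` workers that each increment the counter `perThread` times. */
    static int run(int threads, int perThread) {
        counter = 0;
        ClhSpinLock lock = new ClhSpinLock();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```

AQS differs from this sketch in two ways described in this article: its nodes carry both `prev` and `next` links, and waiting threads are parked rather than left spinning.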
AQS fields
Field | Description |
---|---|
head | Head node of the wait queue |
tail | Tail node of the wait queue |
state | Synchronization state |
AbstractQueuedSynchronizer#Node fields
Field | Description |
---|---|
waitStatus | Status of the current node in the queue |
prev | Pointer to the predecessor node |
next | Pointer to the successor node |
thread | The thread that this node represents |
nextWaiter | Points to the next node waiting on a condition (in the CONDITION queue) |
AQS lock modes
Mode | Description |
---|---|
SHARED | In shared mode, the lock can be held by multiple threads at once. For example, the read lock of a read/write lock may be held by several threads simultaneously |
EXCLUSIVE | In exclusive mode, the lock can be held by only one thread |
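To illustrate shared mode, the following is a minimal sketch of a one-shot gate built on AQS, similar in spirit to the latch example in the AQS Javadoc. The names `SharedGate` and `SharedGateDemo` are hypothetical; only `tryAcquireShared`, `tryReleaseShared`, `acquireSharedInterruptibly`, and `releaseShared` come from AQS itself.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** One-shot gate: state 0 means closed, state 1 means open. */
final class SharedGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means the shared acquire succeeded.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true lets AQS wake the queued shared waiters
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void open() {
        sync.releaseShared(1);
    }
}

final class SharedGateDemo {
    /** Starts `waiters` threads that wait on the gate, opens it, and returns how many passed. */
    static int run(int waiters) {
        SharedGate gate = new SharedGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.open(); // every waiter may pass, whether it queued before or after this call
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Unlike an exclusive lock, a single `open()` call lets every waiting thread through, which is the defining property of shared mode.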
Meaning of the waitStatus values
Constant | Value | Description |
---|---|---|
CANCELLED | 1 | The thread's request to acquire the lock was cancelled because of a timeout or an interrupt |
(none) | 0 | Initial value of a newly created node |
SIGNAL | -1 | The successor's thread is (or will soon be) suspended, so this node must wake it when it releases or cancels |
CONDITION | -2 | The node is in a condition queue, waiting to be signalled |
PROPAGATE | -3 | Used only in shared mode; a shared release should propagate to other nodes |
AQS#CLH enqueue
    public class AbstractQueuedSynchronizer {
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode); // Create a node for the current thread
            // Try the fast path of enq; fall back to the full enq on failure
            Node pred = tail; // Read the current tail as the candidate predecessor
            if (pred != null) { // The queue has already been initialized
                node.prev = pred; // Point the new node's prev at the old tail
                if (compareAndSetTail(pred, node)) { // CAS the new node in as the tail
                    pred.next = node; // Point the old tail's next at the new node
                    return node; // Return the new node
                }
            }
            enq(node); // The queue is uninitialized or the CAS lost a race: enqueue in a retry loop
            return node;
        }

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node())) // Install an empty dummy node as the head
                        tail = head; // The tail initially points at the same dummy node
                } else {
                    node.prev = t; // Point the new node's prev at the old tail
                    if (compareAndSetTail(t, node)) { // CAS the new node in as the tail
                        t.next = node; // Point the old tail's next at the new node
                        return t; // Return the node's predecessor (the old tail)
                    }
                }
            }
        }
    }
As the code above shows, the prev and thread fields of the AQS head node are null, and its next pointer refers to the node of the first suspended thread in the wait queue.
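The enqueue loop can be reproduced outside AQS with `AtomicReference`. The sketch below is a simplified stand-in, not the AQS implementation: `ToyWaitQueue` and its `owner` field (a string in place of the `Thread`) are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Simplified stand-in for the AQS wait queue, showing the lazily created dummy head. */
final class ToyWaitQueue {
    static final class Node {
        final String owner; // stands in for the thread field; null for the dummy head
        volatile Node prev;
        volatile Node next;

        Node(String owner) {
            this.owner = owner;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    /** Mirrors the enq loop: install a dummy head if needed, then CAS the node in as the tail. */
    Node enqueue(String owner) {
        Node node = new Node(owner);
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First use: the head is an empty node with no owner and no prev.
                if (head.compareAndSet(null, new Node(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return node;
                }
            }
        }
    }

    /** Walks the queue from the head and renders it as text. */
    String walk() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head.get(); n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(n.owner == null ? "(dummy)" : n.owner);
        }
        return sb.toString();
    }

    static String run() {
        ToyWaitQueue queue = new ToyWaitQueue();
        queue.enqueue("T1");
        queue.enqueue("T2");
        return queue.walk();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first `enqueue` call goes around the loop twice: once to install the dummy head, and once more to append the real node behind it.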
AQS enqueue diagram
Acquiring a lock
Exclusive lock
Acquire lock - acquire(int arg)
- Acquires in exclusive mode, ignoring interrupts: even if the thread is interrupted while waiting, it keeps waiting and the subsequent business logic still runs. The method returns once the lock is acquired; until then the thread is queued and suspended.
- Source code analysis
    public class AbstractQueuedSynchronizer {
        public final void acquire(int arg) {
            if (!tryAcquire(arg) && // The first attempt to take the lock failed
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Enqueue the current thread and wait in the queue
                selfInterrupt(); // Re-assert the interrupt observed while waiting
        }
    }
AbstractQueuedSynchronizer#acquire first tries to get the lock through tryAcquire(int arg), whose logic is defined by the concrete subclass, and returns immediately on success. On failure it takes two steps: addWaiter(Node mode) adds the thread to the wait queue, and acquireQueued(final Node node, int arg) then acquires the lock uninterruptibly in a loop.
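Because tryAcquire is left to the subclass, a lock only has to define what "acquired" means in terms of state. The following is a minimal sketch of a non-reentrant mutex under that assumption; `SimpleMutex` and `SimpleMutexDemo` are hypothetical names, while `compareAndSetState`, `setExclusiveOwnerThread`, `acquire`, and `release` are the AQS methods discussed in this article.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Non-reentrant mutex: state 0 means unlocked, state 1 means locked. */
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // acquire() will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so release() may wake a successor
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    void unlock() {
        sync.release(1);
    }
}

final class SimpleMutexDemo {
    private static int counter; // deliberately not atomic; protected by the mutex

    /** Runs `threads` workers that each increment the counter `perThread` times. */
    static int run(int threads, int perThread) {
        counter = 0;
        SimpleMutex mutex = new SimpleMutex();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```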
    public class AbstractQueuedSynchronizer {
        final boolean acquireQueued(final Node node, int arg) { // node: the current thread's node; arg is forwarded to tryAcquire
            boolean failed = true; // Set to false only once the lock is acquired
            try {
                boolean interrupted = false; // Records whether an interrupt was seen while waiting
                for (;;) { // Retry loop
                    final Node p = node.predecessor(); // Get the predecessor node
                    if (p == head && tryAcquire(arg)) { // Only the node right behind the head may try to acquire
                        setHead(node); // The current node becomes the new head
                        p.next = null; // help GC
                        failed = false; // The lock was acquired
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && // Decide whether the thread should be suspended
                        parkAndCheckInterrupt()) // Suspend the thread; after waking, report whether it was interrupted
                        interrupted = true; // Remember the interrupt
                }
            } finally {
                if (failed) // The loop exited without acquiring the lock
                    cancelAcquire(node); // Cancel the acquisition attempt
            }
        }
    }
AbstractQueuedSynchronizer#acquireQueued acquires uninterruptibly for a thread that is already in the queue. When the predecessor of the current node is the head node, the thread tries to acquire the lock and returns on success. Otherwise, shouldParkAfterFailedAcquire decides whether the thread should be suspended. If so, parkAndCheckInterrupt suspends it and, once it wakes, reports whether it was interrupted; if not, the loop runs again until the thread is suspended. Finally, when the thread of the predecessor node releases the lock, the thread wakes up, enters the last iteration, and returns holding the lock.
- Lock acquisition process
Acquire lock - acquireInterruptibly(int arg)
- Acquires in exclusive mode and aborts if the thread is interrupted
- Source code analysis
    public class AbstractQueuedSynchronizer {
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted()) // Check (and clear) the interrupt flag
                throw new InterruptedException();
            if (!tryAcquire(arg)) // Try to acquire the lock
                doAcquireInterruptibly(arg); // Acquire in exclusive interruptible mode
        }
    }
AbstractQueuedSynchronizer#acquireInterruptibly first checks for an interrupt. If the thread has been interrupted, it throws an exception, clears the interrupt flag, and never enters the lock acquisition process. Otherwise it tries tryAcquire and, if that fails, calls doAcquireInterruptibly to keep trying to obtain the lock.
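The effect is easy to observe from outside. The sketch below, which assumes a deliberately tiny non-reentrant `Sync` subclass (a hypothetical helper, not part of the JDK), holds the lock on one thread and interrupts another thread that calls acquireInterruptibly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class InterruptibleAcquireDemo {
    /** Minimal non-reentrant exclusive synchronizer used only for this demo. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    /** Returns true if the waiting thread saw an InterruptedException. */
    static boolean run() {
        Sync sync = new Sync();
        sync.acquire(1); // the calling thread holds the lock, so the waiter cannot get it
        AtomicBoolean sawInterrupt = new AtomicBoolean();
        CountDownLatch started = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            started.countDown();
            try {
                sync.acquireInterruptibly(1);
                sync.release(1); // not reached here: the lock stays held until after join()
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        try {
            started.await();
            // Whether this lands before the call or while the waiter is parked,
            // acquireInterruptibly ends with InterruptedException.
            waiter.interrupt();
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            sync.release(1);
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```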
    public class AbstractQueuedSynchronizer {
        private void doAcquireInterruptibly(int arg)
                throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE); // Add to the wait queue
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor(); // Get the predecessor node
                    if (p == head && tryAcquire(arg)) { // Try again to acquire the lock
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && // Decide whether to suspend
                        parkAndCheckInterrupt()) // Suspend the thread; after waking, check for an interrupt
                        throw new InterruptedException(); // Interrupted while waiting
                }
            } finally {
                if (failed) // The lock was not acquired because the thread was interrupted
                    cancelAcquire(node); // Cancel the acquisition attempt
            }
        }
    }
The thread is added to the wait queue by addWaiter and then enters the loop. It tries to acquire the lock if its predecessor is the head node; otherwise it is suspended. If the thread is interrupted while suspended, InterruptedException is thrown and the finally block runs the cancelAcquire logic.
    public class AbstractQueuedSynchronizer {
        private void cancelAcquire(Node node) {
            if (node == null) // Ignore a null node
                return;
            node.thread = null;
            Node pred = node.prev; // Get the predecessor node
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev; // Skip cancelled predecessors
            Node predNext = pred.next; // Successor of the first uncancelled predecessor
            node.waitStatus = Node.CANCELLED; // Mark the current node as cancelled
            if (node == tail && compareAndSetTail(node, pred)) { // The current node is the tail: make the uncancelled predecessor the tail
                compareAndSetNext(pred, predNext, null); // Set the new tail's next to null
            } else {
                int ws;
                if (pred != head && // The uncancelled predecessor is not the head
                    ((ws = pred.waitStatus) == Node.SIGNAL || // Its waitStatus is already -1
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // Or it is <= 0 and can be set to -1
                    pred.thread != null) { // And it still has a thread
                    Node next = node.next; // Get the successor of the current node
                    if (next != null && next.waitStatus <= 0) // The successor exists and is not cancelled
                        compareAndSetNext(pred, predNext, next); // Link the predecessor directly to the current node's successor
                } else {
                    unparkSuccessor(node); // Wake up the node's successor
                }
                node.next = node; // help GC
            }
        }
    }
AbstractQueuedSynchronizer#cancelAcquire is a little longer, but it divides into three parts. First, it skips predecessors whose threads have already been cancelled, re-pointing the current node's prev at the first uncancelled predecessor. For example, given waiting threads 1, 2, 3, 4, and 5, suppose thread 4 is interrupted while acquiring the lock and threads 2 and 3 are found to be cancelled during the scan; the prev of thread 4's node is then pointed at thread 1's node. Second, it checks whether the current node is the tail. If so, it sets the tail to that predecessor and sets the new tail's next to null. Third, if the current node is not the tail, it checks three conditions on the predecessor:
- The predecessor is not the head node
- The predecessor's waitStatus is -1, or it is <= 0 and the attempt to set it to -1 succeeds
- The predecessor's thread is not null
If all three hold, and the current node's successor is not null and has waitStatus <= 0, the predecessor's next is pointed at the current node's successor. If any of the three conditions fails, the successor of the current node is woken up.
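The five-thread example can be traced with a small single-threaded model. The sketch below is a simplification under stated assumptions: `CancelSkipDemo` and its `Node` are hypothetical, plain field writes stand in for the CAS operations, and thread 1's node is assumed to satisfy the three conditions so that the splice branch is taken.

```java
final class CancelSkipDemo {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        final String name;
        int waitStatus;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    /** Links the given nodes into a doubly linked list, in order. */
    static void link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i - 1].next = nodes[i];
            nodes[i].prev = nodes[i - 1];
        }
    }

    /** First part of cancelAcquire: re-point node.prev past cancelled predecessors. */
    static Node skipCancelledPredecessors(Node node) {
        Node pred = node.prev;
        while (pred.waitStatus > 0) {
            node.prev = pred = pred.prev;
        }
        return pred;
    }

    /** Cancels T4 in the queue head, T1..T5 where T2 and T3 are already cancelled. */
    static String run() {
        Node head = new Node("head");
        Node t1 = new Node("T1");
        Node t2 = new Node("T2");
        Node t3 = new Node("T3");
        Node t4 = new Node("T4");
        Node t5 = new Node("T5");
        link(head, t1, t2, t3, t4, t5);
        t1.waitStatus = SIGNAL; // assumption: T1 is already set to wake its successor
        t2.waitStatus = CANCELLED;
        t3.waitStatus = CANCELLED;

        Node pred = skipCancelledPredecessors(t4); // t4.prev now points at T1
        t4.waitStatus = CANCELLED;
        // T4 is not the tail, and T1 is not the head and has SIGNAL status,
        // so T1 is linked directly to T4's successor.
        if (t4.next != null && t4.next.waitStatus <= 0) {
            pred.next = t4.next;
        }
        return t4.prev.name + " <- T4, " + pred.name + " -> " + pred.next.name;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that only the forward link from T1 is repaired at this point; T5's prev still refers to T4 until T5 itself skips over cancelled predecessors.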
- Lock acquisition process
Acquire lock - tryAcquireNanos(int arg, long nanosTimeout)
- Acquires the lock in exclusive mode within a given period of time; fails on timeout and aborts if the thread is interrupted
- Source code analysis
    public class AbstractQueuedSynchronizer {
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted()) // Check whether the thread has been interrupted
                throw new InterruptedException();
            return tryAcquire(arg) || // Try to acquire the lock first; return true on success
                doAcquireNanos(arg, nanosTimeout); // Otherwise wait in the queue for at most the given time
        }
    }
First, the method checks whether the thread has been interrupted; if so, it throws an exception and clears the interrupt flag. Next, it tries tryAcquire and returns true on success. If that fails, it calls doAcquireNanos to try to obtain the lock within the given time.
    public class AbstractQueuedSynchronizer {
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L) // Return false if the wait time is less than or equal to 0
                return false;
            final long deadline = System.nanoTime() + nanosTimeout; // Current time + wait time
            final Node node = addWaiter(Node.EXCLUSIVE); // Add to the wait queue
            boolean failed = true; // Failure flag
            try {
                for (;;) {
                    final Node p = node.predecessor(); // The predecessor node
                    if (p == head && tryAcquire(arg)) { // The predecessor is the head: try again to acquire the lock
                        setHead(node); // The current node becomes the head, which dequeues it
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime(); // Remaining wait time
                    if (nanosTimeout <= 0L) // No time left: give up
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) && // Decide whether the thread should be suspended
                        nanosTimeout > spinForTimeoutThreshold) // The remaining time exceeds the spin threshold
                        LockSupport.parkNanos(this, nanosTimeout); // Park for at most the remaining time
                    if (Thread.interrupted()) // Check for an interrupt
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node); // Timed out or interrupted: cancel the acquisition attempt
            }
        }
    }
doAcquireNanos computes the deadline before entering the loop, constructs the node for the current thread, adds it to the wait queue, and then starts looping. If the predecessor is the head, it tries again to acquire the lock; on success the current node becomes the head, which dequeues it. On failure it computes the remaining wait time and returns false if none is left. Otherwise it checks whether the thread should be suspended and whether the remaining time exceeds the spin threshold. If both hold, the thread is parked for the remaining time; if not, the loop spins again. Whenever the method returns false, the finally block also cancels the thread's acquisition attempt. If the thread is woken by an interrupt while parked, an exception is thrown and the acquisition attempt is cancelled as well.
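Both outcomes of the timed path can be seen from a single thread. The sketch below assumes a tiny non-reentrant `Sync` subclass (a hypothetical helper); because it is non-reentrant, a timed attempt made while the same thread already holds the lock must queue, wait, and time out. The 50 ms timeout is an arbitrary value chosen for the demo.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class TimedAcquireDemo {
    /** Minimal non-reentrant exclusive synchronizer used only for this demo. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    /** Returns { result while the lock is held, result after it was released }. */
    static boolean[] run() {
        Sync sync = new Sync();
        long timeout = TimeUnit.MILLISECONDS.toNanos(50);
        try {
            sync.acquire(1);
            // The lock is taken, so this attempt waits in the queue until the deadline passes.
            boolean whileHeld = sync.tryAcquireNanos(1, timeout);
            sync.release(1);
            // The lock is free, so tryAcquire succeeds on the fast path.
            boolean afterRelease = sync.tryAcquireNanos(1, timeout);
            if (afterRelease) {
                sync.release(1);
            }
            return new boolean[] { whileHeld, afterRelease };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println(result[0] + " " + result[1]);
    }
}
```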
- Lock acquisition process
Release lock - release(int arg)
- Releases in exclusive mode and wakes up the successor node
- Source code analysis
    public class AbstractQueuedSynchronizer {
        public final boolean release(int arg) {
            if (tryRelease(arg)) { // Implemented by the subclass; decides whether a successor may be woken
                Node h = head; // Head node
                if (h != null && h.waitStatus != 0) // The head exists and its waitStatus is not the initial value
                    unparkSuccessor(h); // Wake up the head's successor
                return true;
            }
            return false;
        }
    }
The tryRelease method decides whether the lock has been fully released so that a successor may be woken. If it returns false, release returns false. Otherwise, if the head node is not null and its waitStatus is not 0, the successor node is woken; release then returns true.
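The hand-off from release to a queued thread can be observed with two threads. This sketch again assumes a tiny non-reentrant `Sync` subclass (a hypothetical helper) and uses the AQS method `hasQueuedThreads()` to wait until the second thread is actually in the queue before releasing.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class ReleaseWakeUpDemo {
    /** Minimal non-reentrant exclusive synchronizer used only for this demo. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true; // fully released: release() goes on to wake the successor
        }
    }

    /** Returns true if release() reported success and the queued thread then got the lock. */
    static boolean run() {
        Sync sync = new Sync();
        AtomicBoolean waiterAcquired = new AtomicBoolean();
        sync.acquire(1); // hold the lock so that the waiter has to queue
        Thread waiter = new Thread(() -> {
            sync.acquire(1); // blocks until the holder releases
            waiterAcquired.set(true);
            sync.release(1);
        });
        waiter.start();
        while (!sync.hasQueuedThreads()) { // wait until the waiter is in the queue
            Thread.yield();
        }
        boolean released = sync.release(1); // wakes the head's successor
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return released && waiterAcquired.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```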
    public class AbstractQueuedSynchronizer {
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus; // Current waitStatus of the node
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0); // Reset waitStatus to 0
            Node s = node.next; // Successor node
            // next can be null (the link is not set yet, or node is the tail); waitStatus > 0 means the successor was cancelled
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev) // Walk backwards from the tail
                    if (t.waitStatus <= 0) // Not cancelled
                        s = t; // Remember it; the last one recorded is the closest to node
            }
            if (s != null)
                LockSupport.unpark(s.thread); // Wake up the successor's thread
        }
    }
If the node's waitStatus is < 0, it is set to 0. The successor is then read. If the successor is null or its waitStatus is > 0 (cancelled), the queue is scanned backwards from the tail to find the uncancelled node closest to the current node, and that node's thread is woken up.
- Wake-up process
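The backward scan is the least obvious part, so here is a single-threaded model of just that search. `SuccessorScanDemo` and its `Node` are hypothetical names, and the method returns the chosen node instead of calling `LockSupport.unpark`.

```java
final class SuccessorScanDemo {
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    /** Links the given nodes into a doubly linked list, in order. */
    static void link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i - 1].next = nodes[i];
            nodes[i].prev = nodes[i - 1];
        }
    }

    /** Mirrors the search in unparkSuccessor: use node.next if usable, else scan back from the tail. */
    static Node findSuccessorToWake(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t; // keep scanning: the last match is the one closest to node
                }
            }
        }
        return s;
    }

    /** Queue head, T1..T4 with T1 and T2 cancelled: the node to wake is T3. */
    static String run() {
        Node head = new Node("head");
        Node t1 = new Node("T1");
        Node t2 = new Node("T2");
        Node t3 = new Node("T3");
        Node t4 = new Node("T4");
        link(head, t1, t2, t3, t4);
        t1.waitStatus = CANCELLED;
        t2.waitStatus = CANCELLED;
        return findSuccessorToWake(head, t4).name;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The scan starts at the tail because, as the enqueue code shows, a node's prev is set before the tail CAS while the old tail's next is set afterwards, so the prev chain is the one that is always complete.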