This is the seventh day of my participation in the August More text Challenge. For details, see: August More Text Challenge
One, foreword
AbstractQueuedSynchronizer
Referred to as”AQS
, abstract queue synchronizer
It is the basic tool for JUC package synchronization.
AQS is an abstract class that implements synchronous queues with built-in spin-locks, encapsulates enqueued and dequeued operations, and provides methods for exclusive, shared, interrupt, and other features.
inAQS
In, one is definedvolatile int state
Variables as shared resources:
- If the thread fails to acquire the resource, synchronization is entered
FIFO
Waiting in a queue - The critical section code executes if the resource is successfully acquired
- When the resource is released, the waiting thread in the synchronization queue is notified to get the resource, dequeue and execute
In brief: AQS uses CAS to maintain state and LockSupport to operate on threads.
usingAQS
Class main steps:
- Step 1: Create your own thread collaboration utility class and write one internally
Sync
Class, theSync
Class inheritanceAbstractQueuedSynchronizer
, i.e.,AQS
; - Step 2: Think about the collaboration logic of the designed thread collaboration tool class, in
Sync
Class, overrides the corresponding method based on whether it is exclusive or not. If it is exclusive, rewritetryAcquire
和tryRelease
Methods; Rewrite if it is non-exclusivetryAcquireShared
和tryReleaseShared
Methods; - Step 3: Implement fetch/release methods in your own thread collaboration tool class and call them from there
AQS
The corresponding method is called if it is exclusiveacquire
或release
Etc method, non-exclusive is calledacquireShared
或releaseShared
或acquireSharedInterruptibly
Methods.
AQS
Realize the principle of
The two most important things in AQS:
state
: indicates the status. 0 indicates unlocked, and 1 indicates locked- Queue: The location where suspended threads are stored
AQS
Initially, as shown below:
- Thread 1 and thread 2 execute concurrently and lock, as shown in the figure:
- The execution of thread 1 is complete, as shown below:
- Thread 3 joins, fair lock and non-fair lock:
Second, the problem of
(1) WhyAQS
- Relative to
synchronized
Based on theJVM
.AQS
It’s lighter and more manageable AQS
The underlying rely onCAS
Sometimes this is relatively fast and performs better than suspending threads (context switching)
(2) How to lock
In fact, look at the above principle, you will understand that there are two main steps to lock:
-
Update state = 1
-
Set the AQS thread to the current thread
Any other thread will be queued and suspended
(3) How to release a lock
Releasing a lock is, of course, the reverse of locking:
- update
state
- update
AQS
The thread fornull
- The thread that wakes up the head of the queue
How do I wake up the team leader element?
Use the locksupport.unpark (thread) method.
How does the team leader element reattempt the lock after waking up?
- Recover from the blocked state first
- And then keep walking
for(;;)
Loop, judge from the beginning
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); Return thread.interrupted (); // 2. After locksupport.unpark (thread), return thread.interrupted (); }Copy the code
(4)state
Can it be greater than 1?
You can.
The value of state increases when the lock is reentered.
Of course, when it comes to releasing, it also needs to be reduced.
Iii. Design idea
Main actions:
CAS
maintenancestate
LockSupport
Operation thread
At the same time, the template pattern is adopted
Subclasses can customize their own logic to operate on state by implementing protected methods.
Here are some common methods that require subclasses:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
Copy the code
Four, source code analysis
There are four main aspects: Node, exclusive lock, shared lock, LockSupport,
The difference between a shared lock and an exclusive lock is whether it is owned by multiple threads
(0) Node
Queue node
Each Node holds one thread
Static final class Node{/* Static final class Node{/* Wait state of the current Node object. Note that this state describes not the current object but the next Node, * to determine whether to wake the next Node. * A. CANCELLED = 1: Nodes are set to the cancelled state due to timeout or interruption. Nodes in the cancelled state should not compete for locks. * Only the cancelled state should remain unchanged and cannot be converted to another state. Nodes in this state are kicked out of the queue and collected by GC. * b. SIGNAL = -1: indicates that the node's successor is blocked and needs to be notified; * c. CONDITION = -2: indicates that the node is in the CONDITION queue and blocked because it is waiting for a CONDITION; * d. PROPAGATE = -3: Used in shared mode if the head node is in this state, it means that the next acquisition of the lock can PROPAGATE unconditionally; * e.0: None of the above, the new node will be in this state. * * Non-negative values indicate that the node does not need to be notified (awakened). */ volatile int waitStatus; // Null volatile Node prev if it is the header; // Volatile Node next; // The Thread object bound to Node is volatile Thread Thread; Condition is in exclusive mode, so there is a simple queue to describe the thread nodes on the Condition. Node nextWaiter; }Copy the code
(1) an exclusive lock
Process, as shown in the figure:
acquire()
Acquiring a lock
public final void acquire(int arg) {
// 1. Try to obtain the synchronization status and exit if it succeeds
// 2. If the failure occurs, the device is added to the end of the queue
if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }private Node addWaiter(Node mode) {
// 1. Build the current thread as Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if(pred ! =null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
returnnode; }}// 2. If the end node of the current synchronization queue is NULL, it indicates that the current thread is the first thread to join the synchronization queue and wait
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//1. Construct the header node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2. Tail insert, CAS operation failed spin attempt
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
returnt; }}}}final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// The process of spinning
for (;;) {
// 1. Obtain the pioneer node of the current node
final Node p = node.predecessor();
// 2. Can the current node obtain an exclusive lock
// 2.1 An exclusive lock can be obtained if the current node's precursor node is a head node and the synchronization status is successfully obtained
if (p == head && tryAcquire(arg)) {
// The queue header pointer should point to the current node
setHead(node);
// Release the precursor node
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2.2 Failed to acquire the lock and the thread entered the wait state waiting to acquire the exclusive lock
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}// Suspend after obtaining failed
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt(a) {
// Causes the thread to block
LockSupport.park(this);
return Thread.interrupted(); // Returns whether to block
}
Copy the code
release()
Release the lock
public final boolean release(int arg) {
// tryRelease() is the method each implementation class needs to implement
if (tryRelease(arg)) {
Node h = head;
if(h ! =null&& h.waitStatus ! =0)
unparkSuccessor(h);
return true;
}
return false;
}
// Each lock release wakes up the threads referenced by the node's successors in the queue, further confirming that the lock acquisition process is a FIFO (first in first out) process.
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// The successor node of the first node
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
s = t;
}
if(s ! =null)
// Wake up the thread when the successor node is not NULL
LockSupport.unpark(s.thread);
}
Copy the code
acquireInterruptibly
Interruptible lock acquisition
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if(! tryAcquire(arg))// The thread failed to acquire the lock
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// Insert the node into the synchronization queue
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// Get the lock out of the team
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// Thread interrupt throws an exception
throw newInterruptedException(); }}finally {
if(failed) cancelAcquire(node); }}Copy the code
tryAcquireNanos()
Timeout wait lock acquisition
That is, the lock.trylock (timeout, TimeUnit) method is called
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
// Implement the effect of timeout wait
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1. Calculate the cut-off time based on the timeout period and the current time
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//2. The current thread obtains the lock out of the queue
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 3.1 Recalculating the timeout period
nanosTimeout = deadline - System.nanoTime();
False is returned after timeout
if (nanosTimeout <= 0L)
return false;
// 3.3 The thread blocks and waits
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 3.4 Thread being interrupted throws an interrupted exception
if (Thread.interrupted())
throw newInterruptedException(); }}finally {
if(failed) cancelAcquire(node); }}Copy the code
(2) the Shared lock
The process is shown as follows:
Instead of an exclusive lock, only shared is added to the method.
acquireShared()
Obtaining a shared lock
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// When the precursor node of the node is the head node and the synchronization status is obtained successfully
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return; }}if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}Copy the code
releaseShared()
Releasing a shared lock
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared(a) {
for (;;) {
Node h = head;
if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; }}Copy the code
(3) LockSupport
// Look at the upparksucceeded () method
private void unparkSuccessor(Node node) {
// Omit irrelevant code
LockSupport.unpark(s.thread);
}
public static void unpark(Thread thread) {
// Omit irrelevant code
if(thread ! =null)
// delegate to the UNSAFE#unpark method, which is a local method
UNSAFE.unpark(thread);
}
Copy the code