Introduction to the
Before this introduction is already, in the ReentrantReadWriteLock sync attribute, and the sync is inherited AQS AbstractQueuedSynchronizer synchronizer. AQS adopts the template design pattern, calls its template methods (exclusive get and release synchronization state, shared get and release synchronization state and query waiting threads in the synchronization queue), overrides the specified method, we can use AQS to construct customized synchronization components.
AQS parsing
Important attributes
Private TRANSIENT volatile Node head; Private TRANSIENT volatile Node tail; private transient volatile Node tail; // Synchronization status private volatile int state;Copy the code
AQS internally defines a FIFO queue through head and tail. State indicates the synchronization state (0 indicates that no thread has acquired the synchronization state or lock, and greater than 0 indicates that there is thread possession). Both are volatile to ensure the visibility of memory
Important inner class
Static final class Node {/** SHARED mode */ static final Node SHARED = new Node(); /** EXCLUSIVE mode */ static final Node EXCLUSIVE = null; /** Because the thread waiting in the synchronization queue timed out or was interrupted, it needs to cancel the wait from the synchronization queue.The node does not change after entering this state*/ static final int CANCELLED = 1; Static final int SIGNAL = -1; static final int SIGNAL = -1; static final int SIGNAL = -1; /** * The node is in a wait queue, and the node thread is waiting on the Condtion. When another thread calls signal() on the Condtion, the node willMove from the wait queue to the synchronous queueStatic final int CONDITION = -1; static final int CONDITION = -1; PROPAGATE = PROPAGATE; PROPAGATE = PROPAGATE; PROPAGATE = PROPAGATE; /** current node waitStatus */ volatile int waitStatus; /** Precursors */ volatile Node prev; /** Next Node */ volatile Node next; /** The Thread associated with the node */ volatile Thread Thread; /** * This field is a SHARED constant if the current Node is SHARED, i.e. the Node type (exclusive and SHARED) * and the successor Node in the wait queue share the same field */ Node nextWaiter; }Copy the code
Node node is the basis of the synchronization queue. Pre and Next maintain a bidirectional queue successively. The structure of the synchronization queue is shown as follows:
When one thread succeeds in acquiring the synchronization state (or lock), other threads cannot acquire the synchronization state and are instead constructed as nodes and added to the synchronization queue
Exclusive get and release synchronization state
If a thread fails to obtain the synchronization status and enters the synchronization queue, the thread will not be removed from the synchronization queue when it is interrupted
public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
TryAcquire () is called to ensure that the thread can safely acquire the synchronization state (or lock). This method is implemented by its own synchronizer. If it fails, a synchronization node is constructed and added to the end of the synchronization queue by calling addWaiter(). Finally, call acquireQueued() in an infinite loop to get the synchronization status ④. If you do not get, call shouldParkAfterFailedAcquire () method to determine whether need to block, if return true congestion nodes of threads, precursor node can rely on a team or block the thread is interrupted to awaken the blocked thread
tryAcquire
Only exceptions are thrown inside the tryAcquire() body. This method must be overridden if the custom synchronizer is to get synchronization state exclusively
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
Copy the code
addWaiter
The node is added to the synchronization queue
Private Node addWaiter(Node mode) {// Create a new Node Node Node = new Node(thread.currentThread (), mode); // CAS quickly tries to insert the tail Node. if (pred ! = null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; }} // Multiple attempts to enq(node); return node; }Copy the code
If the queue is empty or the CAS setting fails, enQ spin is set again
Private Node enq(final Node Node) {// Loop for (;;) {// get Node t = tail; If (t == null) {// Must initialize // cas set head if (compareAndSetHead(new Node())) // set tail = head; } else {// CAS set node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}}}Copy the code
It can be found from the source code that if the synchronization queue fails to add nodes, there will be an endless loop until it is successfully added
acquireQueued
After the node enters the synchronization queue, it enters a spin process. When the condition is met and the synchronization state is obtained, it can exit from the spin process, otherwise it will continue to execute
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; // Interrupt token Boolean interrupted = false; // For (;;) Final Node p = node.predecessor(); If (p == head && tryAcquire(arg)) {setHead(node); p.next = null; // help GC failed = false; return interrupted; } / / to determine whether a thread to block the if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code
It can be found from the source code that only the precursor node of the current node is the head node can attempt to obtain the synchronization state, for the following reasons: ①. The head node is the node that has successfully obtained the synchronization status. After releasing the synchronization status, the head node wakes up its successor nodes. After waking up, the successor node needs to check whether it is the head node (2). Keep the FIFO synchronous queue principle
blocking
After joining the queue, the thread spins continuously to obtain synchronization status, but during the spinning process, it needs to determine whether the current thread needs to block
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; Private static Boolean shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) {/ / precursor nodes wait states int ws = Mr Pred. WaitStatus; If (ws == Node.SIGNAL) return true; if (ws == node. SIGNAL) return true; if (ws == node. SIGNAL) return true; If (ws >0) {if (ws >0) {if (ws >0) { Do {node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; // CAS sets the status of the precursor to SINNAL compareAndSetWaitStatus(pred, ws, node.signal); // CAS sets the status of the precursor to SINNAL compareAndSetWaitStatus(pred, ws, node.signal); } return false; }Copy the code
If shouldParkAfterFailedAcquire returns true, it will call parkAndCheckInterrupt method, the main method of internal call LockSupport tool class park blocking threads () method
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // Return thread.interrupted (); }Copy the code
Acquire () method flow
Public final Boolean release(int arg) {if (tryRelease(arg)) {Node h = head; if (h ! = null && h.waitStatus ! If (0) // Wake up the unparkprecursor (h); return true; } return false; }
Private void unparksucceeded (Node Node) {// Get the current wait state int ws = node.waitStatus; If (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; / / if subsequent node is null or its status to CANCELLED (wait for supermarket or interruption) if (s = = null | | s. aitStatus > 0) {s = null; For (Node t = tail; t ! = null && t ! = node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s ! = null) // Wake up the thread associated with the node locksupport.unpark (s.read); }Copy the code
Copy the code
From the source, you can see that the awakened node is traversed from the end rather than the beginning, because the successor of the current node may be null, wait timeout, or interrupted, so it is traversed from the end forward
Shared synchronization state acquisition and release
The main difference between shared and exclusive access is that multiple threads can obtain a synchronization state at the same time. For example, a read lock in ReentrantReadWriteLock is used
Public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0) doAcquireShared(arg); // If (tryAcquireShared(arg) < 0) doAcquireShared(arg); }Copy the code
Private void doAcquireShared(int arg) {// Add the SHARED Node to the queue final Node Node = addWaiter(node.shared); boolean failed = true; // Interrupt token Boolean interrupted = false; // loop for (;;) Final Node p = node.predecessor(); Int r = tryAcquireShared(arg); // If (p == head) {int r = tryAcquireShared(arg); Propagate(node, r); propagate (node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; }} / / to determine whether a thread to block the if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Copy the code
The CAS service is used. If the CAS operation fails, the spin cycle is performed again
The synchronization status is obtained due to timeout
Using the built-in lock synchronized may cause deadlocks, but AQS provides timeout synchronization, that is, obtaining synchronization status within a specified period of time
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (! tryAcquire(arg)) doAcquireInterruptibly(arg); }Copy the code
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}Copy the code
Timeout control
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // Final long deadline = system.nanotime () + nanosTimeout; // Final Node Node = addWaiter(node.exclusive); boolean failed = true; Try {// loop for (;;) Final Node p = node.predecessor(); If (p == head && tryAcquire(arg)) {setHead(node); p.next = null; // help GC failed = false; return true; } // nanosTimeout = deadline-system.nanotime (); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); If (thread.interrupted ()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}Copy the code
Copy the code
Thank you
The Art of Concurrent Programming in Java