Introduction
AbstractQueuedSynchronizer (the queue synchronizer, AQS for short) is the basic framework for building locks and other synchronization components. It uses a single int variable to represent the synchronization state and a built-in FIFO queue to manage the threads that are waiting for a resource. Most synchronization requirements can be implemented on top of AbstractQueuedSynchronizer.
1. Lock interface
Before studying AbstractQueuedSynchronizer, it helps to understand the Lock interface, and before that, what a lock is. A lock controls how multiple threads access a shared resource; a mutual-exclusion lock prevents more than one thread from accessing the resource at the same time. Before the Lock interface existed, Java programs relied on the synchronized keyword for locking. Java SE 5 added the Lock interface and its implementations in the java.util.concurrent.locks package. They provide synchronization similar to synchronized, with the following differences:
- A Lock must be acquired and released explicitly. This is less convenient than synchronized, but it gives more control. synchronized acquires and releases the lock implicitly, and its pattern is fixed: the lock is acquired on entering the block and released on leaving it
- A Lock supports interruptible acquisition, timed acquisition and other features that synchronized does not have
1.1 Main features of Lock that the synchronized keyword does not provide
Feature | Description |
---|---|
Non-blocking lock acquisition | The current thread tries to acquire the lock. If no other thread holds the lock at that moment, the thread acquires and holds it; otherwise the call returns immediately |
Interruptible lock acquisition | Unlike synchronized, a thread that is acquiring the lock can respond to an interrupt. When the thread is interrupted, an InterruptedException is thrown and the acquisition is abandoned |
Timed lock acquisition | The thread tries to acquire the lock before a deadline. If the lock has not been acquired when the deadline passes, the call returns |
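The three features in the table can be sketched with ReentrantLock as follows. This is a minimal illustration, not part of the original article: the class name LockFeaturesDemo, the helper methods and the 50 ms timeout are assumptions chosen for the example. The calling thread holds the lock while a second thread probes it.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockFeaturesDemo {

    /** Runs the task on a new thread and waits for it to finish. */
    static void runInOtherThread(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Returns what a second thread observes while the caller holds the lock. */
    static boolean[] probeWhileHeld() {
        Lock lock = new ReentrantLock();
        boolean[] results = new boolean[3];
        lock.lock();
        try {
            runInOtherThread(() -> {
                // 1. Non-blocking attempt: fails at once because the lock is held
                results[0] = lock.tryLock();
                // 2. Timed attempt: gives up after 50 ms
                try {
                    results[1] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // 3. Interruptible attempt: a pending interrupt aborts the acquisition
                Thread.currentThread().interrupt();
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not reached in this scenario
                } catch (InterruptedException e) {
                    results[2] = true; // the acquisition was abandoned
                }
            });
        } finally {
            lock.unlock();
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileHeld();
        System.out.println("tryLock=" + r[0] + ", timed tryLock=" + r[1]
                + ", interrupted=" + r[2]);
    }
}
```

None of these three probes is possible with synchronized, where a thread that cannot enter the block simply stays blocked.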
1.2 Usage Example
    Lock lock = new ReentrantLock();
    lock.lock();
    try {
        // TODO: access the shared resource
    } finally {
        lock.unlock();
    }
Note that the lock must be released in a finally block so that it is always released once it has been acquired. Do not put the lock acquisition inside the try block: if acquiring the lock throws an exception, the finally block would still run and call unlock() on a lock the thread does not hold.
1.3 Main API of the Lock interface (lock acquisition and release)
    public interface Lock {

        /** Acquires the lock; the method returns once the lock has been acquired. */
        void lock();

        /**
         * Acquires the lock interruptibly. Unlike lock(), this method responds
         * to an interrupt while the lock is being acquired.
         *
         * @throws InterruptedException if the current thread is interrupted
         */
        void lockInterruptibly() throws InterruptedException;

        /**
         * Tries to acquire the lock without blocking and returns immediately.
         *
         * @return true if the lock was acquired, false otherwise
         */
        boolean tryLock();

        /**
         * Tries to acquire the lock with a timeout. The method returns when:
         * 1. the current thread acquires the lock within the timeout (returns true);
         * 2. the current thread is interrupted within the timeout;
         * 3. the timeout elapses (returns false).
         *
         * @throws InterruptedException if the current thread is interrupted
         */
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

        /** Releases the lock. */
        void unlock();

        /** Returns a wait/notify component (Condition) bound to this lock. */
        Condition newCondition();
    }
Method | Description |
---|---|
void lock() | Acquires the lock. The calling thread blocks until it has acquired the lock, then returns from the method |
void lockInterruptibly() throws InterruptedException | Acquires the lock interruptibly. Unlike lock(), this method responds to interrupts: the current thread can be interrupted while it is acquiring the lock |
boolean tryLock() | Tries to acquire the lock without blocking. Returns immediately: true if the lock was acquired, false otherwise |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | Acquires the lock with a timeout. Returns in three cases: the current thread acquires the lock within the timeout (true); the current thread is interrupted within the timeout (InterruptedException); the timeout elapses (false) |
void unlock() | Releases the lock |
Condition newCondition() | Returns a wait/notify component bound to this lock. The current thread may call the component's await() method only while it holds the lock; the call releases the lock |
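As a rough illustration of newCondition(), the sketch below builds a one-slot mailbox: take() waits on a Condition until put() stores a message and signals it. The Mailbox and ConditionDemo classes are hypothetical names introduced only for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Mailbox {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private String message; // guarded by lock

    void put(String m) {
        lock.lock();
        try {
            message = m;
            notEmpty.signal(); // wake one thread waiting in take()
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            // await() may only be called while holding the lock; it releases
            // the lock while waiting and reacquires it before returning
            while (message == null) {
                notEmpty.await();
            }
            String m = message;
            message = null;
            return m;
        } finally {
            lock.unlock();
        }
    }
}

class ConditionDemo {
    /** Sends the text through a Mailbox from another thread and returns what was received. */
    static String roundTrip(String text) {
        Mailbox box = new Mailbox();
        new Thread(() -> box.put(text)).start();
        try {
            return box.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

The while loop around await() matters: a waiting thread can wake up without the condition being true, so the condition is always re-checked.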
2. AbstractQueuedSynchronizer
AbstractQueuedSynchronizer (the queue synchronizer) is the key to implementing locks: a lock implementation aggregates a synchronizer and uses it to implement the lock semantics. Many blocking classes in java.util.concurrent are built on AQS, for example ReentrantLock, Semaphore, ReentrantReadWriteLock and CountDownLatch (and, in older JDK releases, SynchronousQueue and FutureTask). The relationship between a lock and a synchronizer is as follows:
- A lock is user-facing. It defines the interface through which users interact with the lock and hides the implementation details
- A synchronizer is implementor-facing. It simplifies the implementation of a lock and hides the low-level work of synchronization state management, thread queuing, waiting and wake-up
2.1 AbstractQueuedSynchronizer interface and examples
The synchronizer is designed around the template method pattern. The implementor subclasses the synchronizer and overrides the designated methods, then aggregates the subclass inside a custom synchronization component and calls the template methods the synchronizer provides. Those template methods in turn call the methods overridden by the subclass.
When overriding the designated methods, use the following three methods provided by the synchronizer to read or modify the synchronization state:
- getState(): returns the current synchronization state
- setState(int newState): sets the current synchronization state
- compareAndSetState(int expect, int update): sets the state using CAS, which guarantees that the update is atomic
The synchronizer methods that can be overridden are as follows:
    /**
     * Acquires the synchronization state exclusively. An implementation must
     * query the current state, check whether it is as expected, and then set
     * the state using CAS.
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Releases the synchronization state exclusively. Threads waiting for the
     * state then get a chance to acquire it.
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires the synchronization state in shared mode. A return value greater
     * than or equal to 0 means success; a negative value means failure.
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /** Releases the synchronization state in shared mode. */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Reports whether the synchronizer is held in exclusive mode; generally,
     * whether it is held exclusively by the current thread.
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
Summary:
Method | Description |
---|---|
protected boolean tryAcquire(int arg) | Acquires the synchronization state exclusively. An implementation must query the current state, check whether it is as expected, and then set the state using CAS |
protected boolean tryRelease(int arg) | Releases the synchronization state exclusively. Threads waiting for the state then get a chance to acquire it |
protected int tryAcquireShared(int arg) | Acquires the synchronization state in shared mode. A return value greater than or equal to 0 means success; a negative value means failure |
protected boolean tryReleaseShared(int arg) | Releases the synchronization state in shared mode |
protected boolean isHeldExclusively() | Reports whether the synchronizer is held in exclusive mode; generally, whether it is held exclusively by the current thread |
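To make the shared-mode overrides more concrete, here is a small sketch of a one-shot latch, modelled on the kind of example given in the AbstractQueuedSynchronizer class documentation. The names OneShotLatch, open(), await() and isOpen() are assumptions for this illustration. State 0 means closed and state 1 means open.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OneShotLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success, a negative one means failure
            return isOpen() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the latch
            return true; // let the synchronizer wake the waiting threads
        }
    }

    private final Sync sync = new Sync();

    boolean isOpen() {
        return sync.isOpen();
    }

    /** Opens the latch and releases every waiting thread. */
    void open() {
        sync.releaseShared(1);
    }

    /** Blocks until the latch is open (does not respond to interrupts). */
    void await() {
        sync.acquireShared(1);
    }

    public static void main(String[] args) {
        OneShotLatch latch = new OneShotLatch();
        new Thread(latch::open).start();
        latch.await();
        System.out.println("latch open: " + latch.isOpen());
    }
}
```

Only two methods are overridden; queuing, parking and wake-up are inherited from the template methods acquireShared and releaseShared.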
Template methods provided by the synchronizer
The template methods provided by the synchronizer fall into three main categories:
- Exclusive acquisition and release of the synchronization state
- Shared acquisition and release of the synchronization state
- Querying the threads waiting in the synchronization queue
    /**
     * Acquires the synchronization state exclusively. Returns if the current
     * thread acquires the state; otherwise the thread enters the synchronization
     * queue and waits. Calls the overridden tryAcquire(int arg).
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Same as acquire, but responds to interrupts: if the current thread is
     * interrupted while waiting in the synchronization queue, the method throws
     * InterruptedException.
     */
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * acquireInterruptibly with a timeout. Returns false if the state is not
     * acquired within the timeout, true if it is.
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Acquires the synchronization state in shared mode. If the current thread
     * does not acquire the state, it enters the synchronization queue and waits.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /** Same as acquireShared, but responds to interrupts. */
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /** acquireSharedInterruptibly with a timeout. */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases the synchronization state exclusively, then wakes up the thread
     * of the first waiting node in the synchronization queue.
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /** Releases the synchronization state in shared mode. */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /** Returns the threads waiting in the synchronization queue. */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }
Summary:
Method | Description |
---|---|
void acquire(int arg) | Acquires the synchronization state exclusively. If the current thread acquires it, the method returns; otherwise the thread enters the synchronization queue and waits. The method calls the overridden tryAcquire(int arg) |
void acquireInterruptibly(int arg) | Same as acquire, but responds to interrupts. A thread that does not acquire the synchronization state enters the synchronization queue; if it is interrupted there, the method throws InterruptedException |
boolean tryAcquireNanos(int arg, long nanosTimeout) | acquireInterruptibly with a timeout. Returns false if the current thread does not acquire the synchronization state within the timeout, true if it does |
void acquireShared(int arg) | Acquires the synchronization state in shared mode. A thread that does not acquire it enters the synchronization queue and waits. The main difference from exclusive mode is that several threads can hold the synchronization state at the same time |
void acquireSharedInterruptibly(int arg) | Same as acquireShared, but responds to interrupts |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | acquireSharedInterruptibly with a timeout |
boolean release(int arg) | Releases the synchronization state exclusively, then wakes up the thread of the first waiting node in the synchronization queue |
boolean releaseShared(int arg) | Releases the synchronization state in shared mode |
Collection<Thread> getQueuedThreads() | Returns the threads waiting in the synchronization queue |
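The queue-query category can be observed from the outside through ReentrantLock, which exposes some of its synchronizer's queue-inspection methods. The sketch below is only an illustration; the class name QueueInspectionDemo and the method observedQueueLength() are assumptions. One thread holds the lock while a second thread blocks on it, and the holder then inspects the queue.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueInspectionDemo {

    /** Returns the queue length seen by the lock holder while one thread is waiting. */
    static int observedQueueLength() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // blocks while the other thread holds the lock
            lock.unlock();
        });
        int length;
        lock.lock();
        try {
            waiter.start();
            // Wait until the waiter has actually been added to the queue
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            length = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the waiter
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return length;
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + observedQueueLength());
    }
}
```

These query methods are intended for monitoring and diagnostics; the values can change at any moment when more threads are involved.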
2.2 A custom exclusive lock to reinforce how AbstractQueuedSynchronizer works
An exclusive lock can be held by only one thread at a time. Other threads that try to acquire it wait in the synchronization queue, and a waiting thread can acquire the lock only after the holder releases it. (If this is unclear, typing the code out yourself once is usually enough to understand it.)
    package com.lizba.p5;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;

    /**
     * <p>
     * Custom exclusive lock
     * </p>
     *
     * @Author: Liziba
     * @Date: 2021/6/19 22:40
     */
    public class Mutex implements Lock {

        private static class Sync extends AbstractQueuedSynchronizer {

            @Override
            protected boolean tryAcquire(int arg) {
                // If the state is 0, acquire the lock by setting it to 1 with CAS
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }

            @Override
            protected boolean tryRelease(int arg) {
                // Releasing a lock that is not held is an error
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                // Clear the owner thread
                setExclusiveOwnerThread(null);
                // Reset the synchronization state to 0
                setState(0);
                return true;
            }

            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }

            // Each Condition contains its own condition queue
            Condition condition() {
                return new ConditionObject();
            }
        }

        // All operations are delegated to the synchronizer
        private final Sync sync = new Sync();

        @Override
        public void lock() {
            sync.acquire(1);
        }

        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        @Override
        public void unlock() {
            sync.release(1);
        }

        @Override
        public Condition newCondition() {
            return sync.condition();
        }

        public boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    }
Summary of the custom synchronization component Mutex:
Mutex defines a static inner class that extends the synchronizer and implements exclusive acquisition and release of the synchronization state. Users of Mutex never deal with the internal synchronizer directly; they call the methods that Mutex provides. Taking lock() as an example, the Mutex implementation only needs to call the synchronizer's template method acquire(int arg). This approach greatly lowers the bar for implementing a reliable custom synchronization component. (Credit to Doug Lea.)
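A quick way to check that such a lock really is exclusive is to guard a shared counter with it. The sketch below is self-contained, so it repeats a condensed version of the synchronizer under the hypothetical name SimpleMutex (only lock() and unlock() are kept); MutexCounterDemo and its parameters are also assumptions for the example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    void unlock() {
        sync.release(1);
    }
}

class MutexCounterDemo {

    /** Increments a shared counter from several threads under the mutex. */
    static int run(int threads, int incrementsPerThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++; // not atomic on its own, safe under the lock
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(run(8, 10_000));
    }
}
```

If the lock were not exclusive, some increments would be lost and the total would fall short of threads × incrementsPerThread.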
3. AbstractQueuedSynchronizer implementation analysis
The analysis covers the following topics:
- The synchronization queue
- Exclusive synchronization state acquisition and release
- Shared synchronization state acquisition and release
- Exclusive synchronization state acquisition with a timeout
3.1 The synchronization queue
The synchronizer relies on an internal FIFO synchronization queue to manage the synchronization state. The nodes of this queue are instances of Node, an inner class of AbstractQueuedSynchronizer. A Node holds a reference to the thread that failed to acquire the synchronization state, its wait status, and its predecessor (prev) and successor (next) nodes.
Key parts of the Node source code:
    static final class Node {

        /**
         * Wait status:
         *  0 = initial state
         *  1 = CANCELLED  the thread has given up waiting because of a timeout or an interrupt
         * -1 = SIGNAL     when the thread of this node releases the synchronization
         *                 state or is cancelled, its successor is notified
         * -2 = CONDITION  the node's thread is waiting on a Condition; when another
         *                 thread calls signal() on that Condition, the node is moved
         *                 from the condition queue to the synchronization queue
         * -3 = PROPAGATE  the next shared acquisition is propagated unconditionally
         */
        volatile int waitStatus;

        /** Predecessor node */
        volatile Node prev;

        /** Successor node */
        volatile Node next;

        /** The thread that is trying to acquire the synchronization state */
        volatile Thread thread;

        /**
         * Successor node waiting on a Condition. If the current node is in shared
         * mode, this field holds the SHARED constant, so it also encodes the node
         * type (exclusive/shared).
         */
        Node nextWaiter;
    }
Node instances represent the entries of the synchronization queue inside AbstractQueuedSynchronizer. The synchronizer holds references to the first node (head) and the last node (tail); a thread that fails to acquire the synchronization state is appended to the tail of the queue.
    /** Head node of the synchronization queue held by AbstractQueuedSynchronizer */
    private transient volatile Node head;

    /** Tail node of the synchronization queue held by AbstractQueuedSynchronizer */
    private transient volatile Node tail;
The structure of AQS, illustrated by a set of diagrams
Basic structure of the AQS synchronization queue
- The synchronizer holds the head and tail nodes, both initially null
- A thread that fails to acquire the synchronization state is wrapped in a node and appended to the tail of the queue
Adding a node to the AQS synchronization queue
- The tail node is set with the CAS operation compareAndSetTail(Node expect, Node update), which guarantees that it is set correctly under contention
- The prev reference of the new node is set to the old tail before the CAS; after the CAS succeeds, the next reference of the old tail is set to the new tail
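The CAS-based insertion described above can be modelled outside AQS with AtomicReference. The following is a simplified sketch, not the JDK implementation: CasQueueSketch, its Node class and the helper methods are hypothetical, and only the enqueue step is modelled.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {

    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends a node at the tail, retrying until the CAS succeeds. */
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head first, then retry
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                      // link backwards before the CAS
                if (tail.compareAndSet(t, node)) {  // only one thread wins per round
                    t.next = node;                  // link forwards after the CAS
                    return;
                }
            }
        }
    }

    /** Counts the enqueued nodes by walking back from the tail (dummy excluded). */
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null && p.prev != null; p = p.prev) {
            n++;
        }
        return n;
    }

    /** Lets several threads enqueue one node each and returns the final size. */
    static int enqueueConcurrently(int threads) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> queue.enqueue(new Node()));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(16));
    }
}
```

Because prev is written before the CAS, walking backwards from the tail always finds every node that has been enqueued, which is why size() traverses via prev rather than next.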
Setting the head node of the AQS synchronization queue
- Nodes enter and leave the synchronization queue in FIFO order
- When the head node releases the synchronization state, it wakes up its successor; after the successor acquires the synchronization state, it sets itself as the new head node
3.2 Exclusive synchronization state acquisition and release
The synchronization state is acquired by calling the synchronizer's acquire(int arg) method. Note that this method does not respond to interrupts: a thread that is interrupted while waiting stays in the synchronization queue.
Acquisition
Source code of acquire(int arg):
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
addWaiter(Node mode), called by acquire(int arg):
    /**
     * Creates a node for the current thread and appends it to the queue.
     */
    private Node addWaiter(Node mode) {
        // Create a new node for the current thread in the given mode
        Node node = new Node(Thread.currentThread(), mode);
        // Fast path: if the tail is not null, try a single CAS insertion
        Node pred = tail;
        if (pred != null) {
            // Point the new node's predecessor at the current tail
            node.prev = pred;
            // Set the new node as the tail with CAS
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // The queue is empty or the CAS failed: fall back to the enq loop
        enq(node);
        return node;
    }
enq(Node node), called by addWaiter(Node mode):
    private Node enq(final Node node) {
        // Loop until the CAS insertion succeeds
        for (;;) {
            Node t = tail;
            if (t == null) {
                // The queue is empty: initialize it with a dummy head node
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // Point the new node's predecessor at the current tail
                node.prev = t;
                // Set the new node as the tail with CAS
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
acquireQueued(final Node node, int arg), called by acquire(int arg):
    /**
     * Tries to acquire the synchronization state in an endless loop.
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Spin
            for (;;) {
                // Get the predecessor of the current node; predecessor() throws
                // NullPointerException if it is null
                final Node p = node.predecessor();
                // Only the node whose predecessor is the head may try to acquire
                if (p == head && tryAcquire(arg)) {
                    // Make the current node the head; setHead clears the node's
                    // thread and prev references
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Otherwise decide from the predecessor's waitStatus whether to
                // park the thread with LockSupport.park(this)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
Why may a node try to acquire the synchronization state only when its predecessor is the head node?
A diagram of how the nodes spin helps explain why:
- The head node is the node that has acquired the synchronization state. When its thread releases the state, it wakes up the successor node; the successor's thread, once awake, must check whether its predecessor is the head node
- This preserves the FIFO order in which nodes enter and leave the synchronization queue
Flow chart of acquire(int arg)
Release
After acquiring the synchronization state, the current thread returns from acquire(int arg) and runs its own logic. When it finishes, it must release the synchronization state so that subsequent nodes can acquire it. The state is released by calling release(int arg), which releases the synchronization state and then wakes up the successor node.
Source code analysis:
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // If the head exists and its wait status is not the initial state,
            // use LockSupport to wake up the thread of its successor
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
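The waiting and waking in these methods rests on LockSupport.park and LockSupport.unpark. The sketch below shows that primitive on its own, under the assumption of a single waiter; ParkUnparkDemo and the flag names are hypothetical and are not taken from the AQS source.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {

    /** Parks a thread until another thread "releases" and unparks it. */
    static boolean handOff() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean woke = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark, so the condition
            // is re-checked in a loop, much like the acquire loop above
            while (!released.get()) {
                LockSupport.park();
            }
            woke.set(true);
        });
        waiter.start();

        released.set(true);         // plays the role of releasing the state
        LockSupport.unpark(waiter); // plays the role of waking the successor

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return woke.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter woke up: " + handOff());
    }
}
```

The order of the two calls does not matter here: unpark before park leaves a permit behind, so the later park returns immediately.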
Summary of exclusive synchronization state acquisition and release
- The synchronizer maintains a synchronization queue. A thread that fails to acquire the synchronization state is added to the queue and spins there
- A node leaves the queue and stops spinning when its predecessor is the head node and it has acquired the synchronization state
- release(int arg) releases the synchronization state and then wakes up the successor of the head node, if there is one
3.3 Shared synchronization state acquisition and release
Difference from exclusive mode
The main difference between shared and exclusive acquisition is whether several threads can hold the synchronization state at the same time. Take reading and writing a file as an example: while the file is being read, other reads can proceed at the same time, but writes are blocked (shared access). A write is exclusive access and blocks all other reads and writes.
Comparison of shared and exclusive access to a resource
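The read/write comparison can be tried out with ReentrantReadWriteLock, whose read lock is shared and whose write lock is exclusive. This is an illustrative sketch only; the class SharedVsExclusiveDemo and the method probeWhileReading() are assumed names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedVsExclusiveDemo {

    /**
     * While the caller holds the read lock, reports whether a second thread
     * can take the read lock (index 0) and the write lock (index 1).
     */
    static boolean[] probeWhileReading() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] results = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                // Shared access: a second reader gets in
                results[0] = rw.readLock().tryLock();
                if (results[0]) {
                    rw.readLock().unlock();
                }
                // Exclusive access: a writer is refused while a reader holds the lock
                results[1] = rw.writeLock().tryLock();
                if (results[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            rw.readLock().unlock();
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileReading();
        System.out.println("second reader: " + r[0] + ", writer: " + r[1]);
    }
}
```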
Source code analysis:
    /**
     * Tries to acquire the synchronization state in shared mode. A return value
     * below 0 means failure, in which case the thread enters the queue.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
doAcquireShared(int arg), called by acquireShared(int arg):
    /**
     * Acquires the synchronization state in shared mode by spinning.
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // Only the node whose predecessor is the head may try to acquire
                if (p == head) {
                    // A result >= 0 means the state was acquired
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
Releasing the shared synchronization state
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // Wake up the waiting successor nodes
            doReleaseShared();
            return true;
        }
        return false;
    }
Unlike exclusive release, tryReleaseShared(int arg) must make sure the synchronization state is released safely, because several threads may release it at the same time; this is generally done with a loop and CAS. An implementation can be found in Semaphore, a concurrency component that allows several threads to access a resource at the same time.
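A loop-and-CAS release of this kind can be sketched with a small permit pool in the style of a non-fair semaphore. This is an assumption-laden illustration rather than the Semaphore source: PermitPool and its method names are hypothetical, and overflow checks are left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class PermitPool {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // the state holds the number of free permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Fail if there are not enough permits, otherwise retry the CAS
                // until this thread's update wins
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                // Several threads may release at once, so a plain setState
                // could lose updates; CAS in a loop cannot
                if (compareAndSetState(current, current + releases)) {
                    return true;
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    PermitPool(int permits) {
        sync = new Sync(permits);
    }

    /** Blocks until a permit is available. */
    void acquire() {
        sync.acquireShared(1);
    }

    /** Takes a permit only if one is free right now. */
    boolean tryAcquire() {
        return sync.tryAcquireShared(1) >= 0;
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }

    public static void main(String[] args) {
        PermitPool pool = new PermitPool(2);
        pool.acquire();
        System.out.println("free permits: " + pool.availablePermits());
        pool.release();
    }
}
```

With two permits, two threads can be inside at once and a third is refused or queued, which is exactly the shared behaviour that exclusive mode cannot express.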
3.4 Exclusive synchronization state acquisition with a timeout
Timed acquisition returns true if the synchronization state is acquired within the specified time and false otherwise. synchronized offers nothing comparable: a thread blocked on synchronized stays blocked even after it is interrupted. doAcquireNanos(int arg, long nanosTimeout) returns false if the synchronization state is not acquired within the specified time, and throws InterruptedException if the thread is interrupted while waiting.
Comparison of acquire, acquireInterruptibly and tryAcquireNanos
Method | Description |
---|---|
void acquire(int arg) | Acquires the synchronization state exclusively. If the current thread acquires it, the method returns; otherwise the thread enters the synchronization queue and waits. The method calls the overridden tryAcquire(int arg) |
void acquireInterruptibly(int arg) | Same as acquire, but responds to interrupts. A thread that does not acquire the synchronization state enters the synchronization queue; if it is interrupted there, the method throws InterruptedException |
boolean tryAcquireNanos(int arg, long nanosTimeout) | acquireInterruptibly with a timeout. Returns false if the current thread does not acquire the synchronization state within the timeout, true if it does |
Source code analysis
    /**
     * Returns true if the synchronization state is acquired within the given
     * time, false otherwise.
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Return true at once if the first attempt succeeds, otherwise call doAcquireNanos
        return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
    }
doAcquireNanos(arg, nanosTimeout)
    /**
     * Tries to acquire the synchronization state within the given time.
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // Compute the absolute deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // Spin
            for (;;) {
                final Node p = node.predecessor();
                // Only the node whose predecessor is the head may try to acquire
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // Recompute the remaining time; give up once it has run out
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                // Time remains: decide whether the current thread should park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // Respond to an interrupt
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
Flow chart of exclusive synchronization state acquisition with a timeout
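The deadline handling in doAcquireNanos follows a pattern that is useful on its own: compute an absolute deadline once, then recompute the remaining time after every wake-up. A reduced sketch of that pattern follows, assuming a simple flag in place of the synchronization state; TimedWaitSketch and awaitFlag are hypothetical names and no queue is involved.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class TimedWaitSketch {

    /**
     * Waits until the flag is set or the timeout elapses.
     *
     * @return true if the flag was seen set, false on timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    static boolean awaitFlag(AtomicBoolean flag, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L) {
            return flag.get();
        }
        // Fix the deadline once, so repeated wake-ups cannot extend the wait
        final long deadline = System.nanoTime() + nanosTimeout;
        for (;;) {
            if (flag.get()) {
                return true;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0L) {
                return false;
            }
            LockSupport.parkNanos(remaining); // may return early
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean flag = new AtomicBoolean(false);
        Thread main = Thread.currentThread();
        new Thread(() -> {
            flag.set(true);
            LockSupport.unpark(main); // wake the waiter early
        }).start();
        System.out.println("flag set in time: " + awaitFlag(flag, 5_000_000_000L));
    }
}
```

Passing the original timeout to parkNanos on every iteration would be a bug, because each early wake-up would restart the full wait.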