Introduction
If the CAS mechanism covered in the previous article, “In-depth Understanding of Java Concurrent programming without locking CAS mechanism”, is the foundation of Java concurrent programming, then AQS, the subject of this chapter, is the core of the entire Java JUC package. Some knowledge of CAS is necessary before studying AQS, because CAS appears throughout both ReentrantLock and AQS.
Lock interface in JUC
At the beginning of this series on concurrent programming, we described solutions to the thread-safety problem. The previous articles covered the lock-free CAS mechanism and the synchronized keyword: CAS is an optimistic-locking approach, while synchronized is a pessimistic lock. The AQS-based ReentrantLock discussed in this chapter is also a pessimistic lock, but it is not the same as synchronized. With synchronized, the lock is implicit: it is acquired and released automatically, without developer intervention. This chapter deals with explicit locks, where acquisition and release must be coded by hand. JDK 1.5 added the Lock interface to java.util.concurrent.locks; it defines the lock() and unlock() methods to support explicit locking and unlocking. An explicit lock is used as follows:
Lock lock = new ReentrantLock(); // Create a lock object
lock.lock(); // Acquire the lock
try {
    // Code that must run while holding the lock....
} finally {
    lock.unlock(); // Release the lock
}
In the code above, once the current thread has executed lock(), it owns the lock. Until it calls unlock(), other threads cannot enter the guarded block because they cannot acquire the lock, so they block until the current thread releases it. Note that unlock() must be placed in the finally block. This ensures that the lock is released even if an exception is thrown while the lock is held, so the program does not deadlock. In full, the Lock interface defines the following methods:
/** Acquires the lock. Returns at once if the lock is free; otherwise blocks and keeps competing until the lock is acquired. */
void lock();
/** Releases the lock: the current thread has finished its work, so the lock changes from held to available and a blocked thread is notified. */
void unlock();
/** Acquires the lock like lock(), but the wait can be interrupted. Returns at once if the lock is free; otherwise blocks until the current thread either acquires the lock or is interrupted and abandons the acquisition. */
void lockInterruptibly() throws InterruptedException;
/** Non-blocking acquisition: returns immediately, true if the lock was acquired, false otherwise. */
boolean tryLock();
/** Timed acquisition: returns true if the current thread acquires the lock within the given time without being interrupted, false if the time elapses first. */
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/** Returns a wait/notify component (Condition) bound to this lock. A thread may call the Condition's await() only after acquiring the lock; calling await() releases the lock. */
Condition newCondition();
The methods above show that Lock offers several features that synchronized lacks:
- ① Interruptible lock acquisition (synchronized does not support interrupting a thread that is waiting for a lock);
- ② Non-blocking lock acquisition;
- ③ Lock acquisition with a timeout;
- ④ Condition support, etc.
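The first three features in the list above can be observed with a small experiment. The following is only a sketch: the class name LockFeaturesDemo, the method probeWhileHeld() and the 50 ms timeout are made up for illustration. One thread holds the lock while a second thread tries each acquisition style in turn.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class (not part of the JDK or of the article's own code).
class LockFeaturesDemo {

    // While the calling thread holds the lock, a second thread probes it.
    // Returns {tryLock() result, timed tryLock() result, lockInterruptibly() was interrupted}.
    static boolean[] probeWhileHeld() {
        Lock lock = new ReentrantLock();
        boolean[] results = new boolean[3];
        CountDownLatch aboutToBlock = new CountDownLatch(1);

        lock.lock(); // the calling thread holds the lock during the whole probe
        try {
            Thread prober = new Thread(() -> {
                results[0] = lock.tryLock(); // non-blocking: returns false at once
                try {
                    // timed: gives up after 50 ms
                    results[1] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                    aboutToBlock.countDown();
                    lock.lockInterruptibly(); // waits until acquired or interrupted
                    lock.unlock();            // only reached if the lock was acquired
                } catch (InterruptedException e) {
                    results[2] = true;        // the wait was abandoned on interrupt
                }
            });
            prober.start();
            aboutToBlock.await(); // both tryLock calls have returned
            prober.interrupt();   // cancel the pending lockInterruptibly()
            prober.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileHeld();
        System.out.println("tryLock() succeeded: " + r[0]);
        System.out.println("timed tryLock() succeeded: " + r[1]);
        System.out.println("lockInterruptibly() interrupted: " + r[2]);
    }
}
```

A thread blocked on a synchronized block offers none of these escape routes: it cannot give up immediately, time out, or react to an interrupt while waiting for the monitor.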
Lock interface implementer: ReentrantLock
ReentrantLock is a class added to the JUC package in JDK 1.5. It implements the Lock interface and serves the same purpose as synchronized, but it is more flexible, at the cost of acquiring and releasing the lock by hand. ReentrantLock is reentrant: the thread that currently holds the lock can acquire it again. It also supports both fair and unfair modes. Fairness concerns the order in which threads that requested the lock obtain it. If the thread that requested the lock first always obtains it first, the lock is fair. If a thread that has just requested the lock may compete with, and get ahead of, threads that requested it earlier, the lock is unfair. In most cases an unfair lock is much more efficient than a fair one, even though waiting threads may be overtaken by newcomers. In special business scenarios where the order of acquisition matters more, however, a fair lock is the better choice. One thing to remember about reentrancy: the lock must be released exactly as many times as it was acquired. For example:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Task implements Runnable {
    public static Lock lock = new ReentrantLock();
    public static int count = 0;

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            lock.lock(); // Acquire the lock for the first time
            lock.lock(); // Acquire the lock for the second time (reentrant)
            try {
                count++; // Non-atomic operation: not thread-safe without the lock
            } finally {
                lock.unlock(); // Release the lock for the first time
                lock.unlock(); // Release the lock for the second time
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task();
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count); // Result: 20000
    }
}
The example above is simple: threads t1 and t2 both perform a non-atomic operation on the shared variable count, and ReentrantLock removes the thread-safety problem. Because each iteration acquires the lock twice, the finally block must also call unlock() twice; otherwise the lock would never be fully released. ReentrantLock is easy to use. To understand it better, here are some of the other methods it provides:
// Returns the number of holds the current thread has on this lock (lock() calls not yet matched by unlock())
int getHoldCount();
// Returns the thread currently holding the lock, or null if the lock is not held by any thread
protected Thread getOwner();
// Returns a collection of threads that may be waiting to acquire the lock (kept in an internal queue, analyzed later)
protected Collection<Thread> getQueuedThreads();
// Returns an estimate of the number of threads waiting to acquire this lock
int getQueueLength();
// Returns a collection of threads that may be waiting on the given Condition associated with this lock (an estimate)
protected Collection<Thread> getWaitingThreads(Condition condition);
// Returns an estimate of the number of threads that called await() on the given Condition and have not yet been signalled
int getWaitQueueLength(Condition condition);
// Queries whether the given thread is waiting to acquire this lock
boolean hasQueuedThread(Thread thread);
// Queries whether any thread is waiting to acquire this lock
boolean hasQueuedThreads();
// Queries whether any thread is waiting on the given Condition associated with this lock
boolean hasWaiters(Condition condition);
// Returns true if this is a fair lock, false otherwise
boolean isFair();
// Queries whether the current thread holds this lock
boolean isHeldByCurrentThread();
// Queries whether this lock is held by any thread
boolean isLocked();
As these show, ReentrantLock not only implements the methods defined by the Lock interface but also adds methods of its own. A simple example illustrates some of them:
import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Task implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static int count = 0;

    // A simple use case for ReentrantLock
    @SneakyThrows // Lombok annotation: lets run() call the timed tryLock(), which throws InterruptedException
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            lock.lock(); // First acquisition: blocking
            lock.tryLock(); // Second acquisition: non-blocking
            lock.tryLock(10, TimeUnit.SECONDS); // Third acquisition: waits at most 10 seconds
            try {
                count++; // Non-atomic operation: not thread-safe without the lock
            } finally {
                lock.unlock(); // Release the lock for the first time
                lock.unlock(); // Release the lock for the second time
                lock.unlock(); // Release the lock for the third time
            }
        }
    }

    public void reentrantLockApiTest() {
        lock.lock(); // Acquire the lock
        try {
            // Number of times the current thread has called lock() without unlocking
            System.out.println("Thread: " + Thread.currentThread().getName() + "\t calls lock() times: " + lock.getHoldCount());
            // Whether the lock is a fair lock
            System.out.println("Is the current lock a fair lock? " + lock.isFair());
            // Estimated number of threads waiting to acquire the lock
            System.out.println("Currently: " + lock.getQueueLength() + " threads are waiting to acquire the lock!");
            // Whether the given thread is waiting to acquire the lock
            System.out.println("Is the current thread waiting to acquire the lock? " + lock.hasQueuedThread(Thread.currentThread()));
            // Whether any thread is waiting to acquire the lock
            System.out.println("Is there a thread waiting to acquire the lock? " + lock.hasQueuedThreads());
            // Whether the current thread holds the lock
            System.out.println("Does the current thread hold the lock? " + lock.isHeldByCurrentThread());
            // Whether the lock is held by any thread
            System.out.println("Is the lock held by a thread? " + lock.isLocked());
        } finally {
            lock.unlock(); // Release the lock
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task();
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(count); // Result: 20000
        /*
         * Output of reentrantLockApiTest():
         * Thread: main	 calls lock() times: 1
         * Is the current lock a fair lock? false
         * Currently: 0 threads are waiting to acquire the lock!
         * Is the current thread waiting to acquire the lock? false
         * Is there a thread waiting to acquire the lock? false
         * Does the current thread hold the lock? true
         * Is the lock held by a thread? true
         */
        task.reentrantLockApiTest();
    }
}
That completes a basic tour of how ReentrantLock is used. ReentrantLock is in fact built on the AQS framework, so before exploring its internal implementation we first take a look at AQS.
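The rule that a reentrant lock must be released as many times as it was acquired can be traced with getHoldCount(). The sketch below is illustrative only; the class name HoldCountDemo and the method traceHoldCounts() are hypothetical, and try/finally is left out to keep the trace short.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, used only to trace the hold count.
class HoldCountDemo {

    // Records the hold count after each step: lock, lock, unlock, unlock.
    // The last slot also checks that the lock is only free after the second unlock.
    static int[] traceHoldCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] trace = new int[5];
        lock.lock();
        trace[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        trace[1] = lock.getHoldCount(); // reentrant acquisition by the same thread
        lock.unlock();
        trace[2] = lock.getHoldCount(); // one release: the lock is still held
        trace[3] = lock.isLocked() ? 1 : 0;
        lock.unlock();
        trace[4] = lock.getHoldCount(); // second release: the lock is free again
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(traceHoldCounts())); // prints [1, 2, 1, 1, 0]
    }
}
```

If the second unlock() were omitted, the hold count would stay at 1 and every other thread calling lock() would block forever.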
3. The core of the JUC concurrency package: AQS, the basic concurrency component
AQS stands for AbstractQueuedSynchronizer (abstract queued synchronizer). It is the core foundational component of the Java concurrency package: the base framework used to build locks, semaphores, latches and other synchronization components.
A brief introduction to how AQS works
As described in the earlier article on how the synchronized keyword is implemented, in “A Thorough Understanding of Concurrent Programming in Java”, the heavyweight lock behind synchronized is based on a counter in ObjectMonitor. AQS works in a similar way. It controls the synchronization state through an int field, state, declared volatile. When state is 0, no thread holds the lock; when state is non-zero, the lock is held by a thread, and any other thread that wants it must enter the synchronization queue and wait for the holder to release it. AQS uses its inner class Node to build a FIFO (first-in, first-out) synchronization queue for threads that have not obtained the lock: each thread waiting for the lock is added to this queue. Condition works with a separate queue: when a thread calls await() on a Condition, it is added to that Condition's wait queue, and when signal() is called on the Condition, the thread is moved from the wait queue to the synchronization queue to compete for the lock. Note that there are therefore two kinds of queue: ① the synchronization queue, which a thread joins when it tries to acquire the lock and finds it already held by another thread; ② the wait queue (there may be several), which a thread joins after calling await() on a Condition. The two should not be confused. We analyze the synchronization queue first. The AQS synchronization queue model is as follows:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
// Points to the head of the synchronization queue
private transient volatile Node head;
// points to the end of the synchronization queue
private transient volatile Node tail;
// Synchronization state
private volatile int state;
// omitted......
}
head and tail are fields of AQS. head points to the head of the synchronization queue; note that the head node is an empty node that stores no thread information. tail points to the tail of the queue. With these two references and the links in each node, the synchronization queue forms a doubly linked list, which makes it easy to add and remove nodes. When a thread calls lock(), if state is 0 the lock is not held by any thread, and the current thread acquires it by setting state to 1. If state is not 0, the lock is already held by another thread, and the current thread is wrapped in a Node and added to the synchronization queue to wait. A Node encapsulates a thread that is waiting for the lock, holding the thread itself and its status, such as whether it is blocked, waiting to be woken up, or cancelled. Each Node is linked to its predecessor (prev) and successor (next), so that when the thread holding the lock releases it, the next waiting thread can be woken up quickly. The Node class is structured as follows:
static final class Node {
    // Marker for shared mode
    static final Node SHARED = new Node();
    // Marker for exclusive mode
    static final Node EXCLUSIVE = null;
    // waitStatus value: the thread has been cancelled
    static final int CANCELLED = 1;
    // waitStatus value: waiting to be woken up
    static final int SIGNAL = -1;
    // waitStatus value: the thread is waiting on a Condition
    static final int CONDITION = -2;
    // waitStatus value, used in shared mode: the acquired synchronization state should be propagated
    static final int PROPAGATE = -3;
    // Wait status: 0, CANCELLED, SIGNAL, CONDITION or PROPAGATE
    volatile int waitStatus;
    // Predecessor node in the synchronization queue
    volatile Node prev;
    // Successor node in the synchronization queue
    volatile Node next;
    // The thread waiting for the lock
    volatile Thread thread;
    // Successor node in the wait queue (related to Condition, examined later)
    Node nextWaiter;
    // Whether the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // Returns the predecessor node
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    // remaining code omitted.....
}
The constants SHARED and EXCLUSIVE represent shared mode and exclusive mode respectively. Shared mode allows several threads to hold the lock at the same time; Semaphore and ReadLock, for example, are built on the shared mode of AQS. Exclusive mode allows only one thread to hold the lock at a time; components such as ReentrantLock are built on the exclusive mode of AQS. The field waitStatus represents the status of the thread wrapped in the Node. It has five possible values:
- Initial state: waitStatus=0, meaning the node has just been initialized.
- CANCELLED: waitStatus=1. The thread waiting in the synchronization queue has timed out or been interrupted, and the node must be removed from the synchronization queue. CANCELLED is a final state: once a node enters it, its status never changes again.
- SIGNAL: waitStatus=-1. In the JDK source, this status on a node means that its successor is blocked and waiting to be woken up: when the node's own thread releases the lock or is cancelled, it must notify the successor's thread so that it can run.
- CONDITION: waitStatus=-2, related to Condition. A node in this state is in a wait queue, and its thread is waiting on a Condition. When another thread calls signal() on that Condition, the node is moved from the wait queue to the synchronization queue, where it waits to compete for the lock.
- PROPAGATE: waitStatus=-3, related to shared mode. In shared mode, the thread of a node in this state is runnable.
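The difference between shared and exclusive mode described above can be seen from the outside with two JDK classes built on AQS. This is only a sketch; the class name ModeDemo and its method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class contrasting a shared-mode and an exclusive-mode component.
class ModeDemo {

    // Shared mode: a Semaphore with 2 permits admits two holders at once;
    // only the third attempt is refused.
    static boolean[] sharedAttempts() {
        Semaphore semaphore = new Semaphore(2);
        return new boolean[] {
            semaphore.tryAcquire(), semaphore.tryAcquire(), semaphore.tryAcquire()
        };
    }

    // Exclusive mode: while one thread holds a ReentrantLock,
    // a tryLock() from a different thread is refused.
    static boolean exclusiveAttemptFromOtherThread() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                result[0] = lock.tryLock();
                if (result[0]) {
                    lock.unlock();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return result[0];
    }

    public static void main(String[] args) {
        boolean[] shared = sharedAttempts();
        System.out.println("Semaphore(2): " + shared[0] + ", " + shared[1] + ", " + shared[2]);
        System.out.println("Second thread got the ReentrantLock: " + exclusiveAttemptFromOtherThread());
    }
}
```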
The fields prev and next are the predecessor and successor of the current Node, and thread is the thread object wrapped by the node. nextWaiter is the node's successor in a wait queue (more on Condition later). That completes an overview of the Node data type. In summary, AQS, as the core component of JUC, supports two kinds of lock implementation: exclusive mode (such as ReentrantLock) and shared mode (such as Semaphore). Both kinds of implementation class are built on AQS, which maintains a queue internally. When the number of threads trying to obtain the lock exceeds what the current mode allows, the extra threads are wrapped in Nodes and added to the queue to wait. All of this is done by AQS: whether in ReentrantLock or Semaphore, most methods are implemented by calling AQS directly or indirectly. The main classes in the overall AQS class diagram are as follows:
- AbstractOwnableSynchronizer abstract class: defines the field that stores the thread currently holding the lock, together with the methods that set and get it.
- AbstractQueuedSynchronizer abstract class: AQS is its acronym, and it is the core class of the whole AQS framework. It queues threads internally in a virtual queue and defines lock acquisition (tryAcquire) and release (tryRelease), but AQS provides no default implementation of these operations. The concrete logic is left to subclasses, which makes it more flexible to use in development.
- Node inner class: inner class of AbstractQueuedSynchronizer, used to build the virtual queue inside AQS and to manage the threads that are waiting for the lock.
- Sync abstract inner class: inner class of ReentrantLock. It extends AbstractQueuedSynchronizer, implements the lock acquisition and release logic that AQS leaves open (nonfairTryAcquire() and tryRelease()), and declares a lock() method for its subclasses to implement.
- NonfairSync inner class: inner class of ReentrantLock; extends Sync and implements the unfair lock.
- FairSync inner class: inner class of ReentrantLock; extends Sync and implements the fair lock.
- Lock interface: the top-level interface for Java locks. It defines the lock operations, such as lock(), unlock() and tryLock().
- ReentrantLock: implements the Lock interface. It has three inner classes, Sync, NonfairSync and FairSync; depending on the constructor argument, an instance is created as a fair or an unfair lock.
As the class relationships above show, AQS is an abstract class, yet its source contains no abstract methods. This is because AQS is designed as a foundational component: it is not meant to be used directly as a working class, but to provide infrastructure for the real implementation classes, such as building the synchronization queue and controlling the synchronization state. In design-pattern terms, AQS uses the template method pattern. Besides the core methods for concurrent operations and for managing the synchronization queue, it declares some template methods, such as the locking and unlocking operations, for subclasses to implement. Why? AQS encapsulates the core concurrent operations, but implementations fall into two modes, shared and exclusive, whose locking and unlocking logic differ. The public methods of AQS deal only with the internal mechanics and do not care about the mode-specific logic, which subclasses supply through the template methods. To implement an exclusive lock such as ReentrantLock, you implement tryAcquire() and tryRelease(); to implement a shared-mode component such as Semaphore, you implement tryAcquireShared() and tryReleaseShared(). The benefit is clear: both modes share the same basic machinery (AQS), and only the locking/unlocking logic differs. More importantly, writing a custom lock is also straightforward: choose a mode and implement its template methods. The template methods that AQS provides for exclusive and shared mode are as follows:
// Acquires the lock in exclusive mode
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// Releases the lock in exclusive mode
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// Acquires the lock in shared mode
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// Releases the lock in shared mode
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
// Determines whether the lock is held exclusively by the current thread
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
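To make the template-method idea concrete, here is a minimal sketch of a custom exclusive lock built on AQS. It assumes a non-reentrant mutex; the names SimpleMutex and countWithTwoThreads() are hypothetical, and the structure follows the general pattern shown in the AbstractQueuedSynchronizer documentation rather than any code from this article.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex; state 0 = free, state 1 = held.
class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS 0 -> 1: only one thread can win
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true; // tells AQS to wake up the next waiting thread
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }     // queueing is inherited from AQS
    public void unlock() { sync.release(1); }
    public boolean tryLock() { return sync.tryAcquire(1); }

    // Two threads each increment a shared counter 10000 times under the mutex.
    static int countWithTwoThreads() {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];
        Runnable task = () -> {
            for (int i = 0; i < 10000; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads()); // prints 20000
    }
}
```

Only the three overridden methods carry lock-specific logic; the queueing, blocking and wake-up behind acquire() and release() come from AQS. Because this sketch is not reentrant, a thread that calls lock() twice without unlocking would block on itself.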
That gives us a general understanding of how AQS, the core concurrency component, works. Next, we analyze its concrete implementation in more detail using ReentrantLock.
4. The implementation and principle of AQS exclusive mode, based on ReentrantLock
4.1 NonfairSync: the unfair lock in ReentrantLock
The AQS synchronizer manages the synchronization state with its internal FIFO doubly linked synchronization queue. When a thread fails to acquire the lock, AQS wraps the thread and its related information in a Node, adds it to the synchronization queue, and blocks the thread. When the synchronization state is released, AQS wakes up the thread of the first waiting node in the synchronization queue so that it can try to modify state and acquire the lock. Below we focus on the logic of acquiring the lock, releasing the lock, and wrapping threads in Nodes to enqueue them. We start from the unfair lock of ReentrantLock.
// Constructor: by default the lock is of the NonfairSync type
public ReentrantLock() {
    sync = new NonfairSync();
}
// Constructor: creates the lock type from the argument (true: fair lock, false: unfair lock)
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
// Lock operation
public void lock() {
    sync.lock();
}
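The effect of the two constructors can be checked through the public isFair() method. The class and method names below are hypothetical; this is just a quick sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: which Sync variant does each constructor select?
class FairnessFlagDemo {

    // The no-argument constructor creates an unfair lock.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // The boolean constructor selects the lock type explicitly.
    static boolean explicitIsFair(boolean fair) {
        return new ReentrantLock(fair).isFair();
    }

    public static void main(String[] args) {
        System.out.println("new ReentrantLock().isFair():      " + defaultIsFair());       // false
        System.out.println("new ReentrantLock(true).isFair():  " + explicitIsFair(true));  // true
        System.out.println("new ReentrantLock(false).isFair(): " + explicitIsFair(false)); // false
    }
}
```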
4.1.1 Analysis of the lock() method in ReentrantLock
Let’s start with unfair locks:
/**
 * Unfair lock class
 */
static final class NonfairSync extends Sync {
    // Acquire the lock
    final void lock() {
        // Use CAS to change the synchronization state and so acquire the lock.
        // Several threads may attempt the change at once, so CAS is needed for atomicity.
        if (compareAndSetState(0, 1))
            // On success, record the current thread as the exclusive owner
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1); // Otherwise, request the synchronization state again
    }
}
Lock acquisition in the NonfairSync class works like this: it first uses CAS to try to change the synchronization state from 0 to 1. If the CAS returns true, the synchronization state, and therefore the lock, has been acquired, and the current thread is recorded as the exclusive owner. If it returns false, acquire(1) is executed. This method does not respond to thread interruption: if the current thread fails to acquire the lock and is placed in the synchronization queue to wait, a later interrupt will not remove it from the queue. acquire(1) is as follows:
public final void acquire(int arg) {
    // Try again to acquire the synchronization state
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
acquire() is a method provided by AQS. Its parameter arg is the value used to update the synchronization state (state is 0 when the lock is free and non-zero when it is held); since we are acquiring the lock, the value passed is normally 1. The method first calls tryAcquire(arg). As noted earlier, AQS leaves this method to subclasses. NonfairSync's tryAcquire(arg) delegates to nonfairTryAcquire(), which is implemented in the Sync class inside ReentrantLock. The code is as follows:
// NonfairSync class
static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

// Sync class: inner class of ReentrantLock
abstract static class Sync extends AbstractQueuedSynchronizer {
    // nonfairTryAcquire method
    final boolean nonfairTryAcquire(int acquires) {
        // Get the current thread and the current synchronization state
        final Thread current = Thread.currentThread();
        int c = getState();
        // If the state is 0, the lock is free: try again to acquire it
        if (c == 0) {
            // Use CAS to modify the synchronization state
            if (compareAndSetState(0, acquires)) {
                // On success, record the current thread as the exclusive owner
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // If the current thread already holds the lock, this is a reentrant acquisition: add acquires to state
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // Only the thread holding the lock reaches this point, so a plain write is thread-safe
            setState(nextc);
            return true;
        }
        return false;
    }
    // omitted...
}
Analyzing the code above, we can see that the nonfairTryAcquire(acquires) method of the unfair lock handles three cases:
- First, it tries again to change the synchronization state and acquire the lock (the thread that held the lock when the earlier attempt failed may have released it in the meantime). If this succeeds, it records the current thread as the exclusive owner and returns true.
- Otherwise, it checks whether the current thread is the owner thread that already holds the lock. If so, the current thread has acquired the lock and not yet released it, which is a reentrant acquisition. It then increments state by 1 and returns true.
- If the current thread satisfies neither of the two conditions above, it returns false, and nonfairTryAcquire(acquires) has nothing further to do.
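The three branches above can be imitated outside AQS with an AtomicInteger standing in for state. This is a simplified sketch, not the JDK code: the class TryAcquireSketch, its volatile owner field, the demo() helper and the matching tryRelease() method are assumptions added so that the example runs on its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the try-acquire logic; it never blocks or queues.
class TryAcquireSketch {
    private final AtomicInteger state = new AtomicInteger(0); // 0 = free, n = hold count
    private volatile Thread owner;                            // thread holding the lock

    boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = state.get();
        if (c == 0) {
            // Branch 1: the lock looks free, race for it with CAS
            if (state.compareAndSet(0, acquires)) {
                owner = current;
                return true;
            }
        } else if (current == owner) {
            // Branch 2: reentrant acquisition by the owner, no CAS needed
            int next = c + acquires;
            if (next < 0) {
                throw new Error("Maximum lock count exceeded");
            }
            state.set(next);
            return true;
        }
        // Branch 3: held by another thread (or the CAS was lost)
        return false;
    }

    // Assumed counterpart: returns true only when the lock becomes completely free.
    boolean tryRelease(int releases) {
        if (Thread.currentThread() != owner) {
            throw new IllegalMonitorStateException();
        }
        int c = state.get() - releases;
        boolean free = (c == 0);
        if (free) {
            owner = null;
        }
        state.set(c);
        return free;
    }

    // Returns {first acquire, reentrant acquire, acquire from another thread,
    //          first release frees the lock, second release frees the lock}.
    static boolean[] demo() {
        TryAcquireSketch lock = new TryAcquireSketch();
        boolean[] r = new boolean[5];
        r[0] = lock.tryAcquire(1);
        r[1] = lock.tryAcquire(1);
        Thread other = new Thread(() -> r[2] = lock.tryAcquire(1));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        r[3] = lock.tryRelease(1);
        r[4] = lock.tryRelease(1);
        return r;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // prints [true, true, false, false, true]
    }
}
```

In the real class, a false result is where AQS takes over: the caller is wrapped in a Node and queued.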
Note that nonfairTryAcquire(acquires) modifies the state with CAS to ensure thread safety. Any thread that calls nonfairTryAcquire(acquires) and succeeds in the CAS gets the lock, whether it is a newly arrived thread or one already in the synchronization queue. That is what makes the lock unfair: there is no guarantee that a thread in the synchronization queue acquires the lock before a newcomer (the holder may have just released the synchronization state when a newly arrived thread grabs it). Fair locks, analyzed later, behave differently. Now let us return to the acquire(1) method called from lock() in the NonfairSync class:
public final void acquire(int arg) {
    // Try again to acquire the synchronization state
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
If tryAcquire(arg) returns true, the lock has been acquired and the rest of the if condition is not evaluated; this is the ideal case. If tryAcquire(arg) returns false, addWaiter(Node.EXCLUSIVE) is executed to wrap the thread in a Node and enqueue it (the node mode is Node.EXCLUSIVE because ReentrantLock is an exclusive lock). The addWaiter method is as follows:
private Node addWaiter(Node mode) {
    // Wrap the thread that failed to acquire the synchronization state in a Node
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // If this is the first node to be added, tail is null: skip the fast path.
    // Otherwise try a quick CAS enqueue at the tail.
    if (pred != null) {
        node.prev = pred;
        // Use CAS to replace the tail node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // First enqueue, or the CAS failed: fall back to enq
    enq(node);
    return node;
}
Copy the code
In the addWaiter() method, the current thread and the passed-in mode Node.EXCLUSIVE are wrapped into a Node, and the member variable tail (which refers to the node at the end of the queue maintained by AQS) is assigned to pred. If the tail node is not null, the new node is appended to the end of the queue with a single CAS operation; if that CAS fails, the enq(node) method is executed. If the tail node is null, which means that the synchronization queue has not been initialized yet, enq(node) is executed directly. We continue with the implementation of enq(node):
private Node enq(final Node node) {
    // Infinite loop
    for (;;) {
        Node t = tail;
        // If tail is null, the queue has no head node yet
        if (t == null) { // Must initialize
            // Create an empty node and use CAS to set it as the head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // Add the new node at the end of the queue
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
This method starts an infinite loop with for(;;) and performs CAS operations inside it (to avoid concurrency problems). It does two things: first, if the AQS synchronization queue has not been initialized, it creates a new node and calls compareAndSetHead() to install it as the head node; second, if the synchronization queue already exists, it appends the passed-in node to the end of the queue. Multiple threads may execute either step at the same time. If one thread succeeds in modifying head or tail, the other threads keep looping until their own modification succeeds. CAS is used both to set the head node and to replace the tail node, which guarantees thread safety. The head node does not store any thread; it is just a new Node() that acts as a sentinel (dummy) node, and tail always points to the last node (provided that the queue has been initialized).
For example, if six threads T1, T2, T3, T4, T5, and T6 try to join the queue at the same time and only T2's CAS succeeds, the other five threads (T1, T3, T4, T5, and T6) keep looping until they have joined the queue as well.
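The retry-until-CAS-succeeds pattern of enq() can be reproduced outside AQS with AtomicReference. The sketch below is a simplified stand-in, not the JDK code: the class CasQueueSketch, its Node type, and the helper enqueueConcurrently are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): lazily install a sentinel head, then retry the tail CAS until it wins.
    void enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized yet: only one thread wins this CAS
                if (head.compareAndSet(null, new Node("sentinel")))
                    tail.set(head.get());
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return;
                }
                // Another thread changed tail first: loop and try again
            }
        }
    }

    // Counts the enqueued nodes by walking backwards from tail, excluding the sentinel.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) n++;
        return n;
    }

    static int enqueueConcurrently(int threads) throws InterruptedException {
        CasQueueSketch q = new CasQueueSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            Node n = new Node("T" + (i + 1));
            ts[i] = new Thread(() -> q.enqueue(n));
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        return q.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(enqueueConcurrently(6)); // prints 6
    }
}
```

No matter how the six threads interleave, each one loops until its own CAS on tail succeeds, so no node is lost.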
Every node added to the synchronization queue then enters a spin loop. When its turn comes, the node acquires the synchronization state, leaves the spin loop, and returns to the acquire() method. The spin is performed in acquireQueued(addWaiter(Node.EXCLUSIVE), arg), as follows:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // Records whether the thread was interrupted while waiting
        // An infinite loop (spin)
        for (;;) {
            // Get the predecessor node
            final Node p = node.predecessor();
            // Try to get the synchronization state only if p is the head node
            if (p == head && tryAcquire(arg)) {
                // Set node as the head node
                setHead(node);
                // Unlink the original head node to facilitate GC
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // If the predecessor is not head or the acquire failed,
            // decide whether to suspend the thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // If the thread ends without obtaining the synchronization state, cancel the request
            cancelAcquire(node);
    }
}
Inside the infinite loop (spin), the thread in the current node attempts to acquire the synchronization state only when the node's predecessor is the head node, which preserves FIFO order. The head node represents the thread that currently holds the synchronization state; only when the head node releases it and wakes up its successor can the successor acquire it. That is why a node starts trying to acquire the synchronization state only when its predecessor is the head node, and is suspended otherwise. Once the current node has acquired the synchronization state, the if body runs and setHead() makes the current node the head node, as follows:
// Set the passed node as the head of the synchronization queue
private void setHead(Node node) {
    head = node;
    // Clear the data stored on the current node
    node.thread = null;
    node.prev = null;
}
After node becomes the head node, its thread and predecessor references are cleared. The thread reference is no longer needed because the current thread has already acquired the lock, and the head node has no predecessor, so prev is cleared as well. The head node only keeps the reference to its successor, so that the successor thread can be woken up when the lock is released. This is the logic executed when the node's predecessor is the head node and tryAcquire(arg) succeeds. If the predecessor is not the head node, or the acquire attempt fails, the if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; logic is executed. The code is as follows:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the wait status of the predecessor node
    int ws = pred.waitStatus;
    // Return true if the predecessor is already set to signal (wake up) this node
    if (ws == Node.SIGNAL)
        return true;
    // If the wait status of the predecessor is greater than 0, it is cancelled.
    // Walk backwards over the predecessors until a node that is not cancelled is found
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // The predecessor's wait status is 0 or PROPAGATE here.
        // Set it to SIGNAL so that it wakes up this node when it releases
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // Suspend the current thread
    LockSupport.park(this);
    // Report whether the thread was interrupted while parked.
    // Thread.interrupted() only reads and clears the interrupt flag; it does not interrupt the thread
    return Thread.interrupted();
}

// LockSupport → park()
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    // Record the blocker object for the current thread (used for monitoring and diagnostics)
    setBlocker(t, blocker);
    // Call the native method that blocks the current thread at the JVM level
    UNSAFE.park(false, 0L);
    // Clear the blocker once the thread resumes
    setBlocker(t, null);
}
The shouldParkAfterFailedAcquire() method checks whether the predecessor of the current node is in SIGNAL state and returns true if so. If the waitStatus of the predecessor is greater than 0 (only CANCELLED = 1 is greater than 0), the predecessor has given up and should be removed from the synchronization queue, so the do/while loop walks backwards over the predecessors until it finds one that is not cancelled. If the waitStatus of the predecessor is neither CANCELLED nor SIGNAL (at this point it is 0 or PROPAGATE), a CAS switches it to SIGNAL so that the predecessor will wake up the current node when it releases the lock. In both of these cases the method returns false and the caller spins once more.
When shouldParkAfterFailedAcquire() returns true, the predecessor is in SIGNAL state and the current thread could not acquire the lock, so parkAndCheckInterrupt() is called to suspend the thread. The thread then stays in the WAITING thread state until an unpark() operation (or an interrupt) wakes it up. At this point, the lock() operation of ReentrantLock has been completed indirectly through the FIFO synchronization queue of AQS. The overall flow can be summarized as: lock() → acquire(1) → tryAcquire() → on failure addWaiter() → acquireQueued() → park until woken up by the predecessor.
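The park()/unpark() pair that AQS relies on can be tried in isolation. The following is a small sketch under stated assumptions: ParkDemo and parkThenUnpark are hypothetical names, and the polling loop on getState() is only there to make the demonstration deterministic.

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    private static volatile boolean released;

    // Returns true if the waiter was seen in WAITING state while parked
    // and had terminated after unpark().
    static boolean parkThenUnpark() throws InterruptedException {
        released = false;
        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark(), so re-check the condition in a loop
            while (!released) {
                LockSupport.park();
            }
        });
        waiter.start();

        // Poll until the waiter is actually parked
        boolean sawWaiting = false;
        while (!sawWaiting) {
            sawWaiting = waiter.getState() == Thread.State.WAITING;
            if (!sawWaiting) Thread.sleep(1);
        }

        released = true;
        LockSupport.unpark(waiter); // hand the waiter its permit so park() returns
        waiter.join();
        return sawWaiting && waiter.getState() == Thread.State.TERMINATED;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(parkThenUnpark()); // prints true
    }
}
```

The loop around park() mirrors what acquireQueued() does: a woken thread always re-checks its condition before proceeding.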
4.1.2 Principles of the other ways to acquire the lock in ReentrantLock
Besides lock(), ReentrantLock offers other ways to acquire the lock, such as lockInterruptibly() and the timed tryLock(long, TimeUnit). lockInterruptibly() eventually reaches the doAcquireInterruptibly() method, and the timed tryLock() goes through a very similar interruptible spin (doAcquireNanos() in the JDK source). doAcquireInterruptibly() is as follows:
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    // Wrap the thread in a Node and add it to the queue
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head node, try to obtain the lock resource / synchronization state
            if (p == head && tryAcquire(arg)) {
                // Set the current node as head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Throw an exception directly to abort the thread's request for the synchronization state
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
The difference from the lock() method is:
/** --------------- lock() --------------- */
// If the predecessor is not head or the acquire failed, decide whether to suspend the thread
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;

/** --------------- lockInterruptibly() --------------- */
if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    // Throw an exception directly to abort the thread's request for the synchronization state
    throw new InterruptedException();
In the interruptible acquisition mode, as soon as an interrupt of the waiting thread is detected, an InterruptedException is thrown to abort the thread's request for the synchronization state, and the finally block calls cancelAcquire(node) to take the node out of the synchronization queue.
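From the caller's side, the difference shows up as an InterruptedException while waiting. A minimal sketch, assuming hypothetical names InterruptibleLockDemo and waiterWasInterrupted:

```java
import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleLockDemo {
    // Returns true if the queued thread left lockInterruptibly() with an InterruptedException.
    static boolean waiterWasInterrupted() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = {false};
        lock.lock(); // the main thread holds the lock, so the waiter has to queue
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not expected to be reached in this demo
                } catch (InterruptedException e) {
                    interrupted[0] = true; // the request was aborted instead of waiting on
                }
            });
            waiter.start();
            // Wait until the waiter sits in the synchronization queue
            while (!lock.hasQueuedThread(waiter)) Thread.sleep(1);
            waiter.interrupt();
            waiter.join();
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waiterWasInterrupted()); // prints true
    }
}
```

With plain lock() the same interrupt would only be recorded and the thread would keep waiting for the lock.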
4.1.3 Analysis of unlock() in ReentrantLock
With an explicit lock like ReentrantLock, the lock must also be released manually after it has been acquired: every call to lock() needs a matching call to unlock(). unlock() releases the lock as follows:
// ReentrantLock → unlock()
public void unlock() {
    sync.release(1);
}

// AQS → release()
public final boolean release(int arg) {
    // Try to release the lock
    if (tryRelease(arg)) {
        // Get the head node and check it
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // Wake up the thread of the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// ReentrantLock → Sync → tryRelease(int releases)
protected final boolean tryRelease(int releases) {
    // Change the synchronization state: acquiring adds, releasing subtracts
    int c = getState() - releases;
    // Throw an exception if the thread releasing the lock is not the thread holding it
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // If the state reaches 0, the synchronization state has been fully released
    if (c == 0) {
        free = true;
        // Set the owner to null
        setExclusiveOwnerThread(null);
    }
    // Write back the updated synchronization state
    setState(c);
    return free;
}
unlock() ultimately releases the lock by calling tryRelease(int releases), which is implemented in ReentrantLock's Sync class because AQS provides no implementation and leaves the logic to subclasses. After the lock has been released, unparkSuccessor(h) wakes up the thread of the successor node. The code for unparkSuccessor(h) is as follows:
private void unparkSuccessor(Node node) {
    // node is the head node, i.e. the node of the thread that is releasing the lock
    int ws = node.waitStatus;
    if (ws < 0) // Reset the node's wait status to 0; this CAS is allowed to fail
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next; // Get the successor of the node
    if (s == null || s.waitStatus > 0) { // Successor is missing or cancelled
        s = null;
        // Walk backwards from tail to find the frontmost valid node
        for (Node t = tail; t != null && t != node; t = t.prev)
            // A wait status <= 0 marks a valid node
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // Wake up the successor's thread
}
In unparkSuccessor(h), unpark() wakes up the nearest successor that has not given up competing for the lock, that is, the node s whose waitStatus <= 0. This ties in with the spin method acquireQueued() analyzed earlier. When the thread in s is woken up, it resumes inside acquireQueued() and evaluates if (p == head && tryAcquire(arg)) again. If p != head at that moment (because cancelled nodes still sit between head and s), shouldParkAfterFailedAcquire() runs next and unlinks those cancelled predecessors. Since unparkSuccessor() selected s as the frontmost thread in the synchronization queue that has not given up, s is the node directly after head once shouldParkAfterFailedAcquire() has run. On the next pass through the loop, p == head holds, and once tryAcquire(arg) succeeds, s sets itself as head to indicate that it now holds the lock. With that, the entire acquire() method completes.
In short, AQS maintains a FIFO synchronization queue. When a thread fails to obtain the lock through ReentrantLock.lock(), it is wrapped in a Node and appended to the synchronization queue to wait for the lock to be released, running the spin logic in the meantime. When the predecessor of the thread's node is the head node, the thread tries to modify the synchronization state (+1). If the modification succeeds, the lock has been acquired, and the thread's node becomes the head node to indicate that it holds the lock. When a thread calls ReentrantLock.unlock(), the tryRelease(int releases) method of the Sync inner class changes the synchronization state (-1); once the state reaches 0, the thread in the successor of the head node is woken up.
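To see how little a subclass has to supply, here is a sketch of a non-reentrant mutex built directly on AbstractQueuedSynchronizer. It is an illustration in the spirit of the tryAcquire()/tryRelease() pair discussed above, not the ReentrantLock source; SimpleMutex and countWithMutex are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // state 0 = free, 1 = held; no reentrancy in this sketch
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // only the owner gets here, so a plain write is enough
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); } // queueing and parking are inherited from AQS
    public void unlock() { sync.release(1); } // waking up the successor is inherited as well

    // Increments a shared counter from several threads under the mutex.
    static int countWithMutex(int threads, int perThread) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithMutex(4, 10_000)); // prints 40000
    }
}
```

The subclass only decides when state may change; the synchronization queue, the spin in acquireQueued(), and unparkSuccessor() all come from AQS.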
4.2 FairSync fair lock in ReentrantLock
We have analyzed the non-fair lock implementation of ReentrantLock in detail; next comes the fair lock. Before that, we need to be clear about what fair and non-fair mean. The distinction is based on the order in which threads arrive. A fair lock fully follows the FIFO principle: the thread that requests the lock first is the first to obtain it. A non-fair lock gives no such guarantee: a newly arriving thread may acquire the lock ahead of threads that are already queued. Let's look at the implementation of the tryAcquire(int acquires) method in the FairSync class.
// ReentrantLock → FairSync → tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) {
    // Get the current thread
    final Thread current = Thread.currentThread();
    // Get the synchronization state
    int c = getState();
    if (c == 0) { // 0 means that no thread currently holds the lock resource
        // In the fair lock implementation, first check whether another thread is queued ahead
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
Compared with nonfairTryAcquire(int acquires) in NonfairSync, the only difference in the fair tryAcquire(int acquires) is the call to hasQueuedPredecessors(), which checks, before attempting to modify state, whether any thread is queued in the AQS synchronization queue ahead of the current thread. If so, another thread requested the lock earlier, tryAcquire() returns false, and the current thread is wrapped in a Node and appended to the end of the queue. In the non-fair implementation, the thread first tries to modify the synchronization state regardless of whether the queue contains nodes; only when that fails is it wrapped in a Node and added to the queue. In practice, if the order in which requests are processed does not matter, the non-fair lock is usually the better choice, because it generally delivers noticeably higher throughput than the fair lock.
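Which of the two Sync variants a lock uses is decided by the constructor argument. A short sketch (FairnessDemo and its helper methods are hypothetical names):

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairnessDemo {
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();     // no-arg constructor: non-fair
    }

    static boolean explicitFairIsFair() {
        return new ReentrantLock(true).isFair(); // true selects the fair variant
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());      // prints false
        System.out.println(explicitFairIsFair()); // prints true
    }
}
```

Note that, per the ReentrantLock Javadoc, the untimed tryLock() does not honor the fairness setting: it succeeds whenever the lock is free, even if other threads are queued.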
4.3 How to choose between ReentrantLock and synchronized in actual development?
In the previous article, "Thoroughly understand the principles of Synchronized implementation in Java", we discussed in detail how the implicit lock synchronized is implemented and how the JVM has heavily optimized it since JDK 1.6. So how do you choose between ReentrantLock and synchronized in practice? synchronized is easier to use and semantically clear, and the JVM optimizes it automatically. ReentrantLock is more flexible and offers more features, such as timed lock acquisition, interruptible lock acquisition, and a wait/wake-up mechanism with multiple condition variables (Condition). So ReentrantLock is the natural choice when these features are needed. Beyond that, which one to use depends on the business requirements.
For example: a project has very heavy traffic from 1 am to 5 am but a relatively low access frequency during the rest of the day. Which lock is more appropriate? The answer is ReentrantLock.
Why? As mentioned in the previous article on synchronized, the lock escalation (inflation) of synchronized is irreversible, and lock downgrading almost never occurs while a Java program is running. In such a scenario, synchronized may inflate directly to a heavyweight lock during the traffic surge. Once it has, every later acquisition of that lock takes the heavyweight path, which hurts program performance even after the traffic has dropped.
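Timed acquisition is one of the features mentioned above that synchronized cannot offer. The sketch below shows one possible usage pattern; TimedLockDemo, runIfAvailable, and the 50 ms timeout are assumptions made for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TimedLockDemo {
    // Runs the task only if the lock can be obtained within the timeout.
    static boolean runIfAvailable(ReentrantLock lock, long timeoutMs, Runnable task)
            throws InterruptedException {
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // timed out: the caller can fall back instead of blocking forever
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns {result while another thread holds the lock, result once the lock is free}.
    static boolean[] demo() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] results = new boolean[2];
        lock.lock();
        try {
            Thread contender = new Thread(() -> {
                try {
                    results[0] = runIfAvailable(lock, 50, () -> { });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            contender.start();
            contender.join(); // the main thread keeps holding the lock the whole time
        } finally {
            lock.unlock();
        }
        results[1] = runIfAvailable(lock, 50, () -> { });
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```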
4.4 Summary of the ReentrantLock implementation
- Basic components:
  - Synchronization state: exposes whether the lock resource is held
  - Synchronization queue: stores the threads that failed to acquire the lock
  - Wait queue: used for multi-condition wait/wake-up
  - Node: the element of both queues, wrapping a thread
- Basic actions:
  - Modify the synchronization state with CAS
  - On failure to acquire the lock, block in the synchronization queue
  - On release, wake up the thread of the first waiting node in the synchronization queue
- Locking:
  - Call tryAcquire() to modify state; return directly on success, join the queue on failure
  - After joining the queue, if the predecessor node is in SIGNAL state, block (park) the current thread
  - If not, check whether the predecessor is CANCELLED; if so, walk backwards and unlink the cancelled nodes
  - If the predecessor is in the 0 or PROPAGATE state, change it to SIGNAL
  - After being woken up, if the predecessor is head, try to acquire the lock; continue blocking on failure
- Unlocking:
  - Call tryRelease() to modify state; it returns true once the lock is fully released and false otherwise
  - After a successful release, wake up the next blocked thread node in the synchronization queue
  - The woken node replaces the current head node once it acquires the lock
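The queueing and wake-up steps in this summary can be watched from the outside with the inspection methods of ReentrantLock. A minimal sketch, with QueueInspectionDemo and run as hypothetical names:

```java
import java.util.concurrent.locks.ReentrantLock;

public class QueueInspectionDemo {
    // Returns {worker queued while main held the lock, any thread queued after the worker finished}.
    static boolean[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] seen = new boolean[2];
        Thread worker = new Thread(() -> {
            lock.lock();   // blocks in the synchronization queue while main holds the lock
            lock.unlock();
        });
        lock.lock();
        try {
            worker.start();
            // Wait until the worker's node has been appended to the synchronization queue
            while (!lock.hasQueuedThread(worker)) Thread.sleep(1);
            seen[0] = lock.hasQueuedThread(worker);
        } finally {
            lock.unlock(); // the release wakes up the queued worker
        }
        worker.join();
        seen[1] = lock.hasQueuedThreads();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] s = run();
        System.out.println(s[0] + " " + s[1]); // prints "true false"
    }
}
```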
Fifth, the implementation principle of Condition, the multi-condition wait/wake-up mechanism
In Java concurrent programming, every object in the Java heap is associated with a monitor from birth, and every Java object has a set of monitor methods: wait(), notify(), and notifyAll(). These methods let Java threads cooperate and communicate, which is known as the wait/wake-up mechanism; the producer-consumer model is a common example. However, they can only be used together with the synchronized keyword, because the wait/wake-up mechanism of Java objects is built on the monitor object. Condition is more flexible than the wait/wake-up mechanism of synchronized: notify() wakes up an arbitrary one of the threads waiting on the monitor, whereas Condition allows finer-grained, more targeted wake-ups of waiting threads. Another difference is that in the monitor model an object has one synchronization queue and one wait queue, whereas in AQS a lock object has one synchronization queue and can have multiple wait queues. The monitor lock of an object is implemented as follows:
5.1. Get to know and practice Condition quickly
Condition is an interface whose implementation is the ConditionObject class inside AQS. Condition defines the following methods:
public interface Condition {
    /** Makes the current thread wait until it is signalled or interrupted: it is woken up when
     *  another thread calls signal() or signalAll(), or interrupts it by calling interrupt().
     *  await() corresponds to wait() in the synchronized wait/wake-up mechanism. */
    void await() throws InterruptedException;
    /** Same effect as await(), but does not respond to thread interrupts. */
    void awaitUninterruptibly();
    /** Same effect as await(), but with a timeout in nanoseconds: the wait ends
     *  once the thread has waited longer than nanosTimeout. */
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    /** Same effect as awaitNanos(long nanosTimeout), but the time unit can be specified. */
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    /** Same effect as await(); returns true if woken up before the deadline
     *  and false otherwise. */
    boolean awaitUntil(Date deadline) throws InterruptedException;
    /** Wakes up one thread node in the wait queue and moves it from the wait queue to the
     *  synchronization queue, where it blocks waiting for the lock resource.
     *  signal() corresponds to notify() in the synchronized wait/wake-up mechanism. */
    void signal();
    /** Same effect as signal(), but wakes up all thread nodes in the wait queue.
     *  signalAll() corresponds to notifyAll() in the synchronized wait/wake-up mechanism. */
    void signalAll();
}
The Condition interface defines two kinds of methods: await methods and signal methods. Let's use Condition to implement a classic producer/consumer mini-case:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Bamboo {
    private int bambooCount = 0;
    private boolean flag = false;

    Lock lock = new ReentrantLock();
    Condition producerCondition = lock.newCondition();
    Condition consumerCondition = lock.newCondition();

    public void producerBamboo() {
        lock.lock(); // Get the lock resource
        try {
            while (flag) { // If there is bamboo
                try {
                    producerCondition.await(); // Suspend the thread that produces bamboo
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            bambooCount++; // Bamboo +1
            System.out.println(Thread.currentThread().getName() + "... Bamboo production, current bamboo quantity:" + bambooCount);
            flag = true; // Change the bamboo stock flag to true
            consumerCondition.signal(); // Wake up a consuming thread after the bamboo is produced
        } finally {
            lock.unlock(); // Release the lock resource
        }
    }

    public void consumerBamboo() {
        lock.lock(); // Get the lock resource
        try {
            while (!flag) { // If there is no bamboo
                try {
                    consumerCondition.await(); // Suspend the thread that consumes bamboo
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            bambooCount--; // Bamboo -1
            System.out.println(Thread.currentThread().getName() + "... Consumption of bamboo, current bamboo quantity:" + bambooCount);
            flag = false; // Change the bamboo stock flag to false
            producerCondition.signal(); // After consuming bamboo, wake up a thread that produces bamboo
        } finally {
            lock.unlock(); // Release the lock resource
        }
    }
}

/**------------------ Test class --------------------**/
public class ConditionDemo {
    public static void main(String[] args) {
        Bamboo b = new Bamboo();
        Producer producer = new Producer(b);
        Consumer consumer = new Consumer(b);
        // Producer thread group
        Thread t1 = new Thread(producer, "Producer-t1");
        Thread t2 = new Thread(producer, "Producer-t2");
        Thread t3 = new Thread(producer, "Producer-t3");
        // Consumer thread group
        Thread t4 = new Thread(consumer, "Consumer-t4");
        Thread t5 = new Thread(consumer, "Consumer-t5");
        Thread t6 = new Thread(consumer, "Consumer-t6");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        t6.start();
    }
}

// Producer
class Producer implements Runnable {
    private Bamboo bamboo;

    public Producer(Bamboo bamboo) {
        this.bamboo = bamboo;
    }

    @Override
    public void run() {
        for (;;) {
            bamboo.producerBamboo();
        }
    }
}

// Consumer
class Consumer implements Runnable {
    private Bamboo bamboo;

    public Consumer(Bamboo bamboo) {
        this.bamboo = bamboo;
    }

    @Override
    public void run() {
        for (;;) {
            bamboo.consumerBamboo();
        }
    }
}
The code above uses Condition for a simple bamboo production/consumption example. There are six threads: t1, t2, and t3 form the producer group, t4, t5, and t6 form the consumer group, and all six run concurrently. The producer group must produce bamboo before the consumer group can consume it; otherwise the consumers have to wait until bamboo has been produced, and the same bamboo must not be consumed twice. The Bamboo class defines two methods, producerBamboo() and consumerBamboo(), for producing and consuming bamboo. A shared ReentrantLock ensures that no thread-safety problem arises while the two groups run at the same time. To enforce the production/consumption order, two conditions are created from the lock object: producerCondition and consumerCondition. The former makes the producer group wait while the bamboo count is not zero, and the latter makes the consumer group wait while there is no bamboo. A flag also records whether bamboo is in stock. If it is false, there is no bamboo: a producer must produce first and then wake up a consumer thread. If it is true, the reverse applies.
Compared with the synchronized wait/wake-up mechanism, the advantage here is that two conditions, producerCondition and consumerCondition, can be created. Because there are two wait queues, the producer group and the consumer group can be woken up precisely. With synchronized wait()/notify(), a consumer thread that has just consumed bamboo may wake up another consumer thread instead of a producer, because the monitor object has only one wait queue. Avoiding this with synchronized requires notifyAll(), which wakes up every thread in the wait queue. That is less efficient than Condition, since all of the woken threads compete for the lock again and most of them go straight back to waiting.
5.2. Condition implementation principle analysis
Condition is just an interface; its implementation is ConditionObject in AQS. As mentioned when AQS was introduced at the beginning of this article, AQS has two kinds of queues: the synchronization queue and wait queues, and the wait queues are what Condition is built on. Both kinds of queue use the Node class inside AQS, but the waitStatus of a node in a wait queue is CONDITION. ConditionObject has two fields, firstWaiter and lastWaiter, which reference the first and last node of its wait queue. Each node uses Node.nextWaiter to reference the next node, so the wait queue is a singly linked list. The overall structure of the AQS synchronizer is as follows:
As shown in the figure above, unlike the single synchronization queue, each Condition has its own wait queue. If multiple Conditions are created on one ReentrantLock, there are multiple wait queues. Although both kinds of queue are made of Node objects, the synchronization queue is a doubly linked list in which nodes reference their predecessor (prev) and successor (next), while a wait queue is a singly linked list in which each node only references its successor through nextWaiter. Like the synchronization queue, a wait queue is a FIFO queue, and each of its nodes stores the thread that is waiting on the Condition object. When a thread calls an await method, it builds a Node wrapping the thread and adds it to the wait queue, releases the lock, and then waits until it is signalled, interrupted, or timed out. Let's explore the Condition wait/wake-up mechanism from the source code:
public final void await() throws InterruptedException {
    // Check whether the thread has been interrupted
    if (Thread.interrupted())
        // Respond to the interrupt by throwing an exception
        throw new InterruptedException();
    // Wrap the thread in a new node, add it to the wait queue, and return the node
    Node node = addConditionWaiter();
    // Release the lock held by the current thread; however many times it was reentered, state is set to 0
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Check whether the node is on the synchronization queue, i.e. whether it has been signalled
    while (!isOnSyncQueue(node)) {
        // Not signalled yet: suspend the current thread at the JVM level
        LockSupport.park(this);
        // If the wake-up was caused by an interrupt, exit the loop
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // After waking up, spin to reacquire the lock and record whether the thread was interrupted
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // Clean up after cancellation
    if (node.nextWaiter != null)
        // Remove the nodes in the wait queue that are no longer in CONDITION state
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// Build a node that wraps the thread and add it to the wait queue
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If the last node is no longer in CONDITION state, clean out the cancelled nodes
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Build a new node for the current thread; its state is CONDITION (waiting)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // Append the node to the wait queue
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
From the code above, it is not hard to see that await() does four main things:
- Call addConditionWaiter() to build a new node that wraps the current thread and add it to the wait queue
- Call fullyRelease(node) to release the lock (state is set to 0 no matter how many times the holding thread has reentered) and wake up the thread of the successor node in the synchronization queue
- Call isOnSyncQueue(node) in a loop to check whether the node is on the synchronization queue; as long as it is not, the current thread is suspended at the JVM level
- Call acquireQueued(node, savedState) to spin for the lock once the thread has been woken up, that is, once the node has been transferred from the wait queue to the synchronization queue
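The full release in the second step and the restoration of savedState in the fourth step can be observed with a reentrantly held lock. This is a sketch under stated assumptions: AwaitReleaseDemo and run are hypothetical names, and awaitUninterruptibly() is used only to keep the example free of interrupt handling.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitReleaseDemo {
    // Returns the waiter's hold count {before awaiting, after the await returned}.
    static int[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] holds = new int[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // hold the lock twice (reentrant)
            try {
                holds[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    ready.awaitUninterruptibly(); // releases both holds while waiting
                }
                holds[1] = lock.getHoldCount();   // both holds are back after waking up
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        // The main thread can only get the lock because await released it completely.
        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(ready)) { // the waiter sits in the condition's wait queue
                    signalled[0] = true;
                    ready.signal();           // move it to the synchronization queue
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.sleep(1); // waiter has not reached await yet: retry
        }
        waiter.join();
        return holds;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] h = run();
        System.out.println(h[0] + " " + h[1]); // prints "2 2"
    }
}
```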
At this point the await() method ends. The thread has gone through the whole process: calling await() → building a node and joining the wait queue → releasing the lock and waking up the successor in the synchronization queue → being suspended at the JVM level → being woken up and competing for the lock again. The other await methods work on a similar principle, so let's look at the signal() wake-up method:
public final void signal() {
    // Throw an exception if the current thread does not hold the exclusive lock resource
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // Wake up the thread of the first node in the wait queue
    if (first != null)
        doSignal(first);
}
Here, the signal() wake-up method does two things:
- Checks whether the thread calling signal() holds the exclusive lock resource, and throws an exception immediately if it does not. (Condition cannot be used in shared mode, because shared mode has no wait queue.)
- Wakes up the thread of the first node in the wait queue by calling doSignal(first).
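The ownership check in the first point is easy to trigger. The sketch below is only an illustration: the class SignalWithoutLockDemo and its two methods are hypothetical names, while ReentrantLock, Condition and IllegalMonitorStateException are the real JDK types. It calls signal() once without the lock and once with it.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; names are chosen for illustration only.
class SignalWithoutLockDemo {

    // Calls signal() without owning the lock and reports whether it was rejected.
    static boolean signalThrowsWithoutLock() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // the current thread does not hold the lock
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Calls signal() while owning the lock; with no waiters this is a no-op.
    static boolean signalSucceedsWithLock() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalThrowsWithoutLock()); // true
        System.out.println(signalSucceedsWithLock());  // true
    }
}
```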
Let’s look at the implementation of the doSignal(first) method:
private void doSignal(Node first) {
    do {
        // Remove the first node from the wait queue. If its nextWaiter is null,
        // the wait queue has no other nodes, so lastWaiter is set to null as well
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // If the node just signalled could not be moved to the synchronization queue
        // (it may have been cancelled, for example by an interrupt) and the wait queue
        // still has other nodes, keep looping and signal the next node
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// transferForSignal() method
final boolean transferForSignal(Node node) {
    /*
     * Try to change the waitStatus of the signalled node from CONDITION to 0.
     * If the CAS fails, the node is no longer in CONDITION state (it was cancelled),
     * so return false and let doSignal() move on to the next node.
     * The thread that gets this far must hold the exclusive lock resource, and
     * waitStatus is modified with CAS, so there is only one reason for failure:
     * the actual waitStatus is not CONDITION.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // Append the node to the tail of the synchronization queue; enq() returns its predecessor p
    Node p = enq(node);
    // If the predecessor is cancelled (ws > 0), or the CAS that sets the
    // predecessor's status to SIGNAL fails, wake up the node's thread directly
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
As the comments in the code above show, doSignal() does three things:
- Removes the first node from the wait queue and updates the firstWaiter and lastWaiter references accordingly.
- Appends the removed node to the tail of the synchronization queue. If the transfer fails (the node has been cancelled) and there are still other nodes in the wait queue, the loop continues with the next node.
- After the node has joined the synchronization queue, if its predecessor is in the cancelled state, or the CAS that sets the predecessor's status to SIGNAL fails, wakes up the node's thread directly through LockSupport.unpark().
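To see why the loop skips cancelled nodes, it can help to run the same control flow on a stripped-down model. The sketch below is a toy, not the JDK source: ToyConditionQueue, addWaiter(), transfer() and demo() are hypothetical stand-ins, the synchronization queue is reduced to a list of names, and the CAS is replaced by a plain field check because the model is single-threaded. Only the shape of the do/while loop follows doSignal().

```java
import java.util.ArrayList;
import java.util.List;

// Toy single-threaded model; every name here is a hypothetical stand-in.
class ToyConditionQueue {
    static final int CONDITION = -2; // same values AQS uses for these states
    static final int CANCELLED = 1;

    static final class Node {
        final String name;
        int waitStatus = CONDITION;
        Node nextWaiter;
        Node(String name) { this.name = name; }
    }

    Node firstWaiter;
    Node lastWaiter;
    final List<String> syncQueue = new ArrayList<>(); // stands in for the synchronization queue

    void addWaiter(Node node) {
        if (lastWaiter == null) firstWaiter = node;
        else lastWaiter.nextWaiter = node;
        lastWaiter = node;
    }

    // Plays the role of transferForSignal(): fails for nodes that left CONDITION state.
    boolean transfer(Node node) {
        if (node.waitStatus != CONDITION) return false;
        node.waitStatus = 0;
        syncQueue.add(node.name);
        return true;
    }

    // Same loop shape as doSignal(): detach the head, retry on the next node if the transfer fails.
    void signal() {
        Node first = firstWaiter;
        if (first == null) return;
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transfer(first) && (first = firstWaiter) != null);
    }

    static String demo() {
        ToyConditionQueue queue = new ToyConditionQueue();
        Node a = new Node("A");
        queue.addWaiter(a);
        queue.addWaiter(new Node("B"));
        queue.addWaiter(new Node("C"));
        a.waitStatus = CANCELLED; // A gave up waiting before the signal arrived
        queue.signal();           // skips A, transfers B, leaves C waiting
        return "transferred=" + queue.syncQueue
                + ", nextFirstWaiter=" + queue.firstWaiter.name;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // transferred=[B], nextFirstWaiter=C
    }
}
```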
At this point, the signal() logic ends. Note, however, that await() and signal() need to be read together to understand how Condition’s wait/wake-up works. Once signal() has completed, the awakened thread leaves the loop in await(): its node has been moved to the synchronization queue, so the while (!isOnSyncQueue(node)) condition no longer holds, the loop terminates, and the awakened thread calls acquireQueued() to try to acquire the lock resource.
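One visible consequence of this hand-off is that a signalled thread cannot continue until the signalling thread has released the lock. The sketch below illustrates that ordering under stated assumptions: SignalOrderDemo and run() are hypothetical names, and the event strings are invented for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the java.util.concurrent types are real JDK APIs.
class SignalOrderDemo {

    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        CountDownLatch waiterHoldsLock = new CountDownLatch(1);
        boolean[] flag = {false};                 // guarded by lock
        List<String> events = new ArrayList<>();  // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHoldsLock.countDown();
                while (!flag[0]) {
                    ready.awaitUninterruptibly();
                }
                // Reached only after the lock has been re-acquired
                events.add("waiter resumed");
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            waiterHoldsLock.await();
            lock.lock();
            try {
                flag[0] = true;
                ready.signal(); // moves the waiter to the synchronization queue
                // The waiter still cannot run: this thread owns the lock
                events.add("signaller still holds the lock");
            } finally {
                lock.unlock();  // only now can the waiter re-acquire the lock
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [signaller still holds the lock, waiter resumed]
    }
}
```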
Differences between the Condition interface and the Monitor object wait/wake-up mechanisms
The wait/wake-up mechanism of ReentrantLock’s Condition differs from the wait/wake-up mechanism of the Monitor lock behind synchronized as follows:
Comparison item | Monitor | Condition |
---|---|---|
Precondition | The object lock must be held | The exclusive lock must be held and a Condition object created |
Invocation | Object.wait() | Any of the condition.await() family of methods |
Number of wait queues | One | Multiple |
Releases lock resources while waiting | Supported | Supported |
Waiting without responding to interrupts | Not supported | Supported |
Waiting until an absolute deadline | Not supported | Supported |
Timed wait | Supported | Supported |
Precise wake-up of threads | Not supported | Supported |
Wake up all threads | Supported | Supported |
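The "multiple wait queues" and "precise wake-up" rows are what make the classic two-condition bounded buffer possible. The following is a minimal sketch in the spirit of the example in the Condition documentation; the class TwoConditionBuffer, its methods and the capacity of 2 are assumptions made for the illustration. Producers wait on one condition, consumers on another, and each signal() reaches only the kind of thread it is meant for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer; names and sizes are chosen for illustration only.
class TwoConditionBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // wait queue for producers
    private final Condition notEmpty = lock.newCondition(); // wait queue for consumers
    private final Deque<Integer> items = new ArrayDeque<>();
    private final int capacity;

    TwoConditionBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(int value) {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.awaitUninterruptibly();
            }
            items.addLast(value);
            notEmpty.signal(); // wakes a consumer, never another producer
        } finally {
            lock.unlock();
        }
    }

    int take() {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.awaitUninterruptibly();
            }
            int value = items.removeFirst();
            notFull.signal(); // wakes a producer, never another consumer
            return value;
        } finally {
            lock.unlock();
        }
    }

    // One producer pushes 1..5 through a buffer of size 2; the caller consumes them.
    static List<Integer> demo() {
        TwoConditionBuffer buffer = new TwoConditionBuffer(2);
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                buffer.put(i);
            }
        });
        producer.start();
        List<Integer> taken = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            taken.add(buffer.take());
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3, 4, 5]
    }
}
```

With a single Monitor wait set, the same buffer would have to fall back on notifyAll() and let every woken thread re-check its own condition, because notify() cannot target producers or consumers specifically.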
At this point, the analysis of how the AQS exclusive mode is implemented comes to an end. In the next article, we will explore the concrete implementation of shared mode.