Most of the Java synchronization tools (Lock, Semaphore, ReentrantLock, etc.) are built on AbstractQueuedSynchronizer (AQS). AQS is a simple framework that provides atomic management of synchronization state, the ability to block and wake up threads, and a queue model.
ReentrantLock
A ReentrantLock is a reentrant lock, which means that a thread can repeatedly lock a critical resource it already holds. ReentrantLock supports both fair and unfair locks, and underneath it is implemented with AQS. So how do ReentrantLock’s fair and unfair locks relate to AQS? ReentrantLock defines a Sync inner class that inherits from AQS, and its fair and unfair locks are tied to AQS by inheriting from Sync.
We focus on the locking process of these two to understand the relationship between them and AQS.
Reentrancy
AQS provides a synchronization state field, state, to track reentrancy. state is declared volatile, which guarantees visibility and ordering.
AbstractQueuedSynchronizer
private volatile int state;
ReentrantLock’s reentrancy is built on this field. The state field works as follows (see the sketch after this list):
- state is initialized to 0, indicating that no thread holds the lock.
- When a thread acquires the lock, state is incremented to 1; each time the same thread re-acquires the lock, state is incremented by 1 again.
- Each unlock decrements state by 1; when it reaches 0, the thread has fully released the lock.
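The following minimal sketch (hypothetical usage, not from the original text) shows how the hold count tracked by state evolves as a single thread locks and unlocks a ReentrantLock.

import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                              // state: 0 -> 1
        lock.lock();                              // same thread re-enters, state: 1 -> 2
        System.out.println(lock.getHoldCount());  // prints 2
        lock.unlock();                            // state: 2 -> 1, lock still held
        lock.unlock();                            // state: 1 -> 0, fully released
        System.out.println(lock.isLocked());      // prints false
    }
}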
Fair locks vs. unfair locks
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
The logic of the above code is:
- 1. If the CAS on the state variable (the synchronization state) succeeds, i.e. the lock is acquired, the current thread is set as the exclusive owner thread.
- 2. If the CAS on state fails, i.e. the lock could not be acquired, the acquire method is entered for further processing.
The first step is easy to understand, but what happens when the second step fails to acquire the lock? There are two possibilities:
- (1) Mark the current thread’s lock acquisition as failed and end the process. This would greatly reduce the concurrency of the system and does not meet real-world needs, so the processing below is required, which is exactly what the AQS framework provides.
- (2) Use some queuing mechanism: the thread keeps waiting and retains the possibility of acquiring the lock, so the acquisition process continues.
In the second case, there are several questions:
- Speaking of queueing, there must be some sort of queue. What is the data structure of that queue?
- When does a queued thread get a chance to acquire a lock?
- If a thread in the queue never manages to acquire the lock, does it keep waiting, or is there another strategy for this situation? Let’s note these questions down and come back to them later.
The fair lock is similar:
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
ReentrantLock uses the unfair lock by default; a fair lock can be selected via the constructor. Comparing the two pieces of source code above, the only difference between the fair lock’s locking path and the unfair lock’s is that the fair lock adds one extra constraint before acquiring the synchronization state: hasQueuedPredecessors(). This method does one thing: it determines whether any other thread is queued ahead of the current thread in the synchronization queue, returning true if there is (so the current thread must wait its turn) and false otherwise.
To sum up, a fair lock uses the synchronization queue to hand out the lock to threads in the order in which they requested it, which is what makes it fair. An unfair lock ignores the queue when locking and tries to grab the lock directly, so a thread that applied later may obtain the lock earlier.
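As a small illustration (hypothetical usage, not from the original text), the constructor argument is what selects between the two Sync subclasses:

import java.util.concurrent.locks.ReentrantLock;

public class FairnessChoice {
    public static void main(String[] args) {
        ReentrantLock unfairLock = new ReentrantLock();     // default: unfair (NonfairSync)
        ReentrantLock fairLock   = new ReentrantLock(true); // fair (FairSync)
        System.out.println(unfairLock.isFair()); // false
        System.out.println(fairLock.isFair());   // true
    }
}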
We can see that the locking processes of the fair and unfair locks differ slightly, but both ultimately call the acquire method, a core method of AQS, the parent class of FairSync and NonfairSync. The questions raised above are answered by acquire, which is implemented in AbstractQueuedSynchronizer. AQS, and the relationship between ReentrantLock and AQS, are discussed in more detail below.
AQS architecture
Broadly speaking, the AQS framework is divided into five layers, from top to bottom: from the API exposed by AQS down to the underlying basic data. A custom synchronizer only needs to override some of the methods in the top layer and does not need to care about the underlying implementation. When a custom synchronizer performs a lock or unlock operation, the call first enters AQS’s internal methods through the top-layer API, then goes through the second layer to try to acquire the lock; if acquisition fails, the third and fourth layers handle the wait queue, and all of this relies on the basic data provided by the fifth layer.
The core idea of AQS is that if the requested shared resource is idle, the thread of the current requested resource is set as a valid worker thread, and the shared resource is set as a locked state. If shared resources are occupied, some blocking wake-up mechanism is required to ensure lock allocation. This mechanism is implemented primarily using a variant of CLH queues, which enqueue threads that temporarily cannot acquire locks.
CLH (Craig, Landin, and Hagersten) queues are singly linked lists; the queue in AQS is a CLH variant implemented as a virtual doubly linked FIFO queue. AQS allocates the lock by wrapping each thread that requests the shared resource into a node of this queue. AQS uses a volatile int member variable to represent the synchronization state, uses the built-in FIFO queue to manage the queuing of threads for the resource, and uses CAS to modify the state value.
1. Data structure — Node
AQS defines an inner class named Node. Its main attributes and methods are:
- waitStatus: the status of the current node in the queue
- thread: the thread associated with the node
- prev: pointer to the predecessor node
- predecessor(): returns the predecessor node, throwing a NullPointerException if it is null
- nextWaiter: points to the next node waiting on a condition (condition queues are not covered in this article, so this pointer is not discussed further)
- next: pointer to the successor node
2. Lock mode
- SHARED: Threads wait for locks in SHARED mode
- EXCLUSIVE: Indicates that a thread is waiting for a lock exclusively
3. Thread node status
The status of the current node in the queue waitStatus is as follows:
- 0: the default value when a node is initialized
- CANCELLED: 1, meaning the thread’s request for the lock has been cancelled
- CONDITION: -2, meaning the node is in a condition queue and its thread is waiting to be signalled
- PROPAGATE: -3, used only in shared mode, to propagate the release to subsequent nodes
- SIGNAL: -1, meaning the successor of this node is (or will soon be) blocked, so this node must wake up its successor when it releases the lock or is cancelled
Custom synchronizer
AQS provides a number of protected methods for custom synchronizers to override. These methods implement exclusive or shared access across multiple threads simply by modifying the state field. A custom synchronizer overrides the following methods as needed (ReentrantLock implements some of them, not all):
- protected boolean isHeldExclusively(): whether the current thread holds the resource exclusively. It only needs to be implemented if Condition is used.
- protected boolean tryAcquire(int arg): exclusive mode. arg is the number of acquisitions; the method attempts to acquire the resource and returns true on success, false on failure.
- protected boolean tryRelease(int arg): exclusive mode. arg is the number of releases; the method attempts to release the resource and returns true on success, false on failure.
- protected int tryAcquireShared(int arg): shared mode. arg is the number of acquisitions; the method attempts to acquire the resource. A negative value means failure; 0 means success but no resources remain; a positive value means success with resources remaining.
- protected boolean tryReleaseShared(int arg): shared mode. arg is the number of releases; the method attempts to release the resource. It returns true if subsequent waiting nodes may be woken up after the release, false otherwise.
In general, a custom synchronizer is either exclusive or shared, so it only needs to implement one of tryAcquire/tryRelease or tryAcquireShared/tryReleaseShared. AQS also supports custom synchronizers that are both exclusive and shared, such as ReentrantReadWriteLock. ReentrantLock is an exclusive lock, so it implements tryAcquire/tryRelease.
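As a minimal sketch of a custom synchronizer (not from the original text, but modeled on the example in the AbstractQueuedSynchronizer javadoc), here is a simple non-reentrant exclusive lock built on AQS. It shows which template methods are typically overridden and how the public Lock API delegates to acquire/release.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class SimpleMutex implements Lock {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // 0 -> 1 via CAS: the lock was free and the current thread now owns it
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // volatile write makes the release visible to other threads
            return true;
        }
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    public void lock()              { sync.acquire(1); }          // queues and parks on failure
    public boolean tryLock()        { return sync.tryAcquire(1); }
    public void unlock()            { sync.release(1); }          // wakes up a queued successor
    public Condition newCondition() { return sync.newCondition(); }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

Because the sketch only overrides tryAcquire/tryRelease, all the queuing, parking, and waking logic discussed below is inherited from AQS unchanged.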
1. Obtain the lock
Let’s go back to the ReentrantLock unfair lock method at the beginning of this article:
ReentrantLock.java
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

ReentrantLock.java
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
The logic of the above code: acquire first calls tryAcquire, which AQS leaves to its subclasses, so the concrete acquisition logic is implemented separately by the fair and unfair locks (using ReentrantLock as the example). If tryAcquire returns true, the current thread acquired the lock successfully and nothing further needs to be done. If it fails, the thread must join the wait queue; when and how a thread joins the queue is explained in detail below. The addWaiter method simply wraps the current thread into a Node and appends it to a double-ended queue, returning that Node, which is then passed as a parameter to acquireQueued. The acquireQueued method keeps trying to acquire the lock for the queued thread.
In general, a thread fails to acquire the lock and is queued, and acquireQueued keeps the queued thread acquiring the lock until it succeeds or is no longer needed (interrupted).
2. Join the queue
If the lock cannot be acquired, addWaiter(Node.EXCLUSIVE) is invoked to join the wait queue, as follows:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
The logic of the above code is:
- Create a new node with the current thread and lock mode.
- The pred pointer is set to the tail node.
- The new node’s prev pointer is set to pred.
- The tail is updated with the compareAndSetTail method, which performs a CAS on the tail field (located by tailOffset): if the node currently stored there equals the expected node, it is replaced with the new node.
AQS defines static variables that hold these field offsets:
static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));
    } catch (Exception ex) {
        throw new Error(ex);
    }
}
As the static block shows, each offset is the memory offset of a field within its object, which allows the field to be located in memory directly by its offset. tailOffset is the offset of the tail field, so the CAS appends the new node at the end of the current queue. And because the queue is a doubly linked list, the previous tail must also be linked to the new tail (pred.next = node).
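For reference, the compareAndSetTail method in AbstractQueuedSynchronizer (as implemented in JDK 8) simply delegates to Unsafe using that offset:

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}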
If the pred pointer is null (the wait queue has no elements), or the CAS fails because another thread has modified tail in the meantime, the enq method is used:
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
If the queue has not been initialized, a head node must be created first. Note that this head is not the current thread’s node, but a dummy node created with the no-argument constructor. Once the queue contains elements (after initialization, or because another thread enqueued concurrently), appending works the same as before. In short, addWaiter just appends a tail node to a double-ended list, whose head is a dummy node created with the no-argument constructor.
A fair lock follows the principle of first come, first served, so before acquiring the lock it makes the following check:
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors is the method a fair lock uses during locking to determine whether a valid node already exists in the wait queue. If it returns false, the current thread may compete for the shared resource; if it returns true, a valid node already exists in the queue and the current thread must join the wait queue. Looking at the expression h != t && ((s = h.next) == null || s.thread != Thread.currentThread()), why does it examine the node after the head, and what does the first node store?
In this doubly linked list, the first node is a dummy node: it stores no data and is only a placeholder; the real first node with data is the second one. When h != t and (s = h.next) == null, another thread is partway through enqueuing: the queue already has an element, but the link from the head to it is not yet visible, so true is returned (see the code analysis below for details). When h != t and (s = h.next) != null, there is at least one valid node in the queue: if s.thread == Thread.currentThread(), the thread in the first valid node is the current thread itself; if s.thread != Thread.currentThread(), the first valid node belongs to a different thread, so the current thread must join the wait queue.
3. Spin wait
Go back to the code above to get the lock:
AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
In the previous section we looked at addWaiter, and here we begin by analyzing the acquireQueued source code:
AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
    // Marks whether acquiring the resource ultimately failed
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Spin until the lock is acquired (or the attempt is cancelled)
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head node, try to acquire the lock;
            // on success the current node becomes the new head
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // p is the head but the lock was not acquired (it may have been grabbed
            // unfairly), or p is not the head. Decide whether the current thread
            // should be parked (condition: the predecessor's waitStatus is SIGNAL, -1)
            // to avoid wasting resources in a busy loop. The two methods are analyzed below.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// Decide, based on the predecessor node, whether the current thread should be parked
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // The predecessor is already SIGNAL, so the current thread can safely park
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // Walk backwards past cancelled nodes and unlink them from the queue
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // Set the predecessor's wait status to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
In general, the exit condition of the loop is that the predecessor node is the head node and the current thread acquires the lock successfully. To avoid wasting CPU in an endless spin, the state of the predecessor node is used to decide whether the current thread should be parked.
4. Cancel spin wait
Now look at cancelAcquire, which is called in the finally block of the acquireQueued method:
AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
    // Ignore invalid nodes
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    // Mark the current node as CANCELLED
    node.waitStatus = Node.CANCELLED;
    // If the current node is the tail, make the first non-cancelled node found
    // while walking backwards the new tail
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // If the predecessor is not the head:
        //   1. check whether the predecessor's status is SIGNAL,
        //   2. if not, try to set the predecessor's status to SIGNAL.
        // If these conditions hold, point the predecessor's next to the current node's successor.
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // If the current node is the successor of the head, or the conditions
            // above are not met, wake up the current node's successor
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
The logic of the above code: get the predecessor of the current node; if that predecessor is CANCELLED, keep walking backwards until the first node with waitStatus <= 0 is found, link the current node to that pred node, and mark the current node CANCELLED. Then, based on the position of the current node, there are three cases to consider:
- (1) The current node is the tail node.
- (2) The current node is the successor node of Head.
- (3) The current node is neither a successor of Head nor a tail node.
unlock
Releasing the lock does not distinguish between fair and unfair locks, so unlock is defined directly in the ReentrantLock class:
ReentrantLock
public void unlock() {
    sync.release(1);
}
As you can see, the actual release of the lock is delegated to the framework (AQS).
AbstractQueuedSynchronizer
public final boolean release(int arg) {
    // If the customized tryRelease returns true, the lock is no longer held by any thread
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease calls the method implemented by the subclass. In ReentrantLock it is implemented in Sync, the common parent class of the fair and unfair locks, since releasing the lock does not distinguish between the two:
protected final boolean tryRelease(int releases) {
    // Decrease the reentrant count
    int c = getState() - releases;
    // Only the thread that holds the lock may release it
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // Once the count drops to 0, the lock is fully released
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
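A quick sketch of the owner check above (hypothetical scenario, not from the original text): calling unlock() from a thread that does not hold the lock triggers the IllegalMonitorStateException thrown by tryRelease.

import java.util.concurrent.locks.ReentrantLock;

public class WrongThreadUnlockDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // owned by the main thread

        Thread other = new Thread(() -> {
            try {
                lock.unlock(); // not the owner, so tryRelease throws
            } catch (IllegalMonitorStateException e) {
                System.out.println("caught: " + e);
            }
        });
        other.start();
        other.join();

        lock.unlock(); // the owner releases normally
    }
}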
Wake up other threads
AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
    // Get the waitStatus of the node (the head) that was passed in
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Get the next node of the current node
    Node s = node.next;
    // If the next node is null or has been cancelled, search from the tail of
    // the queue for the first node with waitStatus <= 0
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // If such a node exists, unpark its thread
    if (s != null)
        LockSupport.unpark(s.thread);
}
After the thread wakes up, return Thread.interrupted(); is executed. This call returns the interrupt status of the current thread and clears it.
AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
Back in the acquireQueued code, whether parkAndCheckInterrupt returns true or false only affects the value of interrupted; either way the next iteration of the loop runs. Once the lock is finally acquired, the current value of interrupted is returned.
Let’s go back to the acquire method:
AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
If acquireQueued returns true, the selfInterrupt method is executed.
AbstractQueuedSynchronizer
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
The purpose of this method is to interrupt the current thread. But why interrupt the thread after it has acquired the lock? When a thread is woken up, it does not know whether it was woken because it was interrupted while waiting or because the lock was released. So it checks the interrupt flag with Thread.interrupted() (which returns the current thread’s interrupt status and then resets the flag to false) and records whether an interrupt occurred. The thread that was woken while waiting for the resource then keeps trying to acquire the lock until it succeeds. In other words, interrupts are not responded to during the whole waiting process; they are only recorded. Finally, once the lock has been acquired and the method is about to return, if an interrupt was recorded, the thread interrupts itself again so that the interrupt is not lost.
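The following sketch (a hypothetical scenario, not from the original text) demonstrates this behaviour: lock() does not respond to an interrupt delivered while waiting, but the interrupt flag is restored once the lock is finally acquired.

import java.util.concurrent.locks.ReentrantLock;

public class InterruptWhileWaitingDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the main thread holds the lock

        Thread waiter = new Thread(() -> {
            lock.lock(); // parks inside AQS until the main thread releases the lock
            try {
                // The interrupt sent while waiting was swallowed by Thread.interrupted(),
                // but selfInterrupt() restored the flag after the lock was acquired
                System.out.println("interrupted flag: " + Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        Thread.sleep(100);
        waiter.interrupt(); // the waiter keeps waiting; the interrupt is only recorded
        Thread.sleep(100);
        lock.unlock();      // now the waiter can acquire the lock and run
        waiter.join();      // expected output: interrupted flag: true
    }
}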