This blog focuses on the implementation principles of ReentrantLock. The main content includes:
- How AQS works and how it is implemented
- How ReentrantLock acquires and releases a lock, and the mechanism behind it
- Source code analysis
- Condition principle
- ReentrantLock versus Synchronized
1 AQS
1.1 AbstractQueuedSynchronizer
AbstractQueuedSynchronizer, AQS for short, is a framework for building locks and related synchronizers; its implementation relies on a FIFO wait queue. The AbstractQueuedSynchronizer Javadoc describes it as follows:
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
In fact, many classes in the java.util.concurrent package are built on AQS, such as ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock, and (in older JDK versions) FutureTask. AQS takes care of many of the design details involved in implementing a synchronizer.

AQS maintains a FIFO queue that records the waiting threads. Acquiring and releasing a lock can be viewed as threads entering and leaving this queue: a thread that cannot obtain the lock joins the queue and waits, and it is woken up when it is its turn to leave the queue.

AQS has a status field, state. ReentrantLock uses it to record how many times the owning thread has acquired the lock (the reentrancy count), and Semaphore uses it to record the number of remaining permits. The value of state is updated with CAS operations, which makes each update atomic.

AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer. That class has a single field, exclusiveOwnerThread, which identifies the thread currently holding the lock, together with the corresponding get and set methods. AQS uses this field to record the thread that currently holds the lock.
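To make the roles of state, CAS, and exclusiveOwnerThread concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its method names are hypothetical and chosen for illustration; this is not how ReentrantLock itself is written, only a reduced example of the same building blocks.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex on top of AQS.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // CAS state 0 -> 1; on success record the owning thread.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // state == 0 means the lock is free again
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // enqueues and parks on failure
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }           // wakes the next queued thread
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.isLocked()); // true
        mutex.unlock();
        System.out.println(mutex.isLocked()); // false
    }
}
```

The subclass only decides whether an acquire or release succeeds; queuing, parking, and waking threads are all inherited from AQS through acquire and release.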
1.2 The queue
1.2.1 Queue
AQS uses a FIFO wait queue to represent the threads queuing for locks. The queue structure is shown in the figure below:
The head node of the queue is called a “sentinel node” or “dummy node” and is not associated with any thread; every other node is associated with a waiting thread. Each time a thread releases the lock, it starts from head, finds the first node after it that has not been cancelled, and wakes up that node's suspended thread with LockSupport.unpark. The woken thread then continues with its attempt to acquire the lock. When a new thread tries to acquire the lock and fails, it creates a Node, appends it to the tail of the queue, and then suspends itself until either its wait time expires or it is woken up by the thread of its prev node. A node is only woken this way if the waitStatus of its prev node is SIGNAL; see the unparkSuccessor method.
1.2.2 Main Attributes of Node
Type | Property name | Description |
---|---|---|
int | waitStatus | Wait state |
Node | prev | Points to the previous node in the queue |
Node | next | Points to the next node in the queue |
Thread | thread | The thread associated with this node |
Node | nextWaiter | The next node waiting on the condition |
1.2.3 Node waitStatus Value
State | Value | Description |
---|---|---|
CANCELLED | 1 | The node was cancelled, for example because its thread timed out while waiting for the lock; nodes in this state are removed from the queue and collected by the GC |
SIGNAL | -1 | Indicates that this node's successor is blocked and needs to be notified |
CONDITION | -2 | Indicates that the node is in a condition queue and is blocked waiting for a condition |
PROPAGATE | -3 | The head node may be in this state when used in shared mode, indicating that the next lock acquisition can be propagated unconditionally |
other | 0 | The initial state |
1.2.4 Queue example
Let’s use a simple example to describe how threads wait for and release a ReentrantLock. For ease of description, nodes and threads correspond one-to-one in the following example: Node1 corresponds to Thread1, and so on. Assume the initial state shown in the figure below:
When Thread3 requests a lock, it cannot obtain the lock, so it creates a Node and joins the queue. As shown below:
Suppose that after 3s Thread1 is woken up (it was suspended with a timeout, so it wakes up when the wait time expires). Its status is then changed to CANCELLED, indicating that it has given up waiting and is to be removed from the queue. As shown below:
If a new thread then requests the lock, nodes in the CANCELLED state are removed while the new node is being enqueued. As shown below:
When Thread0 finishes, it releases the lock, which wakes up the thread of the first node after head that is still waiting (that is, not CANCELLED).
Note: when a new node is enqueued, CANCELLED nodes are checked for and removed. In fact, Node1 also removes itself from the queue once it has been woken by the timeout and its status has been changed to CANCELLED.
2 Process
This part briefly introduces the process of acquiring and releasing the lock to give an overall picture of ReentrantLock; the implementation details are then analyzed through the source code.
2.1 NonfairSync#lock
The main process of unfair lock is as follows:
This is only a flow chart, and the details of the process will be described in code later.
2.2 FairSync#lock
The main process of fair lock is as follows:
As shown in the flow chart, the main flow is basically the same as the unfair lock, with two differences:
- The first step NonfairSync takes when locking is to try to obtain the lock through compareAndSetState(0, 1); FairSync doesn’t do this.
- NonfairSync obtains the lock through nonfairTryAcquire; FairSync obtains the lock through tryAcquire. The main logic of the two methods is the same; the only difference is that FairSync first checks whether another thread's Node is already waiting in the FIFO queue. Only if there is none does the current thread attempt to acquire the lock by performing compareAndSetState(0, 1).
2.3 unlock
The process of releasing a fair lock is the same as that of releasing an unfair lock. The flowchart for releasing a lock is as follows.
3 Lock and unlock source code analysis
Because most of the logic of the fair lock and the unfair lock is the same, this section mainly discusses the source code of the unfair lock.
3.1 The constructor
Here are the two constructors of ReentrantLock:
public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
We can see that the default ReentrantLock constructor creates an unfair lock.
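The constructor behaviour can be checked from the outside with the public isFair() method. The following is a small sketch; the class name FairnessDemo is made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: observes which Sync variant a constructor selects.
class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock byDefault = new ReentrantLock();      // NonfairSync
        ReentrantLock fair = new ReentrantLock(true);       // FairSync
        ReentrantLock explicitUnfair = new ReentrantLock(false);

        System.out.println(byDefault.isFair());      // false
        System.out.println(fair.isFair());           // true
        System.out.println(explicitUnfair.isFair()); // false
    }
}
```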
3.2 Acquiring a lock
3.2.1 lock
In the following code you can see the first difference between a fair lock and an unfair lock: the first step of an unfair lock is to try to obtain the lock with compareAndSetState(0, 1). Since it does not need to be fair, it starts by competing for the lock immediately.
3.2.1.1 ReentrantLock#lock
public void lock() {
    sync.lock();
}
3.2.1.2 NonfairSync#lock
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
‘exclusiveOwnerThread’: records the thread that currently holds the lock, so when the lock is acquired successfully, ‘exclusiveOwnerThread’ is set to the current thread.
3.2.1.3 FairSync#lock
final void lock() {
    acquire(1);
}
3.2.2 compareAndSetState
The compareAndSetState(0, 1) method uses a CAS operation to try to change state from 0 to 1. It is an atomic operation backed by the hardware instruction set, and it sets state to 1 if and only if state=0. As mentioned in the AQS section, ReentrantLock uses state to record the number of times the lock has been acquired. state=0 means that no thread currently holds the lock; state>0 is the number of times the owning thread has locked it. The first successful lock sets state=1, and each reentry increments state by one. CAS operations in Java are almost always implemented using Unsafe, as follows:
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
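The "change to 1 if and only if the value is 0" semantics can be tried out with AtomicInteger, which exposes the same compare-and-set primitive through a public API. This is only an illustration of CAS behaviour, not the AQS code path; the class and method names below are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: CAS succeeds only when the current value matches
// the expected value.
class CasDemo {
    // Mirrors compareAndSetState(0, 1): succeeds only if state is currently 0.
    static boolean tryLockOnce(AtomicInteger state) {
        return state.compareAndSet(0, 1);
    }

    public static void main(String[] args) {
        AtomicInteger state = new AtomicInteger(0);
        System.out.println(tryLockOnce(state)); // true:  0 -> 1
        System.out.println(tryLockOnce(state)); // false: state is already 1
        System.out.println(state.get());        // 1
    }
}
```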
3.2.3 acquire
The acquire method belongs to AbstractQueuedSynchronizer. Its main logic is as follows:
- Try to acquire the lock; if successful, return directly.
- If the lock fails to be acquired, create a Node of type Node.EXCLUSIVE and join the queue. The queue here is the FIFO queue in the AQS, where the head is an empty Node and each of the remaining nodes is associated with a thread.
- A thread that has been enqueued attempts to acquire the lock again; it returns if it succeeds and suspends if it fails (under one condition: the waitStatus of the Node’s prev must be SIGNAL).
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
3.2.4 tryAcquire
This method attempts to acquire the lock. If state=0, the lock has been released, so the thread can try to take it with compareAndSetState(0, 1). If state!=0 and the current thread already holds the lock, the acquire is a reentry and state is increased (state++). The method returns true in exactly two cases:
- The lock has been released and the current thread’s compareAndSetState(0, 1) succeeds.
- The current thread already holds the lock, so this is a reentry and state is updated.
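The reentrant branch can be observed through the public getHoldCount() method, which reports how many holds the current thread has on the lock. A small sketch (the class name ReentrancyDemo is hypothetical):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: each lock() by the owner adds one hold and each
// unlock() removes one.
class ReentrancyDemo {
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();                      // first acquire
        counts[0] = lock.getHoldCount();
        lock.lock();                      // reentry by the owning thread
        counts[1] = lock.getHoldCount();
        lock.unlock();
        counts[2] = lock.getHoldCount();
        lock.unlock();                    // fully released
        counts[3] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(holdCounts())); // [1, 2, 1, 0]
    }
}
```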
3.2.4.1 NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
3.2.4.2 Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // state != 0 indicates that a thread is holding the lock. If the current thread
    // is holding the lock, it is equivalent to a reentrant operation, so state++.
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
3.2.4.3 FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // Fair lock: attempt the CAS only if no other thread is queued ahead of this one.
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
3.2.5 addWaiter
The addWaiter method belongs to AbstractQueuedSynchronizer. Its main logic is to create a Node, associate it with the current thread, and then try to append it to the tail of the FIFO queue. If the queue has not been initialized yet (head is null), an empty head Node that is not associated with any thread is created first, and the new Node is then appended to the tail of the queue.
3.2.5.1 addWaiter
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // If tail is null, the queue has not been initialized.
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
3.2.5.2 enq
If the FIFO queue is not already initialized, initialize it first; If it is already initialized, try adding node to the end of the queue and try again if it fails.
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
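The retry loop in enq can be reproduced outside AQS with AtomicReference standing in for the internal head and tail fields. The sketch below is a simplified imitation under that assumption: CasQueueSketch, its Node class, and sizeFromTail are hypothetical names, and the real AQS node carries more state (thread, waitStatus, nextWaiter).

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: lock-free tail append with a lazily created dummy head.
class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends node and returns its predecessor, retrying until the CAS wins. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily install the dummy head; only one thread wins this CAS.
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                     // link backwards first
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                 // forward link is set after the CAS
                    return t;
                }
            }
        }
    }

    /** Counts nodes by walking prev links from the tail (includes the dummy). */
    int sizeFromTail() {
        int n = 0;
        for (Node t = tail.get(); t != null; t = t.prev) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    queue.enq(new Node("t" + id + "-" + j));
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        System.out.println(queue.sizeFromTail()); // 4001: 4000 nodes plus the dummy head
    }
}
```

Because prev is written before the tail CAS and next only after it, the prev chain is always complete while next may briefly lag behind. That is why the count walks backwards from the tail, and it is the same reason unparkSuccessor traverses from tail when next looks unreliable.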
3.2.6 acquireQueued
The acquireQueued method belongs to AbstractQueuedSynchronizer. Its main logic is as follows:
- Get the prev node of the node.
- If the prev node is head, the current node is the second node in the queue, so the current thread can try to acquire the lock. If it succeeds, set the current node as the new head and return the interrupted flag.
- If the prev node is not head, or the attempt fails, check whether the current thread should be suspended (parked), and if so, suspend it through LockSupport#park.
Note that a thread that fails to acquire the lock is suspended inside parkAndCheckInterrupt(). Once another thread has fully released the lock, it wakes up the thread of the node after head. The woken thread then continues executing the code that follows LockSupport#park in parkAndCheckInterrupt().
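The suspend and wake-up primitive used here is LockSupport, which can be tried on its own. The sketch below is a standalone illustration with hypothetical names (ParkDemo, parkThenUnpark); it re-checks a flag in a loop because park() is allowed to return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: one thread parks until another thread unparks it.
class ParkDemo {
    static boolean parkThenUnpark() throws InterruptedException {
        AtomicBoolean mayProceed = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop.
            while (!mayProceed.get()) {
                LockSupport.park();
            }
            resumed.set(true); // execution continues right after park() returns
        });
        waiter.start();

        mayProceed.set(true);       // publish the condition first ...
        LockSupport.unpark(waiter); // ... then wake the waiter (or pre-grant its permit)
        waiter.join();
        return resumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(parkThenUnpark()); // true
    }
}
```

If unpark runs before the waiter reaches park, the permit is remembered and the next park returns immediately, so the order of the two calls does not cause a lost wake-up.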
3.2.6.1 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // If the prev is head, this is the second node, so try to acquire the lock.
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Determine whether the current thread should be suspended, and if so,
            // suspend it through LockSupport.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
3.2.6.2 shouldParkAfterFailedAcquire
This method determines whether the thread of the current node should be suspended. Suspension is allowed if the waitStatus of the prev node is SIGNAL; this state means that the thread of the prev node will wake up the thread of this node after releasing the lock. If the prev node has waitStatus>0, i.e. CANCELLED, it has to be skipped and unlinked from the queue; this is repeated until a prev node that is not CANCELLED is found. If the prev node has waitStatus<=0 (other than SIGNAL, of course), a CAS operation sets its waitStatus to SIGNAL. In the last two cases the method returns false, and the caller retries the acquire before parking.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
3.3 If waiting times out, lock acquisition is abandoned
The main logic is in doAcquireNanos. The main difference between doAcquireNanos and acquireQueued is: The loop checks whether the wait time has exceeded the specified time, and if so, changes Node’s waitStatus to CANCELLED.
3.3.1 tryLock
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
3.3.2 tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
3.3.3 doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            if (nanosTimeout <= 0)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
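From the caller's point of view, the timeout path looks like this: tryLock(timeout, unit) returns false if the lock is still held elsewhere when the time runs out. The sketch below uses hypothetical names (TimedTryLockDemo, tryWhileHeldElsewhere) and a 100 ms timeout chosen arbitrarily for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a timed tryLock gives up while another thread
// keeps holding the lock.
class TimedTryLockDemo {
    static boolean tryWhileHeldElsewhere() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown(); // tell the main thread the lock is taken
                done.await();     // keep holding until the main thread is finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        held.await();

        // The holder does not release in time, so this times out and returns false.
        boolean acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.unlock(); // not expected in this scenario
        }
        done.countDown();
        holder.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(tryWhileHeldElsewhere()); // false
    }
}
```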
3.4 Releasing the lock
3.4.1 unlock
public void unlock() {
    sync.release(1);
}
3.4.2 release
To release the lock, the main logic is:
- Attempt to release the lock.
- If state=0, i.e. the thread holding the lock has released it completely (including all reentrant acquisitions), try to unpark the thread of the Node after head (this includes checking whether that Node has been CANCELLED; such nodes are skipped).
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
3.4.3 tryRelease
The main logic of this method is:
- If the current thread is not the thread that holds the lock, the release is not allowed and IllegalMonitorStateException is thrown.
- Set state = state - releases.
- If state=0, clear the record of the thread that holds the lock.
- Return true only if state=0 after the release, i.e. Sync is no longer held by any thread.
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
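The owner check in tryRelease is visible through the public API: calling unlock() from a thread that does not hold the lock throws IllegalMonitorStateException and leaves the lock untouched. A short sketch (UnlockOwnerDemo and its method name are hypothetical):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: only the owning thread may release the lock.
class UnlockOwnerDemo {
    static boolean nonOwnerUnlockThrows() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean threw = new AtomicBoolean(false);

        lock.lock(); // the calling thread is the owner
        Thread other = new Thread(() -> {
            try {
                lock.unlock(); // not the owner, so tryRelease rejects this
            } catch (IllegalMonitorStateException e) {
                threw.set(true);
            }
        });
        other.start();
        other.join();

        boolean stillHeld = lock.isHeldByCurrentThread(); // unaffected by the failed unlock
        lock.unlock();
        return threw.get() && stillHeld;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(nonOwnerUnlockThrows()); // true
    }
}
```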
3.4.4 unparkSuccessor
The main logic of this method is:
- If node’s waitStatus is negative (i.e. not CANCELLED and not already 0), change it to 0 with CAS. This resets the head node to its initial state.
- If node’s next node is null or CANCELLED, traverse backwards from tail to find the non-cancelled node closest to node.
- If such a node is found, wake up its thread through LockSupport.unpark.
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
4 Condition
Condition lets a thread that has acquired the lock suspend itself and release the lock when some condition is not met; when the condition is met, the thread is woken up to continue processing. ReentrantLock can support multiple conditions. It works as follows: each call to newCondition() creates a ConditionObject, and each ConditionObject maintains its own wait queue. To wait on multiple conditions, simply call newCondition several times to create several condition objects.

Note: two concepts need to be distinguished first. In this section, the synchronous wait queue refers to the FIFO queue in AQS; a conditional wait queue is the wait queue that belongs to a ConditionObject.
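A common use of multiple conditions on one lock is a bounded buffer with a "not full" and a "not empty" condition. The following is a minimal sketch in that style; the class BoundedBuffer and the helper demoSum are written for this article and are not part of the JDK.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: one lock, two condition queues.
class BoundedBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();   // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal();     // wake one waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();      // wake one waiting producer, if any
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Moves 1..5 through a buffer of capacity 1 and returns their sum.
    static int demoSum() throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += buffer.take();
        }
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demoSum()); // 15
    }
}
```

With separate conditions, a put only wakes a consumer and a take only wakes a producer, which a single wait set (as with Object#wait and notify) cannot express.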
4.1 Source Code Analysis
4.1.1 newCondition
Creates a ConditionObject. A ConditionObject is an ordinary object with nothing special about it; it just provides methods such as await and signal, which are discussed in detail below.
public Condition newCondition() {
    return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}
4.1.2 await
await releases the lock, suspends the current thread, and waits for another thread to signal; after being woken up, the thread reacquires the lock. It can be considered the equivalent of Object#wait. The main logic of this method is as follows:
- Create a conditional wait Node (waitStatus=CONDITION) and append it to the tail of the conditional wait queue.
- Release the ReentrantLock. Since the thread is about to sleep, it has to release the lock completely so that other threads can acquire it.
- Check whether the Node is in the synchronous wait queue, and suspend the current thread if it is not. Because the Node has just been created by await and is not in the synchronous queue, the current thread is suspended here.
- The thread stays suspended, waiting for signal.
- After the thread is woken up, it attempts to reacquire the lock through acquireQueued, which was covered above. Since the lock was fully released earlier, the thread reacquires it with the same count (savedState) that it released.
- Clean up the nodes that have been cancelled.
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
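The fullyRelease and acquireQueued(node, savedState) pair means that await gives up every reentrant hold and restores the same number afterwards. The sketch below checks this with getHoldCount(); the class and method names are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: await releases all holds, then restores them.
class AwaitHoldCountDemo {
    static int holdCountAfterAwait() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false}; // guarded by lock

        Thread signaller = new Thread(() -> {
            // This lock() can only succeed once the awaiting thread has
            // released both of its holds.
            lock.lock();
            try {
                flag[0] = true;
                ready.signal();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        lock.lock(); // hold count is now 2
        try {
            signaller.start();
            while (!flag[0]) {
                ready.await(); // releases both holds, reacquires both before returning
            }
            return lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(holdCountAfterAwait()); // 2
    }
}
```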
4.1.3 addConditionWaiter
The main logic of this approach is as follows:
- If the lastWaiter has already been canceled, just clean them up.
- Then create a Node and add it to the end of the conditional queue.
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
4.1.4 isOnSyncQueue
The main logic of this approach is as follows:
- Check the status of the Node. If it is CONDITION (or its prev is null), this is a normal node that has not been signalled yet and has not entered the synchronous wait queue, so return false.
- Check whether the Node’s next is null. If it is not null, the Node is in the synchronous wait queue, so return true.
- Otherwise traverse the synchronous wait queue from the tail and return true if the Node is found. Note: when the condition is met (on signal), a node is transferred from the conditional wait queue to the synchronous wait queue.
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}
4.1.5 signal
Simply put: wake up a waiting thread. This is done by walking the conditional wait queue, finding the first Node that can be woken up, appending it to the tail of the synchronous wait queue, and then letting it wait to be woken (the same process as when one thread releases the lock and another thread acquires it). The main logic of this method is as follows:
- Fetch the first Node from the conditional wait queue.
- Append the Node to the synchronous queue through enq and set the waitStatus of its predecessor there to SIGNAL. If the predecessor has been cancelled or that status change fails, the Node's thread is woken up immediately.
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
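The isHeldExclusively() check at the top of signal means the caller must hold the lock. A short sketch of what happens otherwise (the class name SignalOwnershipDemo is hypothetical):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: signal() requires the calling thread to own the lock.
class SignalOwnershipDemo {
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal(); // lock is not held by this thread
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // no waiters, so this is a no-op
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockThrows()); // true
        System.out.println(signalWithLockSucceeds());  // true
    }
}
```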
4.2 Using sample pseudocode
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
// Are all resources ready? Guarded by lock.
private boolean resourceReady = false;

public void conditionWait() throws InterruptedException {
    // The lock must be acquired before calling await
    lock.lock();
    try {
        // Re-check in a loop, because await may return before the condition holds
        while (!resourceReady) {
            // await causes the current thread to release its lock and suspend
            condition.await();
        }
        // TODO execute business logic
    } finally {
        lock.unlock();
    }
}

public void conditionSignal() {
    // The lock must be acquired before calling signal
    lock.lock();
    try {
        // All resources are ready now
        resourceReady = true;
        condition.signal();
    } finally {
        lock.unlock();
    }
}
5 Contrast ReentrantLock with synchronized
5.1 Synchronized
The synchronized keyword achieves thread safety through mutual exclusion; that is, only one thread at a time can execute a block or method guarded by a given synchronized lock. Its features and functions are as follows:
- synchronized is a Java keyword supported by the JVM.
- Reentrancy is supported: a thread can lock an object again after it has already acquired that object's lock.
- On an ordinary (instance) method, the lock is the current instance; on a static method, the lock is the Class object of the current class; on a code block, the lock is the object given in the parentheses.
- Interrupting a wait is not supported: a thread blocked waiting for the lock cannot give up.
- synchronized is unfair.
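Which object each form of synchronized locks can be checked with Thread.holdsLock. The class below is a hypothetical example written for this comparison.

```java
// Hypothetical demo class: shows the monitor used by each form of synchronized.
class SynchronizedTargets {
    // Instance method: the monitor is the current instance.
    synchronized boolean instanceMethodLocksThis() {
        return Thread.holdsLock(this);
    }

    // Static method: the monitor is the Class object of the current class.
    static synchronized boolean staticMethodLocksClass() {
        return Thread.holdsLock(SynchronizedTargets.class);
    }

    // Block: the monitor is the object in the parentheses.
    boolean blockLocksGivenObject(Object monitor) {
        synchronized (monitor) {
            return Thread.holdsLock(monitor);
        }
    }

    // Reentrancy: a synchronized method may call another one on the same instance.
    synchronized boolean reentersOwnLock() {
        return instanceMethodLocksThis();
    }

    public static void main(String[] args) {
        SynchronizedTargets target = new SynchronizedTargets();
        System.out.println(target.instanceMethodLocksThis());           // true
        System.out.println(staticMethodLocksClass());                   // true
        System.out.println(target.blockLocksGivenObject(new Object())); // true
        System.out.println(target.reentersOwnLock());                   // true
    }
}
```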
5.2 ReentrantLock
ReentrantLock is a very common way of protecting a critical section: the code locks before executing the section, which ensures that only one thread at a time can execute it. ReentrantLock has the following features and functions:
- ReentrantLock is implemented at the Java API level, with support from Unsafe.
- Reentrancy is supported.
- Both fair and unfair locks are supported; the default is unfair. With a fair lock, when multiple threads are waiting for the same lock, they acquire it in the order in which they requested it. With an unfair lock, any thread waiting for the lock has a chance to acquire it when it is released.
- Interrupting a wait is supported. For example, thread A holds the lock of object O and thread B is waiting for it. If thread B cannot obtain the lock for a long time, it can give up waiting.
- A lock can be bound to multiple conditions. A thread may enter a critical section only to find that it cannot proceed until some condition is met; a Condition is used to manage threads that have acquired the lock but cannot do useful work yet. One ReentrantLock object can have multiple Condition objects bound to it at the same time.
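One way to give up a wait is lockInterruptibly(): a thread blocked there stops waiting when it is interrupted. The sketch below uses hypothetical names (InterruptibleLockDemo, waiterGivesUp); a timed tryLock is the other common option.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a waiting thread abandons the lock when interrupted.
class InterruptibleLockDemo {
    static boolean waiterGivesUp() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean gaveUp = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        lock.lock(); // thread A (the caller) holds the lock the whole time
        try {
            Thread waiter = new Thread(() -> { // thread B
                started.countDown();
                try {
                    lock.lockInterruptibly(); // blocks, or throws if already interrupted
                    lock.unlock();            // not expected in this scenario
                } catch (InterruptedException e) {
                    gaveUp.set(true);         // B stops waiting for the lock
                }
            });
            waiter.start();
            started.await();
            waiter.interrupt();
            waiter.join();
        } finally {
            lock.unlock();
        }
        return gaveUp.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waiterGivesUp()); // true
    }
}
```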
5.3 Summary
ReentrantLock is essentially synchronization implemented through a queue. Because each Node is associated with a thread, synchronizing the nodes of the queue correctly is enough to synchronize the threads. Compared with the synchronized keyword, ReentrantLock and similar classes are closer to the wait queues we implement with ZooKeeper.