Introduction
Before synchronized was optimized, the most common synchronization tool class in everyday coding was ReentrantLock, which today matches the performance of the optimized synchronized keyword while offering more flexibility. Compared with synchronized it is more powerful: it supports interruptible lock waiting, fair locking, and binding multiple conditions, none of which synchronized provides. It is a key concurrency class that we must pay close attention to in development.
ReentrantLock matters in JDK concurrency not only because it is used so often, but also because it provides underlying support for a large number of concurrent classes in the JDK, including CopyOnWriteArrayList, CyclicBarrier and LinkedBlockingDeque. Since ReentrantLock is this important, understanding its underlying implementation is essential for using it flexibly in different scenarios and for tracking down concurrency problems. This article walks through the underlying implementation logic of ReentrantLock step by step, and then looks at how to use ReentrantLock better once that logic is understood.
The relationship between ReentrantLock and AbstractQueuedSynchronizer
When using the ReentrantLock class, the first step is to instantiate it, using new ReentrantLock().
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
As you can see in the code, ReentrantLock provides two constructors. The no-argument constructor initializes the sync field with NonfairSync() by default, while the parameterized constructor initializes sync with either FairSync() or NonfairSync() depending on the fair argument.
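Before going deeper into the internals, here is a minimal usage sketch (the class, field and method names are illustrative, not from the JDK source) showing both constructors and the usual lock-in-try/finally pattern:

import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    // non-fair by default, the same as new ReentrantLock(false)
    private final ReentrantLock lock = new ReentrantLock();
    // pass true to get a fair lock
    private final ReentrantLock fairLock = new ReentrantLock(true);

    private int counter;

    public void increment() {
        lock.lock();           // acquire the lock
        try {
            counter++;         // critical section
        } finally {
            lock.unlock();     // always release in finally
        }
    }
}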
A fair lock acquires the lock by queueing in FIFO order; a non-fair lock is allowed to jump the queue. By default, ReentrantLock uses the non-fair implementation. So what is the implementation behind the sync field? Take a look at the Sync code:
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer { ... }

static final class NonfairSync extends Sync { ... }

static final class FairSync extends Sync { ... }
Here we find the AbstractQueuedSynchronizer class: both the fair and the non-fair lock are implemented on top of AbstractQueuedSynchronizer, i.e. AQS. AQS provides the foundation for the ReentrantLock implementation.
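To get a feel for how AQS "provides the basis", here is a rough sketch of a non-reentrant exclusive lock built on AQS, loosely modeled on the example in the AQS Javadoc. It is only an illustration of the template pattern AQS uses, not the actual ReentrantLock code:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A toy exclusive lock: state 0 means free, 1 means held
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // CAS state from 0 to 1; on success record the owner thread
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);   // volatile write, visible to other threads
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }   // queuing and parking handled by AQS
    public void unlock() { sync.release(1); }   // waking of waiters handled by AQS
}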
The lock() method of ReentrantLock
After analyzing the instantiation of ReentrantLock, let’s see how it implements locking:
// ReentrantLock's lock method
public void lock() {
    sync.lock();
}

// Calls the abstract lock method in Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();
    ...
}
Sync's lock() method is called. lock() is declared abstract in Sync, and NonfairSync and FairSync each implement it in their own way.
// The non-fair lock implementation
static final class NonfairSync extends Sync {
    ...
    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))                        // state 0 means the lock is free
            setExclusiveOwnerThread(Thread.currentThread()); // record the current thread as the owner of the lock
        else
            acquire(1);
    }
    ...
}

// The fair lock implementation
static final class FairSync extends Sync {
    ...
    final void lock() {
        acquire(1);
    }
    ...
}
NonfairSync's lock() performs a CAS operation that sets state from 0 to 1. The CAS operation is atomic, which makes the update thread-safe. This is the first difference between the non-fair and the fair lock.
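If CAS feels unfamiliar, the same compare-and-set semantics can be tried out on an AtomicInteger. This is only an analogy for compareAndSetState, not AQS code:

import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {
    public static void main(String[] args) {
        AtomicInteger state = new AtomicInteger(0);

        // succeeds only if the current value is still 0 ("the lock is free")
        System.out.println(state.compareAndSet(0, 1)); // true, state is now 1

        // a later attempt fails because state is already 1 ("the lock is held")
        System.out.println(state.compareAndSet(0, 1)); // false
    }
}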
So what does this state do? What does it mean to go from zero to one? Let’s look at the source code for state:
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

/** The synchronization state. */
private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}
First, state is a volatile variable of type int, which guarantees that it is visible across threads. From the comment "The synchronization state" you can see that state represents a synchronization state. Returning to the lock() method above: once the CAS succeeds, setExclusiveOwnerThread is called to record the current thread in a private field that marks which thread currently holds the lock. This field is implemented in AbstractOwnableSynchronizer, the parent class of AQS.
public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
    ...
    /** The current owner of exclusive mode synchronization. */
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
If state is set successfully, the lock() method completes, meaning the lock has been acquired. So state is a synchronization state used to determine whether the lock is held: 0 means the lock is idle, 1 means it has been acquired. What if setting state fails? Then acquire(1) is called; the fair lock calls acquire(1) directly without the preceding CAS. acquire(1) is a method implemented in AQS.
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
This method is very important yet easy to follow. The steps are: tryAcquire is called to attempt to acquire the lock; if it succeeds, execution is complete; if it fails, the addWaiter method adds the current thread to the wait queue, and the acquireQueued method suspends the thread. If the thread was interrupted while waiting, selfInterrupt re-asserts the interrupt status. Let's look at the details of the process, starting with the tryAcquire method:
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// NonfairSync
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {                                    // the lock is idle
        if (compareAndSetState(0, acquires)) {       // perform the CAS again to grab the lock
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // the current thread acquires the lock again, i.e. lock reentrancy
        int nextc = c + acquires;
        if (nextc < 0)                               // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// FairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&        // check whether there are threads already waiting in the queue; if so, this attempt fails and the thread must queue
            compareAndSetState(0, acquires)) { // no waiting thread, try to grab the lock
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // the current thread acquires the lock again, i.e. lock reentrancy
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// AQS: determine whether there are threads waiting in the queue
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
AQS does not provide a concrete tryAcquire implementation; ReentrantLock supplies its own for the fair and the non-fair lock. The non-fair lock performs a CAS to try to grab the lock when it is idle, which keeps the update thread-safe. If the lock is not idle, i.e. state is not 0, it checks whether this is a reentrant acquisition, that is, the same thread has acquired the lock again; if so, state is increased by 1. This is the logic that makes ReentrantLock reentrant.
The second difference between a fair and a non-fair lock: when the lock is idle, the fair lock first calls the hasQueuedPredecessors method to check whether there are threads already waiting in the queue. If there are, it does not try to grab the lock and goes through the queuing process below. By now the difference between the non-fair and the fair lock should be clear; if an interviewer asks about it, I believe you can answer it easily.
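A small hypothetical demo of the reentrancy just described; getHoldCount() reports how many times the owning thread has acquired the lock, which mirrors the state value:

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantDemo {
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        lock.lock();                                     // state: 0 -> 1
        try {
            lock.lock();                                 // same thread re-enters, state: 1 -> 2
            try {
                System.out.println(lock.getHoldCount()); // prints 2
            } finally {
                lock.unlock();                           // state: 2 -> 1
            }
        } finally {
            lock.unlock();                               // state: 1 -> 0, lock fully released
        }
    }
}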
When tryAcquire fails, acquireQueued(addWaiter(...)) is called. Let's look at addWaiter first:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // initialize a Node in EXCLUSIVE mode, i.e. an exclusive node
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {                      // the tail is not null, so there are already thread nodes waiting in the queue
        node.prev = pred;                    // point the new node's prev at the current tail
        if (compareAndSetTail(pred, node)) { // CAS the tail to the new node, appending the current thread to the end of the queue and avoiding lost updates when several threads enqueue at once
            pred.next = node;
            return node;
        }
    }
    enq(node);                               // the queue is empty, or the CAS on the tail failed, so fall back to the enqueue loop
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node())) // empty queue: initialize head and tail to a dummy node
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { // repeat the tail-setting from addWaiter; this is the classic CAS spin, avoiding the thread suspension a synchronized block would cause
                t.next = node;
                return t;
            }
        }
    }
}

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node(); // shared mode
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;    // exclusive mode
    ...
}
The addWaiter method first initializes a Node in EXCLUSIVE mode. The Node should look familiar: in my earlier series on collections, many linked structures are implemented in exactly this way, and the AQS queue is no exception. It is built on a Node inner class, here forming a doubly linked queue. A Node has two modes, SHARED and EXCLUSIVE; ReentrantLock is an exclusive lock, so its nodes are initialized in EXCLUSIVE mode. The shared lock mode will be covered in more detail later.
After the Node is initialized, it is added to the queue. Note that in a multi-threaded environment, if the CAS that sets the tail node fails, the operation spins and retries until it succeeds, which guarantees both thread safety and eventual success. The same could be achieved with the synchronized keyword, but that would be less efficient, since it is a pessimistic locking approach.
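The "CAS plus spin" tail-setting pattern used by enq can be sketched in isolation with an AtomicReference. This is only an illustration of the technique under simplified assumptions, not the AQS code:

import java.util.concurrent.atomic.AtomicReference;

class SpinEnqueueDemo {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    // start with a dummy node, like the AQS queue does after initialization
    private final AtomicReference<Node> tail = new AtomicReference<>(new Node());

    Node enqueue(Node node) {
        for (;;) {                              // spin until the CAS succeeds
            Node t = tail.get();
            node.prev = t;
            if (tail.compareAndSet(t, node)) {  // atomically swing the tail pointer to the new node
                t.next = node;
                return t;
            }
            // the CAS lost the race: another thread appended first, so retry
        }
    }
}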
The process of setting nodes is described in the following pictures to give you a better understanding:
After the current thread has been queued, you need to call acquireQueued to suspend the current thread:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();   // get the predecessor of the current node
            if (p == head && tryAcquire(arg)) {  // if the predecessor is the head, this node is the first waiting thread, so try once more to acquire the lock
                setHead(node);                   // success: make the current node the head; it now holds the lock
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && // not first in line, or the acquire failed: check the node status to see whether the thread should be parked
                parkAndCheckInterrupt())                 // park the thread; the current thread blocks here!
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
As you can see, this method is a spin loop. It first obtains the predecessor of the current node; if the predecessor is the head node, it makes another attempt to acquire the lock. Whether the thread should be blocked is decided by the shouldParkAfterFailedAcquire method, and the blocking itself is performed by the parkAndCheckInterrupt method.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)  // the predecessor has promised to wake its successor, so the current thread can safely be parked
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {           // the predecessor was cancelled (timeout or interruption)
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0); // skip cancelled predecessors until reaching the nearest one that has not been cancelled
        pred.next = node;
    } else {                // the status is neither SIGNAL nor cancelled, so try to set it to Node.SIGNAL
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // park (suspend) the current thread
    return Thread.interrupted();
}
A thread is parked only when its predecessor node is in the SIGNAL state. Node.waitStatus has four named states:
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;
The comments are very clear, so the four states are not explained in detail here. The difference between a fair and a non-fair lock is easy to spot in the locking process: a fair lock always queues, whereas a non-fair lock first tries a CAS to grab the lock directly before queuing. Having covered acquiring the lock, let's look at how it is released.
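The parking and waking described above are done with LockSupport. A minimal, illustrative sketch of how park and unpark interact (not JDK source):

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            System.out.println("parking...");
            LockSupport.park();                // suspend here, like parkAndCheckInterrupt()
            System.out.println("unparked, continuing");
        });
        waiter.start();

        Thread.sleep(500);                     // give the waiter time to park
        LockSupport.unpark(waiter);            // wake it up, as the release path does
        waiter.join();
    }
}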
ReentrantLock's unlock() method
The unlock() method is easier to understand, because there is much less multi-threaded contention to worry about: only the thread that acquired the lock may release it, and any other thread calling unlock() fails straight away with an exception. Look at the source code of unlock():
public class ReentrantLock implements Lock, java.io.Serializable {
    ...
    public void unlock() {
        sync.release(1);
    }
    ...
}

public abstract class AbstractQueuedSynchronizer {
    ...
    public final boolean release(int arg) {
        if (tryRelease(arg)) {      // try to release the lock
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // the lock was released, so wake up the successor thread
            return true;
        }
        return false;
    }
    ...
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread()) // only the thread that acquired the lock may release it; any other thread gets an exception
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {               // the hold count drops to 0, so the lock is fully released; otherwise it stays held (reentrancy)
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    ...
}

// unparkSuccessor is implemented in AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // wake up the successor thread
}
As with lock(), the AQS release method is called. tryRelease is called first to try to release the lock; it verifies that the caller is the thread that currently holds the lock and then checks whether the lock is still held reentrantly. If it is not, the current thread's lock is fully released, and the unparkSuccessor method is called to wake up the successor thread.
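As a caution, tryRelease is owner-checked, so a thread that does not hold the lock cannot release it. A hypothetical snippet showing that behaviour:

import java.util.concurrent.locks.ReentrantLock;

public class UnlockDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                           // the main thread owns the lock, state = 1

        Thread other = new Thread(() -> {
            try {
                lock.unlock();                 // not the owner: tryRelease throws
            } catch (IllegalMonitorStateException e) {
                System.out.println("only the owning thread may unlock");
            }
        });
        other.start();
        other.join();

        lock.unlock();                         // the owner releases, state back to 0
    }
}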
Conclusion
ReentrantLock acquires and releases the lock through the state field managed by AQS, and AQS keeps the suspended threads in an internal doubly linked queue. ReentrantLock exposes the lock-acquire and lock-release API to the outside, while the concrete implementation lives in AQS. Below I summarize the fair and the non-fair locking process with two flow charts.
Unfair lock: