One: introduction
ReentrantLock is a locking mechanism provided in the java.util.concurrent.locks package. It is a reentrant, mutually exclusive lock that supports both fair and unfair implementations. Based on Java 8, this article analyzes how the concurrency tool ReentrantLock is implemented.
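As a point of reference before reading the source, here is a minimal usage sketch. The class name CounterDemo and its run() helper are made up for illustration; only ReentrantLock, lock(), unlock(), and isFair() come from the JDK.

```java
import java.util.concurrent.locks.ReentrantLock;

public class CounterDemo {
    // Hypothetical helper: several threads increment a shared counter under the lock.
    public static int run(boolean fair, int threads, int perThread) {
        ReentrantLock lock = new ReentrantLock(fair); // true selects the fair implementation
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++; // protected by the lock
                    } finally {
                        lock.unlock(); // always release in finally
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantLock().isFair());     // no-arg constructor: unfair
        System.out.println(new ReentrantLock(true).isFair()); // fair
        System.out.println(run(false, 4, 1000));
        System.out.println(run(true, 4, 1000));
    }
}
```

Releasing in a finally block matters because, unlike synchronized, the lock is not released automatically when the protected code throws.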
Two: ReentrantLock class diagram
Three: process diagram
Four: source code analysis
We take the lock() and unlock() methods as the entry points for analyzing the ReentrantLock source.
lock()
When a ReentrantLock is constructed, it creates a different Sync object depending on the fair argument passed to the constructor (the default is the unfair lock implementation). ReentrantLock.lock() calls sync.lock(). The lock() method in the Sync class is an abstract method, implemented in FairSync (fair) and NonfairSync (unfair) respectively.
public ReentrantLock() {
    // Default is the unfair lock
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
    sync.lock();
}
Unfair lock implementation:
final void lock() {
    // state is the number of times the lock has been acquired (reentered). 0 means the lock is not held
    // Try to preempt the lock once: if the CAS changes state from 0 to 1, the lock has been preempted successfully
    if (compareAndSetState(0, 1))
        // On success, record the current thread as the owner of the lock
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
Fair lock implementation:
final void lock() {
    acquire(1);
}
It can be seen that the unfair lock attempts to preempt the lock once at the start of the lock() method, i.e. it cuts the queue once at this point, and calls acquire() only if that attempt fails. The fair lock calls acquire() directly. The acquire() method calls three methods, tryAcquire(), addWaiter(), and acquireQueued(), which are the core methods of ReentrantLock. We will focus on these three methods next.
public final void acquire(int arg) {
    // acquireQueued() passes the thread's interrupt flag back up step by step. If it returns true,
    // the thread was interrupted while waiting, and the interrupt flag is set again for the caller
    // LockSupport.park() does not throw InterruptedException, and parkAndCheckInterrupt() clears the flag
    // when it checks it, so the flag has to be passed back up and restored here
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
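The effect of selfInterrupt() can be observed from the outside. The following is a small sketch using only the public ReentrantLock API; the class InterruptFlagDemo and its method name are hypothetical. A thread is interrupted while it waits inside lock(); lock() does not throw, but once the lock is acquired the thread's interrupt flag is set.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptFlagDemo {
    // Returns the waiter's interrupt flag as observed right after lock() returned.
    public static boolean flagAfterLock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flag = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock(); // does not throw InterruptedException while waiting
            try {
                flag.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        lock.lock(); // the calling thread holds the lock, so the waiter has to queue
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield(); // wait until the waiter is in the AQS queue
            }
            waiter.interrupt(); // interrupt it while it is still waiting
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flag.get();
    }

    public static void main(String[] args) {
        System.out.println(flagAfterLock());
    }
}
```

Code that needs to abort the wait on interruption would use lockInterruptibly() instead of lock().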
tryAcquire() method
Flow chart:
The tryAcquire() method attempts to preempt the lock. If it returns true, the thread has acquired the lock; nothing more is done and the thread goes straight on to execute the synchronized code. If it returns false, the thread did not acquire the lock, so the thread information is stored and the thread is blocked, that is, the addWaiter() and acquireQueued() methods are called.
tryAcquire() also has both fair and unfair lock implementations.
Source code analysis:
Fair lock implementation:
protected final boolean tryAcquire(int acquires) {
    // Get the current thread
    final Thread current = Thread.currentThread();
    // Get the value of state
    int c = getState();
    if (c == 0) {
        // If state is 0, the lock can be preempted
        // Try to preempt the lock only if no other thread is queued ahead of the current thread in the AQS list. Otherwise, queue up
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // If the CAS succeeds, the lock has been preempted successfully
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // Check whether the thread holding the lock is the current thread. If so, increase the reentrant count (i.e. increase the value of state)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
Unfair lock implementation:
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // If state is 0, the lock is free. This is the unfair version:
        // hasQueuedPredecessors() is not checked, so the lock is preempted even if threads in the AQS list are already waiting for it
        if (compareAndSetState(0, acquires)) {
            // If the CAS succeeds, the lock has been preempted successfully
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // Check whether the thread holding the lock is the current thread. If so, increase the reentrant count (i.e. increase the value of state)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // Update the value of state
        setState(nextc);
        return true;
    }
    return false;
}
Next, we analyze the addWaiter() method.
addWaiter()
The addWaiter() method wraps a thread that failed to acquire the lock in a Node object and stores it in the queue of the AbstractQueuedSynchronizer (AQS for short). Let's first look at the structure of the Node object.
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() { // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
From the structure of the Node object we can see that it is a node of a doubly linked list: it stores prev and next references, and the thread member variable holds a reference to the blocked thread. A Node also has a status, one of CANCELLED, SIGNAL, CONDITION, or PROPAGATE (a new node starts at 0). The states involved in ReentrantLock's lock() and unlock() paths are SIGNAL and CANCELLED. AQS also stores the head node and tail node of the linked list. So the data structure in which AQS stores blocked threads is in fact a doubly linked list of Node objects. The addWaiter() method wraps the blocked thread in a Node and appends it to this list.
Flow chart:
Source code analysis:
private Node addWaiter(Node mode) {
    // Wrap the thread that failed to get the lock in a node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // If the AQS tail is not null, the AQS list has already been initialized, so try to append the new node to the end of the list
    if (pred != null) {
        node.prev = pred;
        // CAS replaces the tail of AQS
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The list is not initialized yet, or the CAS failed: call enq()
    enq(node);
    return node;
}
private Node enq(final Node node) {
    // Spin
    for (;;) {
        Node t = tail;
        // If the tail node is null, the AQS list has not been initialized yet
        if (t == null) { // Must initialize
            // CAS initializes the head of AQS
            // Note that the head node does not store thread information, which means that the head node is a virtual node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // If the tail node is not null, append the node to the end of the list
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
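The enqueue logic can be tried out in isolation. The sketch below is not the AQS code: it is a simplified stand-in that uses AtomicReference instead of the internal compareAndSetHead()/compareAndSetTail() helpers, and the class CasQueueSketch and its methods are hypothetical. It keeps the same shape: a lazily created dummy head, prev assigned before the tail CAS, and next assigned after it.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): spin until the node is linked in at the tail.
    public void enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily install a dummy head; only one thread wins this CAS
                if (head.compareAndSet(null, new Node()))
                    tail.set(head.get());
            } else {
                node.prev = t; // prev is set before the node can become the tail
                if (tail.compareAndSet(t, node)) {
                    t.next = node; // next is set afterwards, so it may lag briefly
                    return;
                }
            }
        }
    }

    // Counts queued nodes by walking prev links from the tail back to the dummy head.
    public int size() {
        int n = 0;
        Node h = head.get();
        for (Node t = tail.get(); t != null && t != h; t = t.prev)
            n++;
        return n;
    }

    // Hypothetical driver: several threads enqueue concurrently, then the nodes are counted.
    public static int concurrentEnqueue(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    queue.enq(new Node());
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentEnqueue(4, 1000));
    }
}
```

Counting via prev rather than next mirrors the reason unparkSuccessor() traverses backwards from the tail: prev is always in place once a node is reachable from the tail, while next may not be written yet.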
Next, we analyze acquireQueued().
acquireQueued()
The acquireQueued() method blocks the threads stored in the AQS linked list using the LockSupport.park() method.
Flow chart:
Source code analysis:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Enter the spin loop
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is head and another attempt to acquire the lock succeeds, make this node the new head
            // (which removes the old head from the AQS queue) and return the interrupt flag
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // Note that the for loop is exited only when the lock has been acquired
                return interrupted;
            }
            // Skip CANCELLED nodes and block the thread; the thread is blocked here
            // Note that when the thread is woken up it continues the for loop and tries to preempt the lock again
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // If acquisition failed (an exception was thrown), change the node status to CANCELLED
            cancelAcquire(node);
    }
}
Two methods called by acquireQueued() are particularly important: shouldParkAfterFailedAcquire() and parkAndCheckInterrupt().
shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // If the predecessor is already in SIGNAL state, nothing needs to be done and the current thread can park
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // If the state of the predecessor is > 0, it has been cancelled and needs to be skipped.
        // The do-while loop unlinks the run of consecutive cancelled nodes in front of the current node
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // In the normal case, CAS is used to change the state of the predecessor to SIGNAL (-1)
        // Note that as a result all the nodes in the queue are -1 except for the last one, including the head node
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    // LockSupport.park() calls the native Unsafe.park() method to block the current thread
    LockSupport.park(this);
    // Return the interrupt flag and clear it
    return Thread.interrupted();
}
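Two properties of LockSupport.park() that this method relies on can be checked on a single thread. The sketch below uses only LockSupport and Thread from the JDK; the class ParkDemo and its method names are hypothetical.

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    // An unpark() issued before park() leaves a permit, so park() returns at once.
    public static boolean earlyUnparkIsRemembered() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park(); // consumes the permit instead of blocking
        return true;        // reached only because park() returned
    }

    // park() returns when the interrupt flag is set, without throwing.
    // Thread.interrupted() then reports the flag and clears it.
    public static boolean[] interruptFlagAroundPark() {
        Thread.currentThread().interrupt();
        LockSupport.park(); // returns immediately because the flag is set
        boolean first = Thread.interrupted();  // reports the flag and clears it
        boolean second = Thread.interrupted(); // the flag was cleared by the previous call
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        System.out.println(earlyUnparkIsRemembered());
        boolean[] flags = interruptFlagAroundPark();
        System.out.println(flags[0] + " " + flags[1]);
    }
}
```

The first property means a wake-up that arrives just before the waiter parks is not lost. The second is why acquireQueued() has to remember the flag in its interrupted variable and hand it back to acquire().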
That concludes the lock() method. Now let's look at the unlock() method.
unlock() method
Flow chart:
The unlock() method of ReentrantLock calls sync's release() method.
public void unlock() {
    // Each call to unlock decreases state by one
    sync.release(1);
}
The release() method calls two important methods, tryRelease() and unparkSuccessor(). The tryRelease() method computes the new value of state to determine whether the thread has released the lock completely (this is necessary because ReentrantLock is reentrant). If the lock has been completely released, unparkSuccessor() is called to wake up a waiting thread; otherwise no thread needs to be woken up.
public final boolean release(int arg) {
    // Only when tryRelease returns true has the lock been fully released, so that a blocked thread needs to be woken up. Otherwise no thread needs to be woken up
    if (tryRelease(arg)) {
        Node h = head;
        // If the head is not null and its state is not 0, the synchronization queue has been initialized and there is a node to wake up
        // Note that the head of the synchronization queue is a virtual node, which is clear from the node-building code,
        // and that shouldParkAfterFailedAcquire() changes the head node's status to -1
        // If head has a state of 0, no node in the queue needs to be woken up, so just return true
        if (h != null && h.waitStatus != 0)
            // Wake up the node after the head
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease()
protected final boolean tryRelease(int releases) {
    // Reduce the reentrant count
    int c = getState() - releases;
    // Throw an exception if the thread that holds the lock is not the current thread
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // If state is 0, the lock has been completely released
    if (c == 0) {
        free = true;
        // Set the thread that holds the lock to null
        setExclusiveOwnerThread(null);
    }
    // Update the value of state
    setState(c);
    // Return true if the lock is completely released and false otherwise
    return free;
}
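The two behaviors of tryRelease(), partial release and the owner check, are visible through the public API. The following is a small sketch; PartialReleaseDemo and its method names are hypothetical, while isLocked() and unlock() are the real ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

public class PartialReleaseDemo {
    // Records isLocked() after each step of lock, lock, unlock, unlock.
    public static boolean[] lockedAfterEachStep() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] seen = new boolean[4];
        lock.lock();
        seen[0] = lock.isLocked(); // state is 1
        lock.lock();
        seen[1] = lock.isLocked(); // state is 2
        lock.unlock();
        seen[2] = lock.isLocked(); // state is 1: the lock is still held
        lock.unlock();
        seen[3] = lock.isLocked(); // state is 0: the lock is free
        return seen;
    }

    // unlock() from a thread that does not hold the lock is rejected.
    public static boolean unlockByNonOwnerThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(lockedAfterEachStep()));
        System.out.println(unlockByNonOwnerThrows());
    }
}
```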
unparkSuccessor()
private void unparkSuccessor(Node node) {
    // Set the head state to 0 to indicate that a thread is being woken up. If the head state is 0, this method is not entered
    int ws = node.waitStatus;
    if (ws < 0)
        // Set the state of the head to 0
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Wake up the first node after the head that is not cancelled (the head itself does not store a blocked thread)
    Node s = node.next;
    // The next node is null or cancelled
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Traverse the AQS list backwards from the tail to find the non-cancelled node closest to the head and assign it to s.
        // Traversing from the tail is safe because prev is always assigned before a node becomes the tail, whereas next may not be set yet
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Call LockSupport.unpark() to wake up the specified thread
        LockSupport.unpark(s.thread);
}
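To see how little a subclass has to supply, here is a minimal sketch of a reentrant lock built directly on AbstractQueuedSynchronizer. It is not the ReentrantLock source: TinyReentrantLock, its state() accessor, and the demo() driver are hypothetical, the overflow check is omitted, and only the unfair tryAcquire() logic is modeled. Queuing, parking, and waking up are all inherited from acquire() and release().

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class TinyReentrantLock {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Lock is free: try to take it with a CAS (no fairness check)
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // Reentry by the owner: just bump the count (overflow check omitted)
                setState(c + acquires);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - releases;
            boolean free = (c == 0);
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free; // true only when the lock is completely released
        }

        int currentState() {
            return getState();
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }
    public void unlock() { sync.release(1); }
    public int state() { return sync.currentState(); }

    // Hypothetical driver: each increment re-enters the lock once before touching the counter.
    public static int demo(int threads, int perThread) {
        TinyReentrantLock lock = new TinyReentrantLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000));
    }
}
```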
Five: summary
In summary, the following points deserve attention:
1. ReentrantLock has two implementations: fair lock and unfair lock. There are only two differences between them. First, at the beginning of the lock() method, the unfair lock tries a CAS to preempt the lock, i.e. it cuts the queue. Second, in tryAcquire(), the unfair lock tries to preempt the lock directly, while the fair lock first checks whether there are waiting threads in the AQS linked list and preempts the lock only if there are none.
2. The data structure in which AQS stores blocked threads is a doubly linked list, and it is first-in-first-out: threads are woken up starting from the node after the head node, and new nodes are appended at the tail of the list. So AQS is also a queue data structure.
3. After being woken up, a thread continues executing the acquireQueued() method, because it was blocked inside that method's for loop. It tries to acquire the lock again. If it succeeds, its node is removed from the AQS queue and the for loop exits; otherwise it blocks again. (The attempt can fail because another thread cut in line, which is possible with the unfair lock.)
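The first-in-first-out behavior in point 2 can be observed with a fair lock, where no thread can cut in line. The sketch below uses only the public API; FifoOrderDemo and acquisitionOrder() are hypothetical names. Each waiter is started only after the previous one is confirmed to be in the queue via hasQueuedThread(), so the queue order is known in advance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class FifoOrderDemo {
    // Returns the order in which the waiters acquired the lock.
    public static List<Integer> acquisitionOrder(int waiters) {
        ReentrantLock lock = new ReentrantLock(true); // fair, so nobody can cut in line
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[waiters];
        lock.lock(); // hold the lock so that every waiter has to queue
        try {
            for (int i = 0; i < waiters; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                threads[i].start();
                while (!lock.hasQueuedThread(threads[i])) {
                    Thread.yield(); // wait until this waiter is enqueued before starting the next
                }
            }
        } finally {
            lock.unlock(); // wakes the first waiter, whose unlock() wakes the second, and so on
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(3));
    }
}
```

With an unfair lock the same experiment usually gives the same order here, since no other thread competes at the moment of release, but the order is no longer guaranteed once new threads call lock() while the queue is being drained.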