ReentrantLock is a reentrant lock. It relies on the AbstractQueuedSynchronizer class to implement thread synchronization.
ReentrantLock Demo
import java.util.concurrent.locks.ReentrantLock;

/**
 * @program: source-demo
 * @description: A thread-safe counter implemented with ReentrantLock
 * @ClassName: ReentrantLockDemo
 * @author: Mr.Wang
 * @create: 2022-01-27
 */
public class ReentrantLockDemo {
    static ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    public void incr() {
        lock.lock();
        try {
            count++;
        } finally {
            // Always release in finally so the lock is freed even if the body throws
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockDemo atomicExample = new ReentrantLockDemo();
        Thread[] threads = new Thread[2];
        for (int i = 0; i < 2; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    atomicExample.incr();
                }
            });
            threads[i].start();
        }
        threads[0].join();
        threads[1].join();
        System.out.println(atomicExample.count);
    }
}
ReentrantLock defines an inner Sync class. Its source code is as follows:
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    // Abstract method, implemented by the fair and non-fair subclasses
    abstract void lock();

    // Non-fair attempt to acquire the lock
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // In exclusive mode, attempt to release the resource; returns true once the lock is fully released, false otherwise
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class
    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /** Reconstitutes the instance from a stream (that is, deserializes it). */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
The Sync class extends AbstractQueuedSynchronizer, referred to as "AQS".
There are two types of locks available in AQS:
- An exclusive lock allows only one thread to acquire the lock at a time
/**
 * Exclusive mode: attempts to acquire the resource. Returns true on success, false on failure.
 * @param arg
 * @return
 */
@Override
protected boolean tryAcquire(int arg) {
    return super.tryAcquire(arg);
}

/**
 * Exclusive mode: attempts to release the resource. Returns true on success, false on failure.
 * @param arg
 * @return
 */
@Override
protected boolean tryRelease(int arg) {
    return super.tryRelease(arg);
}
- A shared lock allows multiple threads to acquire the lock at the same time
/**
 * Shared mode: attempts to release the resource. Returns true if waiting successor nodes may be woken up after the release, false otherwise.
 * @param arg
 * @return
 */
@Override
protected boolean tryReleaseShared(int arg) {
    return super.tryReleaseShared(arg);
}

/**
 * Shared mode: attempts to acquire the resource. A negative value indicates failure; 0 indicates success with no resources left over; a positive value indicates success with resources still available.
 * @param arg
 * @return
 */
@Override
protected int tryAcquireShared(int arg) {
    return super.tryAcquireShared(arg);
}
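The overrides above only delegate to the superclass, so they do not show what a concrete synchronizer does with the two modes. The following is a minimal sketch, not taken from the JDK or from ReentrantLock: SimpleMutex and OneShotLatch are hypothetical names chosen for illustration, and the meaning given to state (0/1) is an assumption of this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class AqsModesDemo {

    /** Exclusive mode (hypothetical example): a non-reentrant mutex. state 0 = free, 1 = held. */
    static final class SimpleMutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void lock() { acquire(1); }
        void unlock() { release(1); }
        boolean tryLock() { return tryAcquire(1); }
    }

    /** Shared mode (hypothetical example): a one-shot latch. state 0 = closed, 1 = open. */
    static final class OneShotLatch extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int arg) {
            // Negative means "fail and queue"; positive means "succeed, others may also succeed"
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);
            return true; // allow all queued waiters to be released
        }

        void await() throws InterruptedException { acquireSharedInterruptibly(1); }
        void open() { releaseShared(1); }
        boolean isOpen() { return getState() == 1; }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        // Only one holder at a time, and this sketch is not reentrant
        System.out.println("tryLock while held: " + mutex.tryLock());
        mutex.unlock();

        OneShotLatch latch = new OneShotLatch();
        Thread[] waiters = new Thread[3];
        for (int i = 0; i < waiters.length; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    latch.await(); // all three threads can pass once the latch is open
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }
        latch.open();
        for (Thread t : waiters) {
            t.join();
        }
        System.out.println("latch open: " + latch.isOpen());
    }
}
```

In the exclusive sketch a second acquisition fails while the mutex is held, whereas in the shared sketch any number of threads pass once the latch is open.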
The core variables
/** The synchronization state. */
private volatile int state;
- state > 0: a thread holds the resource and has not released it yet. With reentrancy, the value of state may be greater than 1
- state = 0: the lock resource is currently free
// Ensure atomic updates of state under multithreaded contention
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
ReentrantLock source code
- Calling ReentrantLock.lock() actually delegates to the lock() method of the abstract static inner class Sync.
public void lock() {
    sync.lock();
}
Sync has two concrete implementations:
- Fair lock: threads must acquire the lock resource in FIFO order
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Only compete for the lock if no other thread is queued ahead of this one
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
- Non-fair lock: a thread may try to grab the lock resource directly without following the FIFO order. ReentrantLock uses the non-fair lock by default
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // Regardless of whether other threads are already queued, first try to grab the lock directly via CAS.
        // Fall back to acquire(1) only if that fails.
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
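From the caller's side, the choice between the two Sync implementations is made through the ReentrantLock constructor. A short sketch using only public ReentrantLock API (the class name FairnessDemo is just for illustration):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock nonfair = new ReentrantLock();   // no-arg constructor: non-fair
        ReentrantLock fair = new ReentrantLock(true);  // true: fair (FIFO) ordering

        System.out.println(nonfair.isFair()); // false
        System.out.println(fair.isFair());    // true
    }
}
```

A fair lock avoids starvation at the cost of throughput, which is one plausible reason the non-fair variant is the default.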
acquire(int arg)
/**
 * Tries to acquire the exclusive lock through tryAcquire(), which returns true on success and false otherwise.
 * If tryAcquire() returns false, the lock is held by another thread, so the current thread is wrapped in a Node by addWaiter() and appended to the AQS synchronization queue.
 * acquireQueued() takes that Node as an argument and keeps trying to acquire the lock in a loop, parking between attempts.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
CAS Implementation Principle
protected final boolean compareAndSetState(int expect, int update) {
    // Optimistic CAS (compare-and-swap): if the current value of state equals expect, it is atomically replaced with update.
    // Returns true if the update succeeded, false otherwise.
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
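Unsafe is an internal API, so the same compare-and-swap behavior is easier to try out with AtomicInteger.compareAndSet. The sketch below uses it as a stand-in for compareAndSetState; the class and method names (CasDemo, incrementWithCas) are made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasDemo {
    /** Retries until the CAS succeeds and returns the value this call installed. */
    static int incrementWithCas(AtomicInteger value) {
        for (;;) {
            int expect = value.get();
            int update = expect + 1;
            if (value.compareAndSet(expect, update)) {
                return update;
            }
            // Another thread changed the value in between: read again and retry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger state = new AtomicInteger(0);
        System.out.println(state.compareAndSet(0, 1)); // true: state was 0 and is now 1
        System.out.println(state.compareAndSet(0, 1)); // false: state is no longer 0

        AtomicInteger counter = new AtomicInteger(0);
        Thread[] threads = new Thread[2];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    incrementWithCas(counter);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(counter.get()); // 20000
    }
}
```

The second compareAndSet(0, 1) fails for the same reason a second thread fails to take a held lock: the expected value 0 no longer matches.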
State property
state is a field in AQS whose meaning depends on the concrete implementation. For a reentrant lock, state represents the synchronization state, and it has two meanings.
- state == 0 means the lock is not held.
- state > 0 means a thread has acquired the lock; after the first acquisition, state = 1. Because ReentrantLock is reentrant, state increases each time the same thread acquires the lock again. For example, if a thread acquires the lock five times, state = 5. The lock must then be released five times, and only when state is back to 0 can another thread acquire it.
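The state field itself is not accessible from outside, but ReentrantLock.getHoldCount() reports the same number for the owning thread. A small sketch (HoldCountDemo and reenter are illustrative names):

```java
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    /** Acquires the lock depth times, returns the hold count at that point, then releases fully. */
    static int reenter(ReentrantLock lock, int depth) {
        int acquired = 0;
        try {
            while (acquired < depth) {
                lock.lock();
                acquired++;
            }
            return lock.getHoldCount();
        } finally {
            // Release exactly as many times as the lock was acquired
            while (acquired-- > 0) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(reenter(lock, 5)); // 5
        System.out.println(lock.isLocked());  // false
    }
}
```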
nonfairTryAcquire() method source code
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread(); // Get the current thread
    int c = getState(); // Get the state value
    if (c == 0) { // 0 means the lock is not held
        if (compareAndSetState(0, acquires)) { // Try to change state from 0 to acquires via CAS
            setExclusiveOwnerThread(current); // Record the current thread as the owner so it can re-enter later without competing again
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) { // The lock is already held by the current thread
        // Simply increase the reentrancy count
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
The implementation logic of the nonfairTryAcquire() method is as follows.
- Check the state of the lock. c == 0 means the lock is not held. In that case, try to modify state through compareAndSetState() to grab the lock. If the CAS succeeds, true is returned. If it fails, false is returned.
- current == getExclusiveOwnerThread() means that the thread holding the lock is the current thread. The reentrancy count is therefore simply increased and stored in the state field.
- In every other case the lock is held by a different thread, and false is returned.
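The same decision logic can be modeled without AQS. The following is a simplified sketch under stated assumptions: it uses an AtomicInteger in place of the AQS state field, a volatile owner field in place of exclusiveOwnerThread, always acquires or releases one hold at a time, and never blocks. ReentrantStateSketch is a hypothetical name, not a JDK class.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ReentrantStateSketch {
    private final AtomicInteger state = new AtomicInteger(0); // 0 = free, n = held n times
    private volatile Thread owner;                            // stand-in for exclusiveOwnerThread

    /** Mirrors the three branches of nonfairTryAcquire(): free, re-entry, held by someone else. */
    boolean tryAcquire() {
        Thread current = Thread.currentThread();
        int c = state.get();
        if (c == 0) {
            if (state.compareAndSet(0, 1)) {
                owner = current;
                return true;
            }
        } else if (current == owner) {
            int next = c + 1;
            if (next < 0) { // overflow
                throw new Error("Maximum lock count exceeded");
            }
            state.set(next); // no CAS needed: only the owner can reach this branch
            return true;
        }
        return false;
    }

    /** Mirrors tryRelease(): returns true only when the last hold is released. */
    boolean tryRelease() {
        if (Thread.currentThread() != owner) {
            throw new IllegalMonitorStateException();
        }
        int c = state.get() - 1;
        boolean free = (c == 0);
        if (free) {
            owner = null;
        }
        state.set(c);
        return free;
    }

    int holds() {
        return state.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantStateSketch s = new ReentrantStateSketch();
        s.tryAcquire();
        s.tryAcquire();
        System.out.println("holds: " + s.holds()); // holds: 2

        Thread other = new Thread(() ->
                System.out.println("other thread acquired: " + s.tryAcquire())); // false
        other.start();
        other.join();

        System.out.println("free after 1st release: " + s.tryRelease()); // false
        System.out.println("free after 2nd release: " + s.tryRelease()); // true
    }
}
```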
AbstractQueuedSynchronizer.addWaiter(Node mode)
When tryAcquire() fails to acquire the lock, the addWaiter() method is called first to wrap the current thread in a Node and append it to the queue. The source code is as follows.
private Node addWaiter(Node mode) { // mode is the mode of the node; Node.EXCLUSIVE is passed here to indicate exclusive mode
    Node node = new Node(Thread.currentThread(), mode); // Wrap the thread that failed to acquire the lock in a Node
    Node pred = tail; // tail is the tail of the AQS queue; it is null by default
    if (pred != null) { // tail is not null, so the queue already contains nodes
        node.prev = pred; // Point the prev of the current thread's Node to tail
        if (compareAndSetTail(pred, node)) { // Append node to the AQS queue via CAS, that is, make it the new tail
            pred.next = node; // Point the next of the old tail to the current node
            return node;
        }
    }
    enq(node); // tail == null or the CAS failed: let enq() add the node to the synchronization queue
    return node;
}
The current thread is wrapped in a Node and stored, so that the thread can later be retrieved directly from the Node and woken up with unpark(thread). The check pred != null determines whether the linked list has been initialized. If it has, compareAndSetTail sets the current thread's Node as the tail node and establishes the bidirectional link. If the list has not been initialized, or the CAS fails because of thread contention, the enq() method is called to complete the addition.
enq() method
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Queue not initialized: install an empty head node via CAS, then loop again
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
The method spins in a CAS loop to initialize the synchronization queue and to append the current node to it. The overall structure of AQS is shown as follows:
[Figure: overall structure of AQS]
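The enqueue loop can be reproduced outside AQS to see that no node is lost under contention. This is a sketch under assumptions: AtomicReference fields stand in for the head and tail fields that AQS updates through Unsafe, and CasQueueSketch, enqueue and size are names invented for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends node in the style of enq(): lazy dummy head, then a CAS on tail; returns the predecessor. */
    Node enqueue(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Not initialized yet: exactly one thread wins this CAS and publishes the dummy head
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                      // 1. prev first
                if (tail.compareAndSet(t, node)) {  // 2. swing tail
                    t.next = node;                  // 3. next last
                    return t;
                }
            }
        }
    }

    /** Counts queued nodes by walking backward from tail, excluding the dummy head. */
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null && p != head.get(); p = p.prev) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    queue.enqueue(new Node("t" + id + "-" + j));
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(queue.size()); // 4000
    }
}
```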
ReentrantLock release lock source code analysis
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) { // Release succeeded
        Node h = head; // Get the head node of the AQS queue
        if (h != null && h.waitStatus != 0)
            // head is not null and its status is not 0: call unparkSuccessor(h) to wake up the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease(int releases) releases the lock by modifying the state value
protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // Subtract the number of releases
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null); // Fully released: clear the owner thread
    }
    setState(c);
    return free;
}
The state of an exclusive lock increases by 1 each time the lock is acquired and decreases by 1 each time it is released. Because the same lock can be re-entered, state may grow to 2, 3, 4, 5, and so on. Only when unlock() has been called as many times as lock() does state return to 0. At that point exclusiveOwnerThread is set to null and the lock is actually released.
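The effect of a partial release can be observed from a second thread. The sketch below relies only on public ReentrantLock methods; ReleaseDemo and otherThreadCanLock are illustrative names.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseDemo {
    /** Runs tryLock() on a separate thread and reports whether it succeeded. */
    static boolean otherThreadCanLock(ReentrantLock lock) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                result[0] = true;
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join(); // join() makes the write to result visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();   // hold count is now 2
        lock.unlock(); // hold count is 1: still owned by the main thread
        System.out.println(otherThreadCanLock(lock)); // false
        lock.unlock(); // hold count is 0: the lock is free
        System.out.println(otherThreadCanLock(lock)); // true

        try {
            lock.unlock(); // the main thread no longer owns the lock
        } catch (IllegalMonitorStateException e) {
            System.out.println("unlock() by a non-owner is rejected");
        }
    }
}
```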
unparkSuccessor(Node node) wakes up a thread in the synchronization queue
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus; // Get the status of the head node
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // Reset the node status to 0
    Node s = node.next; // Get the successor of the head node
    if (s == null || s.waitStatus > 0) {
        // The successor is null or cancelled (waitStatus > 0):
        // scan backward from tail to find the node closest to head whose waitStatus <= 0
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // A valid successor was found: wake up its thread
        LockSupport.unpark(s.thread);
}
The unparkSuccessor() method has two main pieces of logic.
- Check the status of the successor of the given node. If the successor is null or cancelled, scan backward from the tail node to find the node closest to head whose waitStatus is <= 0 (that is, not cancelled).
- Wake up that node's thread with the LockSupport.unpark() method.
Why scan backward from tail? This is related to the enq() method, where a new node is added to the list in three steps.
- Point the prev of the new node to tail.
- Set tail to the new node through CAS. CAS is an atomic operation, so this step is thread-safe.
- Execute t.next = node so that the next pointer of the old tail points to the new node.
If another thread calls unlock() after the CAS but before t.next = node and traverses from head, the next link has not been established yet, so the traversal stops at node t and misses the new node. Traversing backward from tail avoids this problem, because prev is always set before the node becomes the tail.
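The intermediate state described above can be frozen in a single-threaded simulation. This is only a sketch: the Node class and the two helper methods are simplified stand-ins, and the "pause" between steps 2 and 3 is simulated by simply not executing step 3 yet.

```java
class TraversalDemo {
    static final class Node {
        final String name;
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    /** Walks next pointers from head and reports whether target is reached. */
    static boolean reachableForward(Node head, Node target) {
        for (Node n = head; n != null; n = n.next) {
            if (n == target) {
                return true;
            }
        }
        return false;
    }

    /** Walks prev pointers from tail and reports whether target is reached. */
    static boolean reachableBackward(Node tail, Node target) {
        for (Node n = tail; n != null; n = n.prev) {
            if (n == target) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Fully linked queue: head <-> A
        Node head = new Node("head");
        Node a = new Node("A");
        head.next = a;
        a.prev = head;
        Node tail = a;

        // A new node B is being enqueued and the enqueuing thread is "paused" after step 2
        Node b = new Node("B");
        Node t = tail;
        b.prev = t;   // step 1: prev is set first
        tail = b;     // step 2: tail now points at B (the CAS in enq())
                      // step 3 (t.next = b) has not run yet

        System.out.println(reachableForward(head, b));  // false: A.next is still null
        System.out.println(reachableBackward(tail, a)); // true: B.prev already points at A

        t.next = b;   // step 3 completes
        System.out.println(reachableForward(head, b));  // true
    }
}
```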
The awakened thread continues to execute
Back to the acquireQueued() method in AQS: threads that failed to acquire the lock are blocked in this method. When a blocked thread is woken up, it continues executing from the point where it was blocked, as shown below.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // Get the predecessor node
            if (p == head && tryAcquire(arg)) { // The predecessor is head: try to acquire the lock again
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // Blocks here; after being woken up, enters the next loop iteration
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
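The "continue from the blocked position" behavior comes from LockSupport.park() and unpark(). The following sketch shows the pattern in isolation; it is not the AQS code path itself, and ParkUnparkDemo, parkUntilReleased and the two flags are names assumed for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {
    /** Parks a waiter thread until it is released; returns true if it resumed after park(). */
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // Like the for (;;) loop in acquireQueued(): re-check the condition after every
            // wake-up, because park() is allowed to return without a matching unpark()
            while (!released.get()) {
                LockSupport.park();
            }
            resumed.set(true); // execution continues right after the park() call
        });
        waiter.start();

        released.set(true);         // make the condition true first
        LockSupport.unpark(waiter); // then wake the waiter, as unparkSuccessor() does

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(parkUntilReleased()); // true
    }
}
```

Setting the flag before calling unpark() matters: if the waiter has not parked yet, it sees the flag and skips park(), and if it has parked, the permit from unpark() wakes it.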