“This is the 29th day of my participation in the First Challenge 2022. For details: First Challenge 2022”
ReentrantLock (AQS) ReentrantLock (AQS) ReentrantLock (AQS) ReentrantLock
You can learn about ReentrantLock at juejin.cn/post/706288…
AbstractQueuedSynchronizer is blocking type lock, as well as the implementation framework of synchronizer components. Is the core of concurrent programming in JDK, it provides a FIFO based queue, usually we often use in the work of ReentrantLock, CountDownLatch and so on are based on it to achieve.
I. First acquaintance with AQS
Let’s start with ReentrantLock and see what its code structure looks like:
Here’s what you can see from the image above:
- ReentrantLock Implementation interface Lock (Abstract interface)
- ReentrantLock has three internal classes, FrairSync, NonfairSync, and Sync, and FrairSync and NonfairSync inherit from Sync.
- Sync inheritance AbstractQueuedSynchronizer
- AbstractQueuedSynchronizer has two inner classes, is a Node, ConditionObject respectively.
- AbstractQueuedSynchronizer inherited from AbstractOwnableSynchronizer (abstract classes, offer exclusive threads get and set).
AQS has the following characteristics:
-
The state attribute is used to represent the resource status, including the exclusive state and the shared state, corresponding to the fair lock and the unfair lock. Subclasses need to define how to maintain this state and control how to acquire and release locks. As shown in the diagram above, fair locks and unfair locks need to maintain this state separately to achieve the purpose of acquiring and releasing locks.
- GetState – Gets the state status
- SetState – Sets the state state
- CompareAndSetState – Sets the state using CAS
- Exclusive mode: Only one thread can access a resource
- Shared mode: Allows multiple threads to access resources
-
A FIFO based wait queue is provided, similar to Monitor’s EntryList mentioned earlier in the Synchronized principle
-
Condition variables are used to implement the wait queue and thread wake up mechanism, and multiple condition variables are supported simultaneously, similar to the WaitSet of Monitor mentioned in the previous section of Synchronized principle
-
[Fair lock] and [unfair lock] : the difference between the two mainly lies in whether the lock acquisition is related to the queuing order. When a lock is held by one thread, other threads attempting to acquire the lock are suspended and added to the queue, with the first thread to be suspended at the front of the queue. When the lock is released, the thread in the queue needs to be notified. As a fair lock, the thread at the front of the queue is awakened first; Instead of a fair lock, all threads wake up and compete to acquire the lock, and subsequent threads may acquire the lock.
Second, source code analysis
Let’s take a look at the nature of the source code.
First of all, I have an impression in mind that AQS maintains two pairs of queues, one is synchronous queue, one is blocking queue.
Node can be described as a synchronization queue and a blocking queue Node.
2.1 Node source code analysis
static final class Node {
// The mode can be shared or exclusive
// Share mode
static final Node SHARED = new Node();
// Exclusive mode
static final Node EXCLUSIVE = null;
// Node state
// CANCELLED, with a value of 1, indicating that the current thread was CANCELLED
// SIGNAL, with a value of -1, indicates that the current node's successors contain threads that need to run, i.e., unpark
// CONDITION, with a value of -2, indicates that the current node is waiting on CONDITION, i.e. in the CONDITION queue
PROPAGATE, the value is -3, which means that the subsequent acquireShared in the current scene can be executed
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// Node state
volatile int waitStatus;
// The precursor node
volatile Node prev;
// The next node
volatile Node next;
// The thread corresponding to the node
volatile Thread thread;
// The next person waiting
Node nextWaiter;
// Whether the node is waiting in shared mode
final boolean isShared(a) {
return nextWaiter == SHARED;
}
// Get the precursor. If the precursor is empty, throw an exception
final Node predecessor(a) throws NullPointerException {
// Save the precursor
Node p = prev;
if (p == null) // If the driver is empty, an exception is thrown
throw new NullPointerException();
else // The precursor node is not empty, return
return p;
}
// No argument constructor
Node() { // Used to establish initial head or SHARED marker
}
// Constructor, used by addWaiter
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// constructor
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread; }}Copy the code
2.2 ConditionObject source analysis
Condition interface is implemented. Condition learning will be introduced later, and its use is also introduced when learning ReentrantLock.
More code, just look at it from the top down:
/ / inner classes
public class ConditionObject implements Condition.java.io.Serializable {
/ / version number
private static final long serialVersionUID = 1173984872572414699L;
Condition specifies the head of the condition queue
private transient Node firstWaiter;
// condition specifies the end of the queue
private transient Node lastWaiter;
/** * constructor */
public ConditionObject(a) {}/** * add a new waiter to the wait queue */
private Node addConditionWaiter(a) {
// define the terminator to be t
Node t = lastWaiter;
/ / end node is not null, and end the state of the node for CONDITION (default is 2, said the current node in conditionObject waiting queue)
if(t ! =null&& t.waitStatus ! = Node.CONDITION) {// Clear nodes that are not in CONDITION and reassign firstWaiter and lastWaiter
unlinkCancelledWaiters();
// reassign the last node to t
t = lastWaiter;
}
// Create a new node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// The tail is null
if (t == null)
// Sets the head of the condition queue
firstWaiter = node;
else
// Set the nextWaiter field of the node to node
t.nextWaiter = node;
// Update the tail of the condition queue
lastWaiter = node;
return node;
}
/** * remove or transfer headers to sync queue until there are no cancelled or empty */
private void doSignal(Node first) {
/ / loop
do {
// Set the next node to the first node, if null
if ( (firstWaiter = first.nextWaiter) == null)
// Set the tail to null
lastWaiter = null;
// Set the nextWaiter field for first
first.nextWaiter = null;
} while(! transferForSignal(first) && (first = firstWaiter) ! =null); // Failed to transfer the node from the condition queue to sync queue and the head node in the condition queue is not empty
}
/** * Transfer all waiting nodes to synchronization queue */
private void doSignalAll(Node first) {
// condition sets the head and tail of the queue to null
lastWaiter = firstWaiter = null;
/ / loop
do {
// Get the nextWaiter domain of first
Node next = first.nextWaiter;
// Set first's nextWaiter field to null
first.nextWaiter = null;
// Move the first node from the condition queue to sync queue
transferForSignal(first);
// Reset first
first = next;
} while(first ! =null);
}
/** * Reassign firstWaiter and lastWaiter to all non-condition nodes **/
private void unlinkCancelledWaiters(a) {
// Get the condition queue header
Node t = firstWaiter;
// Get a null tail
Node trail = null;
while(t ! =null) {
// Get the next node
Node next = t.nextWaiter;
// The state of the header is not CONDTION
if(t.waitStatus ! = Node.CONDITION) {// Set the next wait for t node to null
t.nextWaiter = null;
if (trail == null) // trail is empty
// Reset the head of the condition queue
firstWaiter = next;
else
// Set trail's nextWaiter field to next
trail.nextWaiter = next;
if (next == null) // Next is empty
// Sets the end of the condition queue
lastWaiter = trail;
}
else // the state of t node is CONDTION
// Set the trail node
trail = t;
// set t nodet = next; }}/** * Implements the Condition signal method */
public final void signal(a) {
if(! isHeldExclusively())// Not exclusive by the current thread, throws an exception
throw new IllegalMonitorStateException();
// Save the condition queue header
Node first = firstWaiter;
if(first ! =null) // The header is not null
// Wake up a waiting thread, remove the head node from the blocking queue and add it to the synchronization queue
doSignal(first);
}
/** * Implements the Condition signalAll method to wake up all threads */
public final void signalAll(a) {
if(! isHeldExclusively())// Not exclusive by the current thread, throws an exception
throw new IllegalMonitorStateException();
// Save the condition queue header
Node first = firstWaiter;
if(first ! =null) // The header is not null
// Wake up all waiting threads, remove the head node from the blocking queue and add it to the synchronization queue
doSignalAll(first);
}
The difference between /** * and await() is that with await methods, interrupt() is called and an error is reported, whereas this method does not. * /
public final void awaitUninterruptibly(a) {
// Add a node to the queue
Node node = addConditionWaiter();
// Get the freed state
int savedState = fullyRelease(node);
boolean interrupted = false;
while(! isOnSyncQueue(node)) {//
// Block the current thread
LockSupport.park(this);
if (Thread.interrupted()) // The current thread is interrupted
// Set the status to interrupted
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //
selfInterrupt();
}
/** * wait, the current thread is waiting until it receives a signal or is interrupted */
public final void await(a) throws InterruptedException {
// The current thread is interrupted, raising an exception
if (Thread.interrupted())
throw new InterruptedException();
// Wrap the current thread as Node and insert the tail into the wait queue
Node node = addConditionWaiter();
// Release the lock held by the current thread, which wakes up the next node in the synchronization queue
int savedState = fullyRelease(node);
int interruptMode = 0;
while(! isOnSyncQueue(node)) {// The current thread enters the wait state
LockSupport.park(this);
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0) // Check the interrupt type while the node is waiting
break;
}
// Spin wait to get synchronization state (i.e. get lock)
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null) // clean up if cancelled
unlinkCancelledWaiters();
// Handle interrupts
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
}
/** * wait, the current thread is waiting until it receives a signal, is interrupted, or reaches the specified wait time */
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while(! isOnSyncQueue(node)) {if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
unlinkCancelledWaiters();
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/** * wait, the current thread waits until it receives a signal, is interrupted, or reaches a specified deadline */
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while(! isOnSyncQueue(node)) {if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
}
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
unlinkCancelledWaiters();
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
return! timedout; }/** * Wait, the current thread waits until it receives a signal, is interrupted, or reaches the specified wait time. AwaitNanos (unit.tonanos (time)) > 0 */
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
// 1. Wrap the current thread as Node and insert the end of the thread into the wait queue
Node node = addConditionWaiter();
// 2. Release the lock held by the current thread and wake up the next node in the synchronization queue
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while(! isOnSyncQueue(node)) {if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null)
unlinkCancelledWaiters();
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
return! timedout; }Copy the code
2.3 Lock acquisition and release
The whole design concept of AQS is to achieve the acquisition and release of locks through the state field. Locks are mainly divided into fair locks and unfair locks.
2.3.1 fair lock
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock(a) {
// Inherit from AQS method, internal first call tryAcquire to obtain the lock, if the acquisition fails, add downtown to wait queue
acquire(1);
}
/** * Fair lock version of tryAcquire */
protected final boolean tryAcquire(int acquires) {
// Get the current thread
final Thread current = Thread.currentThread();
// Get the lock status
int c = getState();
// 0 indicates that the lock is not held
if (c == 0) {
// Check whether there are nodes waiting in the current wait queue
if(! hasQueuedPredecessors() &&// Compare and replace states
compareAndSetState(0, acquires)) {
// Set the current thread to an exclusive thread
setExclusiveOwnerThread(current);
return true; }}else if (current == getExclusiveOwnerThread()) {
// Lock reentrant
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; }}Copy the code
2.3.2 Unfair Lock
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/** * Obtain the lock immediately, failure will join the wait queue */
final void lock(a) {
// Try to obtain the lock through CAS, if successful, set the current thread to exclusive
if (compareAndSetState(0.1))
setExclusiveOwnerThread(Thread.currentThread());
else
// Or join the wait queue · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
acquire(1);
}
/** * Non-fair lock version of tryAcquire */
protected final boolean tryAcquire(int acquires) {
// Follow the default nonfairTryAcquire of its parent Sync class
returnnonfairTryAcquire(acquires); }}Copy the code
2.3.3 Syc subclass
This is the parent of fair and unfair locks and provides a uniform tryRelease method to release the lock
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/** * provides a quick path to the unfair version */
abstract void lock(a);
/** * Unfair lock acquisition, default is unfair lock */
final boolean nonfairTryAcquire(int acquires) {
// Get the current thread
final Thread current = Thread.currentThread();
// Get the status of the current lock
int c = getState();
// 0 indicates that it is not occupied
if (c == 0) {
// the current thread is locked as an exclusive lock
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true; }}// The current thread has an exclusive lock, indicating lock reentrant
else if (current == getExclusiveOwnerThread()) {
// State + 1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// Set the current state
setState(nextc);
return true;
}
return false;
}
/** * release lock */
protected final boolean tryRelease(int releases) {
// Current state minus the number of releases
int c = getState() - releases;
// If the current thread is not the thread holding the lock, throw an exception
if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
boolean free = false;
// Cancel the exclusive thread when the status is 0
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// Set the status to 0
setState(c);
return free;
}
protected final boolean isHeldExclusively(a) {
// Whether the current thread is a lock holder
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Get the current holder
final Thread getOwner(a) {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// Get the number of holds, which only thread holders can get
final int getHoldCount(a) {
return isHeldExclusively() ? getState() : 0; }}Copy the code
2.3.4 acquire and release
There are also two core methods in AQS:
- Acquire () : Acquire the lock, this is the real method of the actual lock call, the previous try knowledge attempts to acquire the lock, even if the failure is not added to the wait queue.
public final void acquire(int arg) {
// Try to get
if(! tryAcquire(arg) &&// Try to get success, add to the wait queue exclusively, and keep trying to hold the lock until it succeeds
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Copy the code
- Release () : Releases the lock, which is actually released by calling the tryRelease method of the lock’s custom synchronizer implementation:
/** * Attempts to release, returns true */ on success public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if(h ! =null&& h.waitStatus ! =0) unparkSuccessor(h); return true; } return false; } Copy the code
Let’s customize the non-reentrant lock and see what the relationship between synchronizer and lock is.
2.4 Summary
So far, by reading the previous source content, we can have the following summary:
- Locks are released and acquired around [state], where 0 means no lock is held, 1 means exclusive, and greater than 1 means lock reentrant
- The lock can be obtained as follows:
// If the lock fails to be obtained if(! tryAcquire(arg)) {// Join the queue, you can choose to block the current thread park unpark } Copy the code
- Release the lock as follows:
// If the lock is obtained successfully if(! tryRelease(arg)) {// Let the blocked thread resume running } Copy the code
Three, practice
Now that we know the structure of AQS, we might as well try it out for ourselves. Deepen understanding.
To define a non-reentrant lock, you need a synchronizer, a lock, and a test class
Custom synchronizer:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/ * * *@description: Implements a non-reentrant lock synchronizer with a maximum state of 1 *@author: weirx *@date: 2022/1/13 13:49 *@version: 3.0 * /
public class MyLockSynchronizer extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
int state = getState();
if (state == 0) {
if (compareAndSetState(0.1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true; }}return false;
}
@Override
protected boolean tryRelease(int acquires) {
int c = getState() - acquires;
if (c == 0) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
return false;
}
@Override
protected boolean isHeldExclusively(a) {
return getState() == 1;
}
protected ConditionObject newCondition(a) {
return newConditionObject(); }}Copy the code
Custom locks:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/ * * *@description: Custom lock *@author: weirx *@date: 2022/1/13 14:05 *@version: 3.0 * /
public class MyLock implements Lock {
MyLockSynchronizer myLockSynchronizer = new MyLockSynchronizer();
@Override
public void lock(a) {
// Try to obtain the lock, fail to join the wait queue
myLockSynchronizer.acquire(1);
}
@Override
public void lockInterruptibly(a) throws InterruptedException {
// Try to obtain the lock, fail to join the queue, can interrupt
myLockSynchronizer.acquireInterruptibly(1);
}
@Override
public boolean tryLock(a) {
// Try to acquire the lock without joining the queue
return myLockSynchronizer.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// Try to get the lock without joining the queue
return myLockSynchronizer.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock(a) {
/ / releases the lock
myLockSynchronizer.release(1);
}
@Override
public Condition newCondition(a) {
// Condition variable
returnmyLockSynchronizer.newCondition(); }}Copy the code
Test lock effect:
/ * * *@descriptionTest: *@author: weirx *@date: 2022/1/13 * to them@version: 3.0 * /
public class TestMyLock {
public static void main(String[] args) {
MyLock myLock = new MyLock();
new Thread(() -> {
try {
myLock.lock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ "" + Thread.currentThread().getName() + " :acquire lock success");
// Sleep for a second to see the effect
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.unlock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ "" + Thread.currentThread().getName() + " :release lock success"); }},"t1").start();
new Thread(() -> {
try {
myLock.lock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ "" + Thread.currentThread().getName() + " :acquire lock success");
} finally {
myLock.unlock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ "" + Thread.currentThread().getName() + " :release lock success"); }},"t2").start(); }}Copy the code
As a result, it takes TI one second to release the lock before T2 acquires it:
2022-01-13 14:34:56 t1 :acquire lock success
2022-01-13 14:34:57 t2 :acquire lock success
2022-01-13 14:34:57 t1 :release lock success
2022-01-13 14:34:57 t2 :release lock success
Copy the code
Mylock. lock() is used once more in thread T1 of the test code, and the entire program is stuck.
2022-01-13 14:35:56 t1 :acquire lock success
Copy the code
4. The addition of Condition
Conditions are not covered in this article, but they were mentioned earlier in ReentrantLock. You can go back to this article to see how they are used: juejin.cn/post/706288…
Source code learning is really difficult, see others say more than their own follow again, suggest students refer to this article to track their own again.