statement
This article liver for a long time, more pictures, I hope you like.
In addition, interested partners can pay attention to personal public account: a flower is not romantic
The public account has just started operation, and I hope to grow with you.
preface
When it comes to concurrent, we have to say AQS (AbstractQueuedSynchronizer), the so-called AQS is abstract queue type synchronizer, defines many internal lock related methods, We are familiar with ReentrantLock, ReentrantReadWriteLock, CountDownLatch, Semaphore and so on are based on AQS to achieve.
Let’s start with a UML diagram related to AQS:
Mind mapping:
AQS implementation principle
AQS maintains a volatile int state (representing shared resources) and a FIFO thread wait queue (which is entered when multithreaded contention for resources is blocked).
Volatile guarantees visibility across multiple threads. State =1 indicates that the current lock is in use, and other threads will fail to lock the object. The thread that failed to lock will be placed in a FIFO wait queue, and the nonvolatile column will be suspended by the UNSAFE. Wait for another thread to release the lock before it can be awakened.
In addition, the operation of state is ensured by CAS to ensure the security of its concurrent modification.
The specific principle can be summarized in a diagram:
AQS provides many methods for implementing locks,
- GetState () : Gets the flag state value of the lock
- SetState () : Sets the flag state value of the lock
- TryAcquire (int) : Exclusive lock acquisition. Attempts to obtain the resource return true on success and false on failure.
- TryRelease (int) : Release the lock exclusively. Attempts to free resources return true on success and false on failure.
There are some methods that are not listed here. Next, we take ReentrantLock as a breakthrough point to understand the internal implementation principle of AQS step by step in the form of source code and drawing.
The directory structure
The article is prepared to simulate multi-threaded competition lock, lock release scenario to analyze AQS source code:
Three threads (thread one, thread two, thread three) lock/release the lock at the same time
The directory is as follows:
- Thread aWhen the lock is successfully added
AQS
Internal implementation - Thread two/threeFailed to lock
AQS
Data model for waiting queues in - Thread – one lock release and thread – two lock acquisition principle
- Through the thread scenario to explain the specific implementation principle of fair lock
- Condition a is explained in a threaded scenario
wait()
andsignal()
Realize the principle of
Here, the data structure and realization principle of AQS after locking and releasing each thread will be analyzed by drawing diagrams
Scenario analysis
Thread one lock succeeded
If three threads concurrently preempt the lock, thread 1 successfully preempt the lock, while thread 2 and thread 3 fail to preempt the lock. The specific execution flow is as follows:
At this time, the internal data of AQS are as follows:
Thread 2, thread 3 failed to lock:
As can be seen from the figure, Node is a bidirectional linked list. SIGNAL is the waitStatus attribute in Node, and there is also a nextWaiter attribute in Node. This attribute is not drawn in the figure, and Condition will explain this in detail later.
Specifically look at the preemption lock code implementation:
java.util.concurrent.locks.ReentrantLock .NonfairSync:
static final class NonfairSync extends Sync {
final void lock(a) {
if (compareAndSetState(0.1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
returnnonfairTryAcquire(acquires); }}Copy the code
If the preemption succeeds, the state value is changed to 1, and the object exclusive lock thread is set to the current thread. As follows:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
Copy the code
Thread two failed to preempt the lock
According to the real scenario, after thread 1 successfully preempts the lock, the state changes to 1, and thread 2 will inevitably fail to modify the state variable through CAS. At this point, the data In THE FIFO(First In First Out) queue In AQS is shown In the figure below:
Let’s break down the logic of thread two execution step by step:
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
:
public final void acquire(int arg) {
if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
First take a look at tryAcquire () implementation: Java. Util. Concurrent. The locks. Already. NonfairTryAcquire () :
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true; }}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
Copy the code
If the value is not 0, it indicates that the lock of the current object has been occupied by another thread. Then it determines whether the thread that owns the lock is the current thread. If so, it accumulates the state value, which is the specific implementation of the reentrant lock. The state value is also decremented when the lock is released.
If state is 0, perform the CAS operation and try to change the state value to 1. If the update succeeds, the current thread is locked successfully.
Thread 2, for example, will not succeed in changing the value of state through CAS because thread 1 has already changed state to 1. Failed to lock.
Thread 2 will return false after tryAcquire() and add itself to a FIFO wait queue by executing addWaiter(Node.exclusive) :
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter()
:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if(pred ! =null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Copy the code
This code starts by creating a Node bound to the current thread. Node is a bidirectional list. If the tail pointer in the waiting queue is empty, enq(node) is directly called to add the current thread to the tail of the waiting queue:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
returnt; }}}}Copy the code
When the tail pointer is empty for the first loop, we enter the if logic and use the CAS operation to set the head pointer to a newly created Node. Data in AQS at this time:
After execution, head, tail, and t all point to the first Node element.
Do the second loop to enter the else logic. Now that we have the head Node, we need to attach the Node corresponding to thread 2 to the head Node. There are two nodes in the queue:
When the addWaiter() method completes, it returns information about the node created by the current thread. AcquireQueued (addWaiter(Node.exclusive), ARg) ¶
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued()
:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndChecknIterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt(a) {
LockSupport.park(this);
return Thread.interrupted();
}
Copy the code
AcquireQueued () this method determines whether the Node currently passed is preceded by a head and attempts to lock it if it is. If the lock is successful, the current node will be set as head node, and then the previous head node will be vacant for subsequent garbage collection.
If lock failure or a forward Node is not the head Node, the Node would pass shouldParkAfterFailedAcquire methods will head Node waitStatus into SIGNAL = 1, the final execution parkAndChecknIterrupt method, Call locksupport.park () to suspend the current thread.
At this point, the data in AQS are shown as follows:
Thread 2 then sits in the AQS queue, waiting for another thread to release the lock to wake it up.
Thread three failed to preempt the lock
The addWaiter(Node mode) method is an example of a thread failing to preempt a lock.
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if(pred ! =null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Copy the code
At this point, the tail node of the waiting queue points to thread 2. After entering the if logic, the TAIL node points to thread 3 again by CAS instruction. Thread 3 calls the enq() method to join the queue, which is the same as thread 2. After joining the queue, waitStatus=SIGNAL will be changed in thread 2’s Node. Finally thread three will also be suspended. The data of the waiting queue is shown in the figure below:
Thread one releases the lock
Now let’s analyze the process of releasing the lock. First, thread 1 releases the lock, and after releasing the lock, it will wake up the post-node of the head node, namely thread 2. The specific operation process is as follows:
Wait queue data after execution is as follows:
Thread two has been awakened and continues to try to acquire the lock, or if it fails to acquire the lock, it continues to be suspended. If the lock is successfully obtained, the data in AQS are shown as follows:
Let’s take a look at the code for thread release lock:
java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if(h ! =null&& h.waitStatus ! =0)
unparkSuccessor(h);
return true;
}
return false;
}
Copy the code
TryRelease () is executed first, which is implemented in ReentrantLock. If tryRelease succeeds, it continues to determine whether the waitStatus of the head node is 0, as we saw earlier. The waitStatue of head is the SIGNAL(-1), here the unparksucceeded () method is executed to wake up the head’s posterior nodes, the nodes corresponding to thread two in our diagram above.
Look at the implementation in reentrantLock. tryRelease() :
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if(Thread.currentThread() ! = getExclusiveOwnerThread())throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
Copy the code
After reentrantLock.tryrelease (), state is set to 0 and the exclusive Lock of the Lock object is set to NULL. Now take a look at the data in AQS:
Then execute Java. Util. Concurrent. The locks. AbstractQueuedSynchronizer. UnparkSuccessor () method, wake up the rear of the head node:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
s = t;
}
if(s ! =null)
LockSupport.unpark(s.thread);
}
Copy the code
The main thing is to set the waitStatus of the head node to 0.
At this point, re-point the head pointer to the Node corresponding to thread 2, and use the locksupport. unpark method to wake up thread 2.
Thread two, awakened, then attempts to acquire the lock, modifying the state data with the CAS instruction. After execution, you can view the data in AQS:
At this point thread two is woken up, and thread two picks up where it left off, continuing the acquireQueued() method.
Thread two wakes up and continues to lock
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}Copy the code
At this point, thread 2 is awakened and continues to execute the for loop to determine whether the front node of thread 2 is head. If so, the tryAcquire() method will be used to try to acquire the lock. In fact, CAS operation will be used to modify the state value. Thread 2 is then set to head, and the previous head data is empty, and the vacant data is waiting for garbage collection.
At this point, thread 2 successfully acquired the lock, and queue data in AQS are as follows:
The data in the wait queue is waiting to be garbage collected.
Thread two releases the lock/thread three adds the lock
When thread 2 releases the lock, thread 3 will wake up. The process is similar to the above. Thread 3 will try to lock again. The specific flow chart is as follows:
Queue data in AQS are shown in the figure below:
Fair lock implementation principle
All of the above locking scenarios are based on unfair locking. Unfair locking is the default implementation of ReentrantLock. Let’s look at the implementation principle of fair locking.
Unfair lock execution process:
When thread 2 releases the lock, thread 3 wakes up the suspended thread. Thread 3 executes tryAcquire() and attempts to modify state using CAS. If thread 4 also attempts to lock, the tryAcquire() method will also be executed.
In this case, there will be a race, and thread four will still need to stay on the wait queue if it succeeds in acquiring the lock. This is called an unfair lock, where thread 3 waits in line to get the lock, only to see thread 4 jump the queue to get the lock.
Fair lock execution process:
When fair lock is added, it will first judge whether there are nodes in the AQS waiting queue. If there are nodes, it will directly join the queue and wait. The specific code is as follows.
A fair lock executes the acquire() method first, but a fair lock implements tryAcquire() separately:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
:
public final void acquire(int arg) {
if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
The tryAcquire() method of fair lock in ReentrantLock is executed
#java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire()
:
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if(! hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true; }}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; }}Copy the code
If the value of state is not 0 and the thread acquiring the lock is not the current thread, return false to indicate that the lock has failed and the thread is added to the waiting queue. If it is the current thread, the lock can be re-entered.
If state=0, it means that no thread is holding the lock at this time, and hasqueuedToraise () is executed to determine whether there is an element in the AQS wait queue. If there are other waiting threads, it will also join the tail of the wait queue to truly get there first and sequentially add the lock. The specific code is as follows:
#java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors()
:
public final boolean hasQueuedPredecessors(a) {
Node t = tail;
Node h = head;
Node s;
returnh ! = t && ((s = h.next) ==null|| s.thread ! = Thread.currentThread()); }Copy the code
This code is very interesting. Returning false means there are no nodes in the queue or only one node created by the current thread. If true is returned, the queue has a waiting node and the current thread needs to join the queue.
Check whether head equals tail. If there is only one Node in the queue, head equals tail.
(s = h.ext) == null, which is an extreme case. In enq() enq(), it is not atomic.
In the first red box, thread 2 says head! = Tail is established. As soon as the thread completes the second red box, tail = node, but does not point head.next to node. Next == null, which returns true. In this case, a node is enqueuing.
If head.next is not empty, then determine whether the head.next node is the current thread or return false if not. Remember, return false means there is no node waiting for the LOCK in the FIFO queue, the thread can directly try to obtain the lock, if return true means there is a waiting thread, if the current thread wants to queue, this is to reflect the fair lock place.
Difference between unfair lock and fair lock: The performance of unfair lock is higher than that of fair lock. Unfair locking can reduce the overhead of the CPU to wake up threads, the overall throughput efficiency is higher, and the CPU does not have to wake up all threads, which reduces the number of threads to wake up
Although the performance of unfair lock is better than that of fair lock, it will cause thread hunger. In the worst case, there may be a thread that never gets the lock. However, the hunger issue can be ignored for now compared to performance, which is probably one of the reasons ReentrantLock creates unfair locks by default.
Condition realization principle
Introduction of Condition
Having described the core functionality provided by AQS, there are many other features as well, so let’s continue with the Condition component.
Condition was introduced in Java 1.5 and is used to replace the traditional wait() and notify() of Object for thread collaboration. It is safer and more efficient to implement inter-thread collaboration using await() and signal() in Condition. Condition is therefore generally recommended
The method of AbstractQueueSynchronizer implements the Condition, the main foreign provide awaite (Object. The wait ()) and signal (Object) notify () call.
Condition the Demo sample
Use sample code:
/** * ReentrantLock implementation source code learning *@authorA flower is not romantic *@date2020/4/28 o * /
public class ReentrantLockDemo {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
System.out.println("Thread one lock succeeded");
System.out.println("Thread one executing await is suspended");
condition.await();
System.out.println("Thread one awakened successfully");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 1 unlocked successfully");
}
}).start();
new Thread(() -> {
lock.lock();
try {
System.out.println("Thread two lock succeeded");
condition.signal();
System.out.println("Thread two wakes up thread one.");
} finally {
lock.unlock();
System.out.println("Thread two unlocked successfully"); } }).start(); }}Copy the code
The execution result is shown as follows:
Here thread 1 acquires the lock and then suspends the current thread with await() and releases the lock. Thread 2 acquires the lock and wakes up thread 1 with signal.
Condition implementation principle diagram
We still use the above demo as an example and execute the following flow:
Thread one executes the await() method:
First look at the code of the specific implementation, # Java. Util. Concurrent. The locks. AbstractQueuedSynchronizer. ConditionObject. Await () :
public final void await(a) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while(! isOnSyncQueue(node)) { LockSupport.park(this);
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
}
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null) // clean up if cancelled
unlinkCancelledWaiters();
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
}
Copy the code
AddConditionWaiter () is first called in the await() method to add the current thread to the Condition queue.
After executing, we can look at the data in the Condition queue:
The specific implementation code is as follows:
private Node addConditionWaiter(a) {
Node t = lastWaiter;
if(t ! =null&& t.waitStatus ! = Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node =new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
Copy the code
A Node Node is created with the current thread and waitStatus is CONDITION. The lock on that node is then released, calling the previously parsed release() method, which wakes up the suspended thread two, which continues to attempt to acquire the lock.
The isOnSyncQueue() method is then called to determine whether the current thread node is in the synchronization queue, because the lock was released in the previous step, which means that a thread may have acquired the lock and may have called singal(). If it has been awakened, it should not park and exit the while method. The fight for the lock continued.
Thread 1 is suspended and thread 2 has successfully acquired the lock.
The specific process is as follows:
Thread 2 executes signal() :
Let’s first consider that thread two has acquired the lock, and there is no data in the AQS wait queue.
Let’s see how thread 2 wakes up thread 1:
public final void signal(a) {
if(! isHeldExclusively())throw new IllegalMonitorStateException();
Node first = firstWaiter;
if(first ! =null)
doSignal(first);
}
Copy the code
Check whether the current thread is the thread that acquired the lock. If not, throw an exception. The doSignal() method is then called to wake up the thread.
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while(! transferForSignal(first) && (first = firstWaiter) ! =null);
}
final boolean transferForSignal(Node node) {
if(! compareAndSetWaitStatus(node, Node.CONDITION,0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0| |! compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread);return true;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
returnt; }}}}Copy the code
The Condition queue has only one Node created by thread 1, and waitStatus is Condition. CAS will change waitStatus of the current Node to 0. The enq() method is then executed to add the current thread to the wait queue and return the front node of the current thread.
The code joining the wait queue has also been analyzed above, and the data in the wait queue is shown as follows:
It then starts changing the front-node waitStatus of the current node to SIGNAL via CAS and wakes up the current thread. At this point, the waiting queue data in AQS is:
Once the thread is awakened, it continues to execute the while loop in the await() method.
public final void await(a) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while(! isOnSyncQueue(node)) { LockSupport.park(this);
if((interruptMode = checkInterruptWhileWaiting(node)) ! =0)
break;
}
if(acquireQueued(node, savedState) && interruptMode ! = THROW_IE) interruptMode = REINTERRUPT;if(node.nextWaiter ! =null) // clean up if cancelled
unlinkCancelledWaiters();
if(interruptMode ! =0)
reportInterruptAfterWait(interruptMode);
}
Copy the code
Since thread one’s waitStatus has been changed to 0, executing isOnSyncQueue() returns false. Break out of the while loop.
The acquireQueued() method is then executed, as described here earlier, to try to recapture the lock and continue to be suspended if the lock fails to be acquired. The lock is not awakened until another thread releases it.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; }}finally {
if(failed) cancelAcquire(node); }}Copy the code
At this point, the process of thread 1 has been analyzed. After thread 2 releases the lock, the thread will retry to obtain the lock later. The process ends here.
Condition summary
Condition and wait/notify
-
Condition can accurately control multiple different conditions. Wait /notify can only be used together with synchronized keyword, and can only wake up one or all wait queues.
-
Condition needs to use Lock to control, pay attention to unlock() after Lock (), Condition has a mechanism similar to await, so there will be no deadlock caused by locking, and the underlying implementation is park/unpark mechanism. Wait /notify does wake and suspend deadlocks. Wait /notify does wake and suspend deadlocks.
conclusion
Here we use a step by step diagram method combined with three threads in turn lock/lock release to show the implementation of ReentrantLock and the implementation principle, and ReentrantLock is based on AQS implementation, so we also have a deep understanding of AQS.
In addition, it also introduces the realization principle of fair lock and non-fair lock, and the realization principle of Condition, basically using source code + drawing to explain the way, as far as possible to make everyone easier to understand.
References:
- The foundation of concurrent data structures in Java juejin.cn/post/684490…
- Java concurrency of AQS, https://www.cnblogs.com/waterystone/p/4920797.html