A preface.

Long time no see. I have been too busy to write blog recently. Just empty down, think about Java inside there is a more important system: lock, has not been shared. This article will start from the basis of the JUC package AbstractQueuedSynchronizer data structure (hereinafter referred to as AQS) as the breakthrough point to analyze the JDK the lock mechanism in the package. AQS is the cornerstone of many synchronization classes (Lock, Semaphore, ReentrantLock, etc.) in JUC package. Learning AQS well is very important to understand JUC package. This article will introduce AQS in combination with ReentrantLock, hoping to help you.

2. AQS

AQS is a simple framework that provides atomically managing synchronization state, blocking and waking up thread capabilities, and a queue model. This article will gradually go from the application layer to the principle layer, and through the basic features of ReentrantLock and the association between ReentrantLock and AQS, to understand the knowledge of AQS related exclusive lock.

The core of this article is AQS principle analysis, ReentrantLock is only a brief introduction, the follow-up will be a column introduction, pay attention to subscribe to my Java column do not get lost ~

2.1. An overview of AQS

2.1.1.AQS class framework diagram

Image: tech.meituan.com/2019/12/05/…

2.1.2. Core variables

Private TRANSIENT volatile Node head; private transient volatile Node head; Private TRANSIENT volatile Node tail; private transient volatile Node tail; /** * state variable, AQS controls access to critical resources by maintaining the value of the state variable. * Initialized to 0 means that there is no thread occupation. If exclusive lock access is used, the thread that grabs the lock will increase the value of state by 1. If the thread ID is equal to the id of the thread holding the state variable, then state is incremented by one, and the same is repeated here for subsequent lock releases. This will be demonstrated later in * ReentrantLock. */ private volatile int state;Copy the code

Description:

AQS controls access to critical resources by maintaining the value of the volatile modified state variable. Access to state provides several methods

  • getState()

  • setState()

  • CompareAndSetState () [CAS] : set and compare

    //CAS compares and sets values, again the unsafe-looking method, and calls the underlying C language method, using JMM, in a multithreaded environment. Protected final Boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } parsing can be reference for CAS: https://www.jianshu.com/p/ae25eb3cfb5d is used for optimistic locking in Java implementation in another article ConcurrentHashMap bloggers have introduced: https://juejin.cn/post/6960898411314823204Copy the code

AQS defines two resource sharing modes:

1.Exclusive execution by only one thread, such as ReentrantLock

2.Share (Share, which can be executed simultaneously by multiple threads, such as Semaphore/CountDownLatch).

Different custom synchronizers compete for shared resources in different ways. The implementation of custom synchronizer only needs to realize the acquisition and release of shared resource state. As for the maintenance of specific thread waiting queue (such as failure to acquire resources in queue/wake up queue, etc.), AQS has been implemented at the top level. The implementation of a custom synchronizer is mainly implemented in the following ways:

  • IsHeldExclusively () : Whether the thread is monopolizing resources. You only need to implement it if you use condition.
  • TryAcquire (int) : Exclusive mode. Attempts to obtain the resource return true on success and false on failure.
  • TryRelease (int) : Exclusive mode. Attempts to free resources return true on success and false on failure.
  • TryAcquireShared (int) : Share mode. Attempt to obtain resources. Negative numbers indicate failure; 0 indicates success, but no available resources are available. A positive number indicates success and free resources.
  • TryReleaseShared (int) : share mode. Attempts to free the resource, returning true if subsequent wait nodes are allowed to wake up after release, false otherwise.

In the case of ReentrantLock, state is initialized to 0, indicating that the state is not locked. When thread A locks (), tryAcquire() is called to monopolize the lock and state+1. After that, another thread will fail to tryAcquire() until the unlock() of thread A reaches state=0. Of course, thread A can repeatedly acquire the lock itself before releasing it (state will accumulate), which is the concept of reentrant. But be careful how many times you get it and how many times you release it, so that state can go back to zero.

In the CountDownLatch example, the task is divided into N child threads to execute, and state is initialized to N (note that N must be consistent with the number of threads). The N child threads are executed in parallel, countDown() once for each child thread, and state is reduced by 1. After all child threads have finished executing (i.e., state=0), unpark() the calling thread, and then the calling thread returns from the await() function to continue the residual action.

In general, custom synchronizers are either exclusive or shared methods, and they only need to implement either Tryacquire-TryRelease or tryAcquireShared. However, AQS also supports both exclusive and shared custom synchronizers, such as ReentrantReadWriteLock.

2.1.3.AQS bidirectional queue

Photo: www.cnblogs.com/waterystone…

Static final node SHARED = new node (); static final node SHARED = new node (); Static final Node EXCLUSIVE = null; // Indicates that the current node is unscheduled. When timeout or interrupted (in response to an interrupt), a change is triggered to this state, and the node will not change after entering this state. static final int CANCELLED = 1; // Indicates that the successor node is waiting for the current node to wake up. When the successor node joins the queue, the status of the predecessor node is updated to SIGNAL static final int SIGNAL = -1. Static final int Condition = -2; static final int Condition = -2; static final int Condition = -2; Static final int PROPAGATE = -3; static final int PROPAGATE = -3; // The status of the current node is volatile int waitStatus; // Volatile Node prev; // Volatile Node next; // The current node's Thread is volatile Thread Thread; // Next waiting Node Node nextWaiter; Final Boolean isShared() {return nextWaiter == SHARED; Final Node predecessor() throws NullPointerException {Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; }}Copy the code

AQS internally controls access to critical resources by maintaining a bidirectional linked list. The head node is the thread that currently has access to the shared variable, and the subsequent node is the node waiting to wake up.

2.2. Core method analysis

Having said that, in fact, AQS is a complete API library, many tool classes provided in JUC package are based on AQS resource access and release methods to achieve.

2.2.1. Acquire (int)

Acquire (int) is the top-level access, resource access (lock) access, through which threads acquire shared resources in exclusive mode. If the resource is obtained successfully, it means that the thread has access to the state variable, and the corresponding thread can execute the current business logic. If the node fails to get, the node is added to the queue. Let’s look at the code

public final void acquire(int arg) { if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code
/ / empty methods, this method allows the user to custom to control the state variables of access to the protected Boolean tryAcquire (int arg) {throw new UnsupportedOperationException (); }Copy the code
Private Node addWaiter(Node mode) {// Constructs a Node with a given mode. Node = new Node(thread.currentThread (), mode); // Try the fast way to go straight to the end of the queue. Node pred = tail; if (pred ! = null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; }} // If you fail in the previous step, you can join the team through ENQ. enq(node); return node; }Copy the code
Private Node enq(final Node Node) {//CAS" spins "until it joins the queue for (;;) { Node t = tail; // If the queue is empty, create an empty flag node as head and point tail to it as well. if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else {node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}}}Copy the code
Final Boolean acquireQueued(final Node Node, int arg) {final Boolean failed = true; Boolean interrupted = false; // spin for (;;) Final Node p = node.predecessor(); If the node is head, then it is eligible to attempt to fetch a resource (possibly because the boss woke up from the resource, or because it was interrupted). If (p == head && tryAcquire(arg)) { So head refers to the benchmarking node, which is the node from which the resource was obtained or null. setHead(node); // node.prev is null in setHead, and head.next is null in setHead so that GC can reclaim the previous head node. It means that the node out of the team before taking the resource! p.next = null; // Failed = false; / Whether the wait has been interrupted by return interrupted; } // If you can rest, wait through park() until unpark(). If the uninterruptible condition is interrupted, it wakes up from park(), finds that the resource is not available, and continues into park() to wait. If (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) / / if the interruption in the process of waiting, even if only once, If interrupted = true; }} finally {// If a resource is not acquired successfully during a wait (such as timeout, or interrupted in the case of interruptible), then cancel the node waiting in the queue. if (failed) cancelAcquire(node); }}Copy the code
Private static Boolean shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) {/ / get the state of the precursor int ws = Mr Pred waitStatus; If (ws == node. SIGNAL) // If (ws == node. SIGNAL) If (ws > 0) {// If (ws > 0) {// If (ws > 0) {// If (ws > 0) {// If (ws > 0) {// If (ws > 0) {// If (ws > 0) { Do {node.prev = pred = pred.prev; do {node.prev = pred = pred. } while (pred.waitStatus > 0); pred.next = node; } else {// If the precursor works well, set the precursor status to SIGNAL and tell it to notify itself when it finishes retrieving the number. It could fail. He might have just been released! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }Copy the code
Private final Boolean parkAndCheckInterrupt() {// Call park() to put the thread into waiting state locksupport.park (this); // If you are awakened, check to see if you are interrupted. Thread.interrupted() clears the status of the current Thread. Return thread.interrupted (); }Copy the code

Look at the source code summary

  1. Call tryAcquire() on the custom synchronizer to attempt to acquire the resource directly and return it if successful;
  2. If not, addWaiter() adds the thread to the end of the wait queue and marks it as exclusive;
  3. AcquireQueued () causes the thread to rest in the waiting queue and attempt to acquire the resource when it has an opportunity (unpark() in its turn). Return after obtaining the resource. Returns true if it was interrupted during the entire wait, false otherwise.
  4. If a thread has been interrupted while waiting, it does not respond. SelfInterrupt () is performed only after the resource has been retrieved, reclaiming the interrupt.

Photo: www.cnblogs.com/waterystone…

2.2.2. Release (int)

Corresponding to acquire (int), Release (int) is the top-level entry, resource release(unlock) entry, through which threads release shared resources in exclusive mode. If it is completely freed (state=0), it wakes up other threads in the wait queue to acquire resources. Take a look at the source code

Public final Boolean release(int arg) {if (tryRelease(arg)) { if (h ! = null && h.waitStatus ! Succeeded = 0) // Wake up the next thread in the wait queue unparkprecursor (h); return true; } return false; }Copy the code
/ / in the same way as empty methods, by the corresponding implementation class to implement the protected Boolean tryRelease (int arg) {throw new UnsupportedOperationException (); }Copy the code
Private void unparksucceeded (Node Node) {// Node is the Node of the current thread. int ws = node.waitStatus; // set the state of the node where the current thread resides to zero, allowing failure. if (ws < 0) compareAndSetWaitStatus(node, ws, 0); S Node s = node.next; / if null or cancelled the if (s = = null | | s. aitStatus > 0) {s = null; // Look from back to front. for (Node t = tail; t ! = null && t ! = node; T = t. rev) // All nodes <=0 are still valid. if (t.waitStatus <= 0) s = t; } // Wake up if (s! = null) LockSupport.unpark(s.thread); }Copy the code

The logic of release is relatively simple: empty the header and wait for GC, look for the uncancelled node later, wake up the node with waitStatus<=0 as the header and obtain the access to state.

2.2.3. AcquireShared (int)

This method is the top-level entry for threads in shared mode to obtain shared resources. It will obtain a specified amount of resources, the success of the return, the failure to get into the waiting queue, until the resource, the whole process ignores interruption. Take a look at the source code

Public final void acquireShared(int arg) {public final void acquireShared(int arg) { 0 indicates that the command is successfully obtained but has no remaining resources. A positive number indicates success, and // there are still resources available for other threads to acquire. TryAcquireShared () if (tryAcquireShared(ARG) < 0) if (tryAcquireShared(ARg) < 0) if (tryAcquireShared(ARg) < 0) DoAcquireShared (ARG) is returned until the resource is obtained; }Copy the code
/ / the same business side their implementation protected int tryAcquireShared (int arg) {throw new UnsupportedOperationException (); }Copy the code
Private void doAcquireShared(int arg) {// Join final Node Node = addWaiter(node.shared); Boolean failed = true; Boolean interrupted = false; for (;;) {// Final Node p = node.predecessor(); If (p == head) {int r = tryAcquireShared(arg); if (p == head) {int r = tryAcquireShared(arg); Propagate(node, r); // Propagate(node, r); p.next = null; // help GC/If the wait is interrupted, the interrupt will be filled. if (interrupted) selfInterrupt(); failed = false; return; } // Enter a waiting state. Waiting to be unpark () or interrupt () if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // setHead(node); // If there is any left, Continue to wake up the next neighbor thread if (the propagate > 0 | | h = = null | | h.w. aitStatus < 0 | | (h = head) = = null | | h.w. aitStatus < 0) {Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}Copy the code

The overall logic is consistent with the team exclusive lock

In contrast to exclusive mode, it is also important to note that only when the thread is head.next (the “second” thread) will attempt to acquire resources and wake up subsequent teammates if there is any left. So the problem is, if the eldest child runs out and releases five resources, the second child needs six, the third child needs one, and the fourth child needs two. The eldest brother wakes up the second brother first. The second brother sees that resources are not enough. Does he give resources to the third brother or not? The answer is no! The second child will continue to park(), waiting for other threads to release resources, and will not wake up the third and fourth children. Exclusive mode, where only one thread is executing at a time, is fine; However, in shared mode, multiple threads can be executed at the same time. Now, because the second thread has a large demand for resources, the third and fourth threads with a small amount of resources are also stuck. Of course, this is not a problem, but AQS guarantees to wake up strictly in the order in which they join (ensuring fairness, but reducing concurrency).

2.2.4. ReleaseShared ()

Reverse release the shared lock

Public Final Boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {// Wake up the subsequent node doReleaseShared(); return true; } return false; }Copy the code
private void doReleaseShared() { for (;;) { Node h = head; if (h ! = null && h ! = tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (! compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // Wake up the unparkprecursor (h); } else if (ws == 0 && ! compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // if (h == head) break; }}Copy the code

The process of this method is also relatively simple, in a word: after releasing the resources, wake up the successors. Similar to release() in exclusive mode, but with a slight note: Exclusive tryRelease() does not return true to wake up other threads until it has completely freed resources (state=0), mainly due to reentrant considerations. ReleaseShared () in shared mode has no such requirement. In shared mode, a certain number of threads are controlled to execute concurrently, so that the thread with the resource can wake up the waiting node when it releases some resources. For example, if the total amount of resources is 13, A (5) and B (7) respectively obtain resources and run concurrently. When C (4) only has one resource, it needs to wait. TryReleaseShared (2) returns true to wake up C. C sees only 3 resources, but it is still not enough to wait. TryReleaseShared (2) returns true to wake C up, and C is ready to run with A and B. TryReleaseShared () of a ReentrantReadWriteLock read lock returns true only when the resource is completely freed (state=0), so custom synchronizer can determine the return value of tryReleaseShared() as needed.

Introduction and use of ReentrantLock

3.1. Introduction

ReentrantLock means ReentrantLock, which means that a thread can repeatedly lock a critical resource. Often used in contrast to the Synchronized keyword. This section will be based on the previous explanation of the principle of AQS, to help you understand the role of AQS in ReentrantLock.

Difference between Synchronized and ReentrantLock: juejin.cn/post/684490…

3.1.1. Fair and unfair locks

ReentrantLock supports two locking methods, fair and unfair.

To introduce ReentrantLock fair and unfair lock in the scene of eating KFC

** I want to eat KFC, see people in front of the line, honestly to the end of the existing line, wait for the front of the people to finish buying me.

Unfair lock: : I want to eat KFC, see someone in front of the queue, I do not want to queue, directly jump the queue to buy, if this time you place an order successfully, then you can save the queuing time to eat KFC, but if the order fails, honestly continue to queue.

advantages disadvantages
Fair lock All threads get resources and don’t starve to death in the queue Throughput drops so much that all but the first thread in the queue is blocked, and it’s expensive for the CPU to wake up the blocked thread
Not fair lock It can reduce the overhead of the CPU to wake up threads, the overall throughput efficiency will be higher, the CPU does not have to wake up all threads, will reduce the number of threads to wake up A thread in the middle of the queue may not acquire the lock for a long time, leading to starvation

3.2. Already use

3.2.1.ReentrantLock constructor

ReentrantLock uses an unfair lock by default

Public ReentrantLock() {sync = new NonfairSync(); } public ReentrantLock(Boolean fair) {sync = fair? new FairSync() : new NonfairSync(); }Copy the code
/** * NonfairSync extends Sync {private static Final Long serialVersionUID = 7316153563782823691L; final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }} /** * FairSync extends Sync {private static Final Long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); Protected final Boolean tryAcquire(int acquires) {final Thread current = thread.currentThread (); int c = getState(); if (c == 0) { if (! hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }}Copy the code

3.2.2.ReentrantLock is used for locking and unlocking

By now, it’s not clear how ReentrantLock relates to AQS

Let’s start with a brief use of ReentrantLock

Don’t lock

public class Demo { int count = 1; public void add(){ this.count ++; } public static void main(String[] args) throws Exception{ Demo demo = new Demo(); ExecutorService executorService = Executors.newFixedThreadPool(30); List<Runnable> runnables = new ArrayList<>(); List<Future<? >> futures = new ArrayList<Future<? > > (); for (int i = 0; i < 1000; i++) { runnables.add(()->{ demo.add(); }); } runnables.forEach(e->{ futures.add(executorService.submit(e)); }); For (Future<? > f : futures) { try { f.get(); } catch (Exception e) { e.printStackTrace(); } } System.out.println(demo.count); }} Console output: 987 is basically not equal to 1001Copy the code

lock

public class Demo { ReentrantLock reentrantLock = new ReentrantLock(); int count = 1; public void add(){ reentrantLock.lock(); try{ this.count ++; }catch (Exception e){ e.printStackTrace(); }finally { reentrantLock.unlock(); } } public static void main(String[] args) throws Exception{ Demo demo = new Demo(); ExecutorService executorService = Executors.newFixedThreadPool(30); List<Runnable> runnables = new ArrayList<>(); List<Future<? >> futures = new ArrayList<Future<? > > (); for (int i = 0; i < 1000; i++) { runnables.add(()->{ demo.add(); }); } runnables.forEach(e->{ futures.add(executorService.submit(e)); }); For (Future<? > f : futures) { try { f.get(); } catch (Exception e) { e.printStackTrace(); } } System.out.println(demo.count); }}Copy the code

3.2.3.ReentrantLock lock and unlock resolution

ReentrantLock relies on lock() and unlock() methods. Let’s see what these two methods are doing, okay

1. The Lock () method

public void lock() {
    sync.lock();
}
Copy the code
The abstract static class Sync extends AbstractQueuedSynchronizer {/ / omit unnecessary code abstract void the lock (); // omit unnecessary code}Copy the code

Look at the lock method implementation

There are fair and unfair lock implementation.

For convenience, take the logic of fair lock implementation as an example

// Final void lock() {acquire(1); }Copy the code
Public final void acquire(int arg) {if (! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

There is also a method tryAcquire in the FairSync class

Protected final Boolean tryAcquire(int acquires) {final Thread current = thread.currentThread (); Int c = getState(); If (c == 0) {// If (c == 0) {// If (c == 0) { hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; Else if (current == getExclusiveOwnerThread()) {int nexTC = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }Copy the code

It is found to be an overlay approach, consistent with the business-side implementation mechanism described above.

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
Copy the code

Now that we have associated the ReentrantLock method with the AQS method, let’s go through the steps of fair locking

2. Unlock () method

//ReentrantLock method public void unlock() {sync.release(1); }Copy the code
Public final Boolean release(int arg) {if (tryRelease(arg)) {Node h = head; if (h ! = null && h.waitStatus ! = 0) unparkSuccessor(h); return true; } return false; }Copy the code

3.2.4. Implementation differences between fair lock and unfair lock

We know that ReentrantLock is an exclusive lock, so what’s the difference between a fair lock and an unfair lock?

Don’t look at the source, we combined with the above code analysis of AQS and fair lock implementation we think can be reflected in where?

The lock () method finds that if the shared variable is occupied by a thread other than the current thread, and there are already threads queued, it should queue up. Going back to the KFC diagram, the unfair lock will try to jump the queue when it knows that state is occupied and there is a thread in the queue. Ok, with that in mind let’s look at the implementation of an unfair lock

Final void lock() {// The first time you jump the queue, whether the current state is occupied or not, If (compareAndSetState(0, 1)) setExclusiveOwnerThread(thread.currentThread ()); else acquire(1); }Copy the code
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
Copy the code
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); If (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current); return true; Else if (current == getExclusiveOwnerThread()) {int nexTC = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }Copy the code
  1. When an unfair lock is called, CAS will be called to grab the lock. If the lock is not occupied at this time, CAS will return the lock.

  2. After a CAS failure, an unfair lock will enter the tryAcquire method just like a fair lock. In the tryAcquire method, if the lock is released (state == 0), the unjust lock will be directly seized by CAS. However, fair lock will determine whether there is a thread waiting in the queue, if there is, it will not grab the lock, obediently queue to the back.

Iv. References and acknowledgements

Reference: graphic thinking tech.meituan.com/2019/12/05/…

This paper mainly refer to: www.cnblogs.com/waterystone…

Five. Contact me

If you think the article is well written, you can like it and comment + follow it

Nailing: louyanfeng25

WeChat: baiyan_lou