preface

Thread concurrency series:

Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Java Thread State Unsafe/CAS/LockSupport Application and principle Java concurrency “lock” nature (step by step to implement the lock) Java Synchronized implementation of mutual exclusion application and source exploration Java object header analysis and use (Synchronized related) Java The evolution process of Synchronized partial lock/lightweight lock/heavyweight lock Java Synchronized principle of heavyweight lock in depth (mutual exclusion) Java Synchronized principle of heavyweight lock in depth (synchronization) Java concurrency AQS In-depth analysis (on) the Java concurrency of AQS deep parsing (under) Java Thread. Sleep/Thread. Join/Thread. The yield/Object. Wait/Condition. Await explanation of Java concurrency Already the thorough analysis of concurrent Java (with Synchronized difference) Java Semaphore/CountDownLatch/CyclicBarrier ReentrantReadWriteLock in-depth analysis Deep parsing (principle), Java Semaphore CountDownLatch/CyclicBarrier in-depth analytical (application), the most detailed graphic analytic Java various locks (ultimate) thread pool will understand series

The previous several analyses of the principle and use of synchronized, synchronized is implemented by JVM, the core code is C++, for readers who are not familiar with C++ language may be a little difficult. The JUC package provides a new synchronization framework: AQS, which is implemented in pure JAVA code. If you’re familiar with the synchronized core, AQS are in order, but if not, this article will take an in-depth look at AQS from start to finish. Through this article, you will learn:

1, how to achieve their own synchronization framework 2, AQS function analysis 3, AQS exclusive lock realization 4, AQS shared lock realization 5, scene simulation and troubleshooting

1. How to implement your own synchronization framework

Prepare critical data structures

The first is to share variables as “locks”. Because this shared variable is shared by multiple threads, the volatile keyword is required to ensure visibility between threads.

Second, when the thread succeeds in the lock competition, it enters the critical area to execute the code. When the thread fails, it needs to join the queue for waiting. Therefore, a synchronization queue is needed to store the thread suspended because of the lock failure.

The third thread needs to synchronize. Thread A waits for thread B to produce data, and thread B notifies thread A that it produced data, so A wait (condition) queue is required.

Core operations

Once the data structure is ready, you need to manipulate the above data structure to implement the locking function. Thread contention lock: the “lock core” is operated through CAS. If the operation succeeds, the critical section code is executed. If the operation fails, it is added to the synchronization queue. Thread awakened: The thread that acquired the lock releases the lock after executing the critical section code and wakes up the thread waiting in the synchronization queue, which continues to compete for the lock. A thread waits on a condition: a thread joins a wait queue because a condition is not met, releases the lock, and suspends the wait. If the condition is met, the thread is awakened and continues to compete for the lock.

The above steps are the core of the lock function. Whether synchronized or AQS, the basic functions are the above steps, but they also have richer functions, such as interruptible (AQS), reentranced, exclusive and shared (AQS), etc. The nature of Java concurrent “locking” (Step by step)

2. AQS function analysis

AQS is short for AbstractQueuedSynchronizer, as the name implies: synchronizer. It is the synchronization framework under JUC and the core class to implement locking. AQS is an abstract class that provides basic methods and requires subclasses to implement specific lock acquisition and lock release operations.

Subclass wrapper extending from AQS under JUC:

Next, enter the AQS source code to see how it is to achieve the above functions. Note: Semaphore and CountDownLatch are not strictly locks, which will be discussed later when we analyze each lock

3, AQS exclusive lock implementation

A. Find the key data structures first

lock

#AbstractQueuedSynchronizer.java
    private volatile int state;
Copy the code

State is known as the shared resource or synchronization state, and as the lock core, it is volatile.

Synchronous queue

# AbstractQueuedSynchronizer. Java / / to synchronize the head of the queue private transient volatile Node head; Private TRANSIENT volatile Node tail; private transient volatile Node tail;Copy the code

Let’s look at the elements in Node:

#AbstractQueuedSynchronizer.java static final class Node { ... // Volatile Node prev; // Volatile Node next; // The Thread occupying the exclusive lock is volatile Thread Thread; // point to the next waiting condition Node nextWaiter; . }Copy the code

B. Methods of manipulating critical data structures

1. Start with the operation to get the synchronization state

acquire(xx)

# AbstractQueuedSynchronizer. Java public final void acquire (int arg) {/ / arg different locks with different meanings / / tryAcquire specific state of acquiring the synchronization operation by a subclass implementation //addWaiter encapsulates the current thread in Node and joins the synchronization queue //acquireQueued If (! TryAcquire (arG) && acquireQueued(addWaiter(node.exclusive), arg)) selfInterrupt(); / / -- -- -- -- -- -- -- -- -- -- -- -- -- (1)}Copy the code

The writing is very simple, and the important work is reflected in each method. Question 1: Why is selfInterrupt() required, as indicated in (1)? \color{Red}{QUESTION 1: If marked with (1), why selfInterrupt() is required? } Question 1: Why is selfInterrupt() required, as indicated in (1)?

tryAcquire(xx)

Where the lock is actually obtained, that is, where the lock “state” is operated, different subclasses have different implementations, which will be discussed later. TryAcquire (xx) Returns true if the synchronization status is successfully obtained, false if the synchronization status is failed to be obtained.

addWaiter(xx)

# AbstractQueuedSynchronizer. Java private Node addWaiter (Node mode) {/ / construct a new Node Node Node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred ! = null) {// The tail node exists // the precursor pointer of the new node points to the tail node node.prev = pred; / / -- -- -- -- -- -- -- -- -- -- -- -- -- (2) / / if CAS is modified to end node point to the new node (compareAndSetTail (Mr Pred, Pred. Next = node; pred. Next = node; return node; }} // If the previous step fails to join the queue, the enq(node) will be reached; // Return node; } private Node enq(final Node Node) {// Infinite loop be sure to insert queue successfully for (;;) { Node t = tail; If (t == null) {// if (compareAndSetHead(new Node())) // tail = head; } else {// Add a new node to the end of the queue as in addWaiter; node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; }}}}Copy the code

AddWaiter (xx) adds a node to the end of the synchronization queue. Note that:

The head node is not associated with any thread and serves only as an index.

Finally, the synchronization queue is shown as follows:


The problem 2 : As marked ( 2 ) , why set the precursor pointer first? \color{Red}{question 2: if (2) marked, why set the precursor pointer first? }

acquireQueued(xx)

As a rule of synchronized, you should suspend the thread after joining the queue, and see how the AQS implementation is different:

#AbstractQueuedSynchronizer.java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; Boolean interrupted = false; for (;;) Final Node p = node.predecessor(); If (p == head && tryAcquire(ARg)) {if (p == head && tryAcquire(ARg)) {if (p == head && tryAcquire(ARg)) { SetHead (node); setHead(node); // Next =null to help GC p.ext =null; failed = false; return interrupted; } // Check whether you need to suspend after the synchronization status fails to be obtained. May need to hang the if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) / / tag interrupt interrupted = true; }} finally {if (failed) cancelAcquire(node) cancelAcquire(node); }}Copy the code

The logic here may be a little convoluted, so let’s analyze it. First, there is an infinite loop that exits if the current thread successfully obtains the synchronization state. Second, if the current newly enqueued node is preceded by a head node, it will attempt to obtain synchronization status. If failed to get the synchronization status or its precursor node is not head node, is to enter shouldParkAfterFailedAcquire (xx) method.

#AbstractQueuedSynchronizer.java private static boolean shouldParkAfterFailedAcquire(Node pred, Int ws = pred.waitStatus; If (ws == node. SIGNAL) // Return true for SIGNAL; Node.prev = pred = pred.prev; if (ws > 0) {if (ws > 0) {// The node was canceled do {// Go back until you find a node that was not canceled node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // find pred.next = node; SIGNAL compareAndSetWaitStatus(pred, ws, node.signal); } return false; }Copy the code

This method returns true, indicating that the status of the precursor node of the current thread node is SIGNAL. The parkAndCheckInterrupt() method is then executed.

parkAndCheckInterrupt()

As the name suggests, suspend the thread and check for interrupts.

# AbstractQueuedSynchronizer. Java private final Boolean parkAndCheckInterrupt () {/ / hung thread LockSupport. Park (this); Return thread.interrupted (); }Copy the code

cancelAcquire(xx)

This method is called in two scenarios:

1. If an exception occurs in obtaining the synchronization status, cancel the operation of the thread competing for the synchronization status. 2. When the timeout period for obtaining the synchronization state expires, if the synchronization state cannot be obtained successfully, this method is called.

#AbstractQueuedSynchronizer.java private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pred = node.prev; // Skip while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; // Marked as CANCELLED node.waitStatus = node.cancelled; If (node == tail && compareAndSetTail(node, pred)) {// Set the next pointer of the first node to null, CompareAndSetNext (pred, predNext, null); } else {//---------------(3) int ws; if (pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! = null) {// If the first Node is not a head Node, the current Node is not a tail Node Node next = node.next; if (next ! = null && next-waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else {// The precursor nodes are the head nodes unparkprecursor (node); } node.next = node; // help GC } }Copy the code

Question 3: When is node removed from the queue, as indicated in (3)? \color{Red}{question 3: When is node removed from the queue as indicated in (3)? } Question 3: When is a node removed from the queue, as indicated in (3)? 2. Release the synchronization state again

release(xx)

# AbstractQueuedSynchronizer. Java public final Boolean release (int arg) {/ / tryRelease release synchronous state if (tryRelease (arg)) { Node h = head; if (h ! = null && h.waitStatus ! If waitStatus is not 0, wake up the unparksucceeded (h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; If (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; If (s = = null | | s. aitStatus > 0) {/ / if there is no subsequent nodes or cancel the state s = null; / / the first Node from the tail to start looking for the nearest did not cancel the Node -- -- -- -- -- -- -- -- -- -- - (4) for (Node t = tail; t ! = null && t ! = node; t = t.prev) if (t.waitStatus <= 0) s = t; } // Wake up the thread if (s! = null) LockSupport.unpark(s.thread); }Copy the code


The problem 4 : As marked ( 4 ) , why index from the tail? \color{Red}{q4: why do I need to start the index from the tail as indicated in (4)? }

Node status

# AbstractQueuedSynchronizer. Java / / node is CANCELLED, no longer participate in the competition to lock the static final ints CANCELLED = 1; Static final int SIGNAL = -1; Static final int CONDITION = -1; static final int CONDITION = -1; Static final int PROPAGATE = -3; // Default is 0Copy the code

So far, the process of acquiring and releasing the exclusive lock has been analyzed, as shown below:

4. AQS shared lock implementation

An exclusive lock allows only one thread to acquire the lock at a time, whereas a shared lock does not. See AQS for an implementation of shared locks. Let’s start by looking at the operation to get the shared synchronization status

acquireShared(xx)

# AbstractQueuedSynchronizer. Java public final void acquireShared (int arg) {/ / access to Shared synchronization state, If (tryAcquireShared(arg) <0) doAcquireShared(arg) }Copy the code

Unlike the acquisition of an exclusive lock, operations such as joining a synchronous queue and suspending a thread are put into a single method.

doAcquireShared(xx)

# AbstractQueuedSynchronizer. Java private void doAcquireShared (int arg) {/ / join synchronous queue, Final Node Node = addWaiter(node.shared); // Failed to obtain synchronization status Boolean failed = true; try { boolean interrupted = false; for (;;) {// Final Node p = node.predecessor(); Int r = tryAcquireShared(arg); if (p == head) {// If (p == head) {int r = tryAcquireShared(arg); Propagate(node, r); propagate (node, r); p.next = null; // help GC if (interrupted) // interrupt (); failed = false; return; }} / / in accordance with an exclusive lock the if (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt ()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}Copy the code

The tryAcquireShared(arG) return value represents the currently available resources.

setHeadAndPropagate(xx)

#AbstractQueuedSynchronizer.java private void setHeadAndPropagate(Node node, Propagate) {//propagate == 0 propagate == 0 // Record old head for check below // setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; / / if subsequent node is Shared, awaken the if (s = = null | | s.i sShared ()) doReleaseShared (); }Copy the code

In addition to pointing the header to the current node, you also need to wake up the next shared node.

Exclusive locks do not.

It’s also easier to understand from a practical point of view:

A bathhouse is divided between men and women. Currently, only women are allowed to enter the bathhouse first, while a group of men wait in line and are allowed to enter the bathhouse when the women are finished. The first man went in and found it washable, so he said to the second man that it was washable, you come in, really washable, this is the spirit of sharing. This bathhouse is shared.

Let’s look at the action of releasing the shared synchronization state

releaseShared(xx)

# AbstractQueuedSynchronizer. Java public final Boolean releaseShared (int arg) {if (tryReleaseShared (arg)) {/ / release after the success of the synchronization state, Notify subsequent nodes of doReleaseShared(); return true; } return false; }Copy the code

As you can see, the thread attempts to wake up subsequent nodes after acquiring and releasing the shared lock, calling the doReleaseShared() method.

doReleaseShared()

#AbstractQueuedSynchronizer.java private void doReleaseShared() { for (;;) { Node h = head; if (h ! = null && h ! Int ws = h.waitStatus; If (ws == node.signal) {// Wake up subsequent nodes if (! compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } / / head node status set to PROPAGATE -- -- -- -- -- -- -- -- > (5) else if (ws = = 0 &&! compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS} // this method may be called by multiple threads, and the thread will modify the header node after the lock is acquired. If (h == head) // loop if head changed; }}Copy the code

Q5: Why is PROPAGATE state required as indicated in (5)? \color{Red}{question 5: Why is the PROPAGATE state required as indicated in (5)? } Q5: Why is the PROPAGATE state required as indicated in (5)? So far, the process of acquiring and releasing shared locks has been analyzed, as shown below:

5. Scene simulation and difficulty analysis

Shared lock, exclusive lock implementation of important methods, data structure have been over, then through the simulation scenario to analyze the above five problems.

1. Ask: Why selfInterrupt()

#AbstractQueuedSynchronizer.java
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
Copy the code

Locksupport. park(this) suspends the Thread and calls Thread.interrupted() after suspension to check the interrupted status. In addition to querying the interrupted status, Thread.interrupted() resets the interrupted status, meaning that after calling this method the interrupted status was true and becomes false. In the whole acquire(xx) method, there is no interrupt processing anywhere, so you cannot simply set the interrupt state to false. You also need to restore the original state so that external callers can sense if an interrupt has occurred, so selfInterrupt() needs to reset the interrupt state to true.

If you want to restore the interrupted state, why set this value to false in the first place instead of calling Thread.isinterrupted ()? Imagine A scenario where thread A is interrupted, wakes up from the suspended state, tries to get the synchronization state, finds that it still can’t get it, and starts preparing to suspend again. The suspension thread here uses the locksupport.park (xx) method, which uses the park.park (xx) function underneath:Notice the code in the red box, if the current thread interrupt status is found to be true, it will return the thread no longer suspended. If thread.isinterrupted () is called and the interrupted status is not changed to false, the Thread cannot be suspended when the locksupport.park (xx) method is called. However, if acquire(xx) method did not acquire the lock, it kept circulating, leading to the thread constantly polling the synchronization state, resulting in unnecessary CPU resource waste.

Parker details:Java Unsafe/CAS/LockSupport applications and principles

For thread interrupt details, go to:Java gracefully Interrupts threads

2, ASK: why set the current order of the precursor pointer:

node.prev = pred——>pred.next = node;

The prev of the new node points to the tail node, and the next of the tail node points to the new node.

The synchronization queue now has two nodes, one header and one Node1. To assign pred.next first, assume the following flow:

Thread A contended for the lock. If the lock failed, Node1’s next would point to NewNodeA. 2. Another thread, B, also contended for the lock and failed, pointing next from Node1 to NewNodeB. 3. Point the tail pointer to the new node (either NewNodeA or NewNodeB), if NewNodeA, and then redirect the prev of NewNodeA to Node1. Here’s the problem: while NewNodeA’s prev points to Node1, Node1’s next points to NewNodeB.

Assigning node.prev first does not. The root cause of this problem is that the multithreaded operation queue element (assigning node1.next) is not concurrence protected, and giving Node.prev first is not an operation queue. Delay the operation queue step until the CAS is successful, so that the queue can be modified correctly. Of course, before pred.next = node is executed, other threads may traverse the query queue, at which point pred.next may be empty, as in node1.next above. This is why some articles on the web say the next pointer is unreliable.

CancelAcquire (xx) cancelAcquire(XX)

1. If node is the last node in the queue, remove node from the queue. 2. If Node is the head node, the detection of unparkprecursor (XX) is called. 3, if the node is a intermediate nodes, in shouldParkAfterFailedAcquire (xx)/unparkSuccessor (xx) is completely removed.

4. Q: Why do we need to start the index from the tail? In point 2, the next pointer that analyzes the node may be empty. So, to be on the safe side, start indexing from the end of the queue.

Q: Why is the PROPAGATE state PROPAGATE only available when the node is shared? Suppose that there are now four threads, A, B, C and D. A/B tries to acquire the lock first, fails, then it suspends itself, and C/D releases the lock. Follow the Semaphore lock acquisition/release process.

State =1, set head. WaitStatus =0, wake up A, call tryAcquireShared(xx), return r=0, state=0 D releases the lock before A calls setHeadAndPropagate(xx). At this time, D calls doReleaseShared() and finds that head.waitStatus==0, so no other nodes are woken up. At this time, A calls setHeadAndPropagate(xx), because r==0 and head.waitStatus==0, doReleaseShared() is not called, so no other nodes are woken up. The result is that node B is not woken up.

If the PROPAGATE state, in the above steps 2 D call doReleaseShared (after), found that the head. WaitStatus = = 0, then set the head. WaitStatus = PROPAGATE, in step 3, Find head. WaitStatus ==PROPAGATE, then wake up B. Although in step 2 no thread is woken up, the PROPAGATE state is set, and in the following step it is found that the PROPAGATE has been set, so the PROPAGATE is woken up, which is also the meaning of the PROPAGATE name: PROPAGATE.

Due to the length, the next part will analyze the AQS interruption, conditional wait and other relevant knowledge.

This article is based on JDK1.8.

If you like, please like, pay attention to your encouragement is my motivation to move forward

Continue to update, with me step by step system, in-depth study of Android/Java