The 24th “365 Original Plan”.


Today! Mr. Lighthouse told us:

Clear AbstractQueuedSynchronizer line source analysis

Based on the analysis of Java and contract awarding Java. Util. Concurrent source, without need to know about AbstractQueuedSynchronizer (hereinafter abbreviated AQS) this abstract class, because it is the foundation of Java and contracting out the tools, Is the basis for implementing classes such as ReentrantLock, CountDownLatch, Semaphore, FutureTask, etc.

Google AbstractQueuedSynchronizer, we can find a lot of AQS introduction, but many are not presented clearly, because most of the articles did not clarify some of the key details.

This article will from already fair lock source code, the analysis of AbstractQueuedSynchronizer how this class works, hoping to provide some simple help for you.

State the following:

  1. This article is long, but simple, and is intended for beginners in concurrent programming, or developers who want to read Java and distribute the source code. It may take a few hours for beginners to fully understand, but it’s worth the time.

  2. Source code environment JDK1.7 (1.8 no changes), see do not understand or confused parts, it is best to open the source code to see. Doug Lea’s code is really good.

  3. This paper does not analyze the sharing mode, so as to reduce the burden on readers. The sharing mode will be analyzed in the third article. And the condition part is not analyzed, so it is easy to read.

  4. This article uses ReentrantLock, the most commonly used ReentrantLock concept, which is intrinsically incorrect. Readers should be aware that AQS is not only used to realize ReentrantLock, but only hope that readers can associate the use of AQS with locks to reduce reading pressure.

  5. ReentrantLock’s fair and unfair locks are only slightly different and will be covered in a second article.

  6. In the comments section, some readers gave feedback that this article is not friendly to use the code directly and should be equipped with more point flow charts. This article does have this problem. But as someone who has been there before, I want to tell you that for AQS, form really doesn’t matter. What matters is getting the details right.


AQS structure

Let’s take a look at what attributes AQS have, figure out these basic know what routine AQS is, after all, you can guess!

Private TRANSIENT volatile Node head; private transient volatile Node head; Private transient volatile Node tail; private transient volatile Node tail; // This value can be greater than 1 because locks can be reentrant. Each time the lock is reentrant, the value is added with 1private volatile int state. // represents the thread currently holding the exclusive lock. The most important use example is because the lock can be reentrantlock. lock() can be nested multiple times, so this is used each time to determine whether the current thread already owns the lockif(currentThread == getExclusiveOwnerThread()) {state++}private transient Thread exclusiveOwnerThread; / / since the AbstractOwnableSynchronizer inheritanceCopy the code

Well, it looks like it should be easy, because there are only four attributes.

AbstractQueuedSynchronizer waiting queue beckoned as shown below, pay attention to, after the analysis process in the queue, which is blocking queue does not contain the head, does not contain a head, did not include the head.

Each thread in the queue is wrapped as a Node instance. The data structure is a linked list.

Static final class Node {// Static final Node SHARED = new Node(); Static final Node EXCLUSIVE = null; // ======== the following several int constants are given towait=========== /** is used for StatuswaitStatus value to indicate thread has cancelled */ / Static final int cancelled = 1; / * *waitStatus value to indicate successorStatic final int SIGNAL = -1; 'thread needs unparking */ / static final int SIGNAL = -1; /** indicate thread is waiting on condition */ The next article will cover static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should */ / The same unconditionally Static final int PROPAGATE = PROPAGATE; / / = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = / / values for the above 1, 1, 2, 3, or 0 (talk about later) / / so understand, If the value is greater than 0, the thread has cancelled the wait. // ps: ReentrantLock is not available. volatile int waitStatus; // References to the precursor Node volatile Node prev; // References to subsequent nodes volatile Node next; // Volatile Thread Thread; }Copy the code

The data structure of Node is simple: Thread + waitStatus + Pre + Next.

These are the basics, and we’re going to use them many times, so keep them in mind, just keep this structure in mind. Next, let’s talk about fair locking for ReentrantLock. Again, the blocking queue I’m talking about does not contain the HEAD node.



First, let’s look at how ReentrantLock is used.
Public class OrderService {// Use static, so that each thread has the same lock, of course, spring MVC service default singleton, Private static ReentrantLock ReentrantLock = new ReentrantLock(true);    public void createOrderReentrantlock. lock(); reentrantLock.lock(); // Usually, the lock is followed by the try statement try {// only one thread can come in at a time (the thread that acquired the lock), // other threads block on the lock() method, waiting for the lock to come in, and then execute the code... // Execute code... // Execute code... } finally {// unlock reentrantlock. unlock(); }}}Copy the code

ReentrantLock internally uses the internal Sync class to manage locks, so the actual lock acquisition and lock release are controlled by Sync’s implementation class.

abstract static class Sync extends AbstractQueuedSynchronizer {}Copy the code

Sync has two implementations, NonfairSync and FairSync. Let’s look at the FairSync section.

public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }Copy the code

Thread rob lock

A lot of people must start to dislike the above nonsense too much, the following follow the code, I will not nonsense.

static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // Lock contention final voidlock() { acquire(1); } // if tryAcquire(arg) returns, tryAcquire(arg) returnstrueAnd it's over. Public final void acquire(int arg) {public final void acquire(int arg) {tryAcquire(1) == 1; // The semantics of a fair lock are: there is no one holding the lock in the first place, so there is no need to queue.if(! TryAcquire (ARG) && // tryAcquire(ARG) did not succeed. acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } /** * Fair version of tryAcquire. Don// / Run the following command to check the status of the device: // Run the following command to check the status of the device: // Run the following command to check the status of the device: No thread is waiting for the lock; 2. Reentrant lock. The thread already holds the lock. Protected Final Boolean tryAcquire(int acquires) {final Thread current = thread.currentThread (); int c = getState(); If (c == 0) {// If (c == 0) {// If (c == 0) {// If (c == 0) {// If (c == 0) { Hasqueuedtoraise () &&if no thread is waiting, try the withcas and succeed and obtain the lock. If it fails, it can only mean one thing: a thread stole the toraise at almost the same moment =_= // since there was no one else, SetExclusiveOwnerThread (current) {compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current); return true; }} // This branch will enter the else if branch, which indicates that it is reentrant. Else if (current == getExclusiveOwnerThread()) {int nexTC = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // if (!) {// if (!) {// if (!); tryAcquire(arg) // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // selfInterrupt(); return false; } // If tryAcquire(arg) returns false, the code will execute: // acquireQueued(addWaiter(node.exclusive), arg), addWaiter(Node.EXCLUSIVE) /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for EXCLUSIVE, node. SHARED for SHARED * @return the new Node */ / // the mode argument is node.exclusive, Private Node addWaiter(Node mode) {Node Node = new Node(thread.currentThread (), mode); // Try the fast path of enq; // The following lines of code want to add the current node to the bottom of the list, i.e. the last node in the blocking queue. // tail! If (tail==head, tail==head) if (pred! = null) {// Set node.prev = pred; If (compareAndSetTail(pred, node)) {// If (compareAndSetTail(pred, node)) { Pred. Next = node; pred. Next = node; // The thread is queued and can return the return node; }} pred==null(queue empty) or CAS failed (thread competing to join queue) Enq (node) that would otherwise waste time; return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node'S predecessor */ / Adopts the mode of rotation to join the queue // As previously said, there are only two possibilities to this method: waiting queue is empty, or there are threads competing to join the queue, // Spins on this side of the semantics are: Private Node ENq (final Node Node) {for(;;) { Node t = tail; As mentioned earlier, an empty queue also comes in hereif(t == null) {// head Must initialize // tail Must initialize // head Must initialize // tail Must initialize //if(compareAndSetHead(new Node())waitStatus==0, new Node() constructor ==0, new Node() constructor =0, new Node() constructor ==0 We just set tail=head, not tail=headreturnOh, noreturn, there is noreturn// So, after setting up, continueforLoop. Next time we'll go to the bottomelseBranch tail = head; }else{// the following lines are the same as the previous method addWaiter, except that this is in an infinite loop where the current thread is queued to the end of the queue. Node.prev = t;if (compareAndSetTail(t, node)) {                    t.next = node;                    returnt; }}}} // Now back to this code //if(! tryAcquire(arg) // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // selfInterrupt(); If (addWaiter(Node.exclusive), arg) acquireQueued(addWaiter(Node.exclusive), arg)trueThis means that the code above will enter selfInterrupt(), so normally this should returnfalseFinal Boolean acquireQueued(final Node Node, int arg) {Boolean failed = final Boolean acquireQueued(final Node Node, int arg) {Boolean failed =true;        try {            boolean interrupted = false;            for(;;) { final Node p = node.predecessor(); // p == head; // p == head; // P == head; // So the current node can try to grab the lock // here we explain why we can try: // This is the first condition. Second, it is possible that the current head is the newly initialized node. As mentioned in the enq(node) method, the head is delayed and no thread is set when new node () is created. The current head does not belong to any thread, so as the head of the queue, you can try it, // tryAcquire has already analyzed, please look ahead, just use CAS to try stateif (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;                    returninterrupted; } // Go to this point to illustrate the aboveifThe branch failed, either the current node is not the team leader, // or tryAcquire(ARG) did not beat the others, continue to readif (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true; }} finally {// When failedtrue????? // The tryAcquire() method throws exceptionsif (failed)                cancelAcquire(node);        }    }    /**     * Checks and updates status for a node that failed to acquire.     * Returns true if thread should block. This is the main signal     * control in all acquire loops.  Requires that pred == node.prev     *     * @param pred nodeThe predecessor holding status * @param node the node * @return {@code true} if thread should block */ / This method says: "The current thread did not grab the lock. Do we need to suspend the current thread?" // The first parameter is the precursor node, The second parameter is the representative of the current thread Node private static Boolean shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) {int ws = Mr Pred. WaitStatus; // If waitStatus == -1, the status of the precursor node is normal and the current thread needs to suspend. If (ws == node. SIGNAL) /* * This Node has already set status asking a release * to SIGNAL it, so it can safely park. */ return true; // The precursor waitStatus is greater than 0, which means that the precursor is unqueued. // The thread that enters the blocking queue is suspended, and the wake up is done by the precursor node. // Set the prev of the current node to a node whose waitStatus<=0; // Simply to find a good parent, because you still have to rely on it to wake up. If (ws > 0) {* * Predecessor was cancelled. Skip over boomand * indicate retry. */ do {* * Predecessor was cancelled over boomand * indicate retry. node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don'T park yet. Caller will need to * retry to make sure it cannot acquire before parking. What does it mean if you enter this branch // the precursor nodewaitStatus is not equal to -1 or 1, that is, it can only be 0, -2, -3 // in our previous source, there is no settingwaitSo when each new node joins the queue,waitStatu is 0; // In normal cases, tail is the previous node, then its tailwaitStatus should be 0 // Use CAS to place the precursor nodewaitCompareAndSetWaitStatus (pred, ws, node. SIGNAL); } // This method returnsfalse"Then it will go againforStep by step, // then enter the method again, which will return from the first branchtrue        return false; } / / private static Boolean shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) / / the end of this method according to the return value we simple analysis: / / if it returnstrue, indicating that of the precursor nodewaitStatus==-1, which is normal, then the current thread needs to be suspended, waiting to be woken up laterfalse"Does not currently need to be suspended. Why? Look back // jump back to the front is this method //if (shouldParkAfterFailedAcquire(p, node) &&    //                parkAndCheckInterrupt())    //                interrupted = true; / / 1. If shouldParkAfterFailedAcquire (p, node) to returntrue, // Then parkAndCheckInterrupt(): // This method is easy because the previous returntrue// Locksupport. park(this) suspends the thread and then waits to be woken up ======= private final BooleanparkAndCheckInterrupt() {        LockSupport.park(this);        returnThread.interrupted(); } / / 2. Then tell me if shouldParkAfterFailedAcquire (p, node) to returnfalse/ / see shouldParkAfterFailedAcquire (p, node), we can find that actually came in for the first time, generally will not returntrueThe reason is very simple, the precursor nodewaitStatus=-1 depends on the setting of subsequent nodes. In other words, I haven't even set the front drive to -1. How could it betrueBut you see, this method is stuck in the loop, so the second time it comes in it's going to be negative 1. / / explain why shouldParkAfterFailedAcquire (p, node) returnfalse// => in case node is a direct descendant of head after this method. Figure it out for the rest of you. }Copy the code

Final Boolean acquireQueued(final Node Node, int arg) acquireQueued(final Node Node, int arg) Figure out for yourself how each branch goes, what happens in which case, and where it goes.

Unlock operation

Finally, we need to introduce the action of awakening. We know that normally, if the thread does not acquire the lock, the thread will be locksupport-park (this); Suspend to stop and wait to be awakened.

Public void * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *unlock() { sync.release(1); }public final Boolean release(int arg) {public final Boolean release(int arg)if (tryRelease(arg)) {        Node h = head;        if(h ! = null && h.waitStatus ! = 0) unparkSuccessor(h);return true;    }    return false; }// Go back to ReentrantLock and look at the tryRelease method protected Final Boolean tryRelease(int releases) {int c = getState() -releases;if(Thread.currentThread() ! = getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // Whether to release the lock completely Boolean free =false; If c==0, no nested locks can be released. If c==0, no nested locks can be releasedif (c == 0) {        free = true;        setExclusiveOwnerThread(null);    }    setState(c);    returnfree; }/** * Wakes up nodeSucceeded, if one exists. * * @param node the node *// Know from the preceding call, Private void unparksucceeded (node node) {/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; If (ws <0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled nodes. */ / S = node.next; s= node.next; s= node.next; if (s == null || s.waitStatus > 0) { s = null; For (Node t = tail; waitStatus==1; waitStatus==1; t ! = null && t ! = node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s ! = null) // Wake up thread locksupport. unpark(s.read); }Copy the code

After waking up the thread, the awakened thread will proceed from the following code:

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // The thread was suspended herereturnThread.interrupted(); AcquireQueued (final Node Node, int arg) acquireQueued(final Node Node, int argCopy the code

Well, after the analysis of the source code, the rest of the problem to take a closer look at the code.

conclusion

So to sum up.

In a concurrent environment, locking and unlocking require the coordination of the following three components:

  1. The lock state. We need to know if the lock is being held by another thread, and that’s what state is for. When it’s 0, it means that no thread is holding the lock, so we can try to grab the lock. You add +1 to state, you subtract 1 to unlock, until state goes to 0 again, so lock() and unlock() have to pair. It then wakes up the first thread in the wait queue to claim the lock.

  2. Blocking and unblocking of threads. AQS uses locksupport. park(Thread) to suspend threads and unpark to wake them up.

  3. Block the queue. A queue is needed to manage these threads. AQS uses a FIFO queue, which is a linked list. Each node holds a reference to its successor node. AQS uses a variant of CLH lock to implement, interested readers can refer to this article on the INTRODUCTION of CLH, written simply.

Sample graph parsing

This is a review session, a simple example, and a chance to help you understand some of the things above.

The first thread calls reentrantLock.lock(). If you scroll to the front, tryAcquire(1) will return true. With state=1, there is no initialization of the head, let alone any blocking queue. If thread 1 calls unlock() and thread 2 comes in, then the world is completely peaceful, there is no intersection, so WHY do I need AQS?

Imagine what happens if thread 2 calls lock() before thread 1 calls unlock().

Thread 2 will initialize head [new Node()], and thread 2 will insert the blocking queue and suspend.

Sample graph parsing

This is a review session, a simple example, and a chance to help you understand some of the things above.

The first thread calls reentrantLock.lock(). If you scroll to the front, tryAcquire(1) will return true. With state=1, there is no initialization of the head, let alone any blocking queue. If thread 1 calls unlock() and thread 2 comes in, then the world is completely peaceful, there is no intersection, so WHY do I need AQS?

Imagine what happens if thread 2 calls lock() before thread 1 calls unlock().

Thread 2 will initialize head [new Node()], and thread 2 will insert the blocking queue and suspend.

First, thread 2 initializes the head node with head==tail and waitStatus==0



Then thread 2 joins the queue:

We also need to look at the waitStatus of the node at this time. We know that the head node is initialized by thread 2, the waitStatus is not set at this time, Java default is set to 0, But by this method, shouldParkAfterFailedAcquire thread 2 take precursor nodes, namely the head waitStatus is set to 1.
What is the waitStatus of thread 2? It’s 0 because it’s not set;
If the thread 3 to come in at this time, directly into the thread 2 back, at this time the thread 3 waitStatus is 0, the shouldParkAfterFailedAcquire method when the thread 2 waitStatus precursor node is set to 1.

The SIGNAL(-1) state in waitStatus means that the successor node needs to be woken up. In other words, this waitStatus actually represents the status of its successor node rather than its own state. As we know, when each node joins the queue, it changes the status of its predecessor node to SIGNAL, and then blocks, waiting to be woken up by its predecessor. There are two problems involved here: a thread unqueued and woke up. The essence is the same, but the reader can also look at the source code along the lines of “waitStatus represents the state of the successor node.”

(Full text)

Wechat search [ape lighthouse] 365 original plan for free [factory interview questions] [technical dry goods] [resume template]