Through the AQS exclusive lock, we gained a basic understanding of the AQS data structure. It is essentially an optimized CLH queue: a CLH queue node has only a predecessor pointer, whereas an AQS node has a successor pointer in addition to the predecessor pointer. First, a brief summary of the characteristics of AQS:

  1. A doubly linked list with a head node and a tail node. It is FIFO: nodes enter at the tail and leave from the head, and each waiting thread is wrapped in a Node
  2. A state field, declared with the volatile keyword
  3. In exclusive mode, state is increased by 1 when the lock is acquired and decreased by 1 when the lock is released. state == 0 means the lock can be acquired, and state > 1 means the lock has been re-entered
  4. In shared mode, state is given an initial value greater than 0 when the synchronizer is initialized. This value represents how many threads are allowed to acquire the lock
  5. In shared mode, the return value of tryAcquireShared has the following meaning. Less than zero: the lock was not acquired. Equal to zero: the lock was acquired this time, but subsequent attempts will fail; this was the last lock available in shared mode, and nobody else can acquire it until someone releases. Greater than zero: the lock was acquired this time, and later attempts may also succeed
  6. Each Node has a nextWaiter property indicating whether the node is in exclusive or shared mode: EXCLUSIVE for exclusive mode, SHARED for shared mode
  7. Each Node has a waitStatus property with the following possible values: SIGNAL (the successor is blocked and must be woken), CANCELLED (the node has given up and is leaving the queue), CONDITION (waiting on a condition), PROPAGATE (used in shared mode), and 0 (the default when no status has been set)
  8. In exclusive mode, when the lock is acquired successfully, the node of the acquiring thread (the node after head) is promoted to head, and the original head is removed from the queue (help GC). When the lock cannot be acquired, a node is added to the tail of the CLH queue, the waitStatus of its predecessor is set to SIGNAL by spinning, and the thread of this node is then blocked through LockSupport
  9. In exclusive mode, when the lock is released, the node after head is looked up first. If it is not null, its thread is unblocked through LockSupport. If it is null, the queue is traversed backward from the tail through the predecessor pointers to find the node, and its thread is unblocked through LockSupport. Why traverse through the predecessor pointers? Because the successor pointer in AQS is unreliable in extreme cases; it is only an optimization that works most of the time. When a node is added, there is a window after the CAS on the tail succeeds but before the successor pointer is set, during which the successor pointer is null. In addition, when a node sets SIGNAL on its predecessor, it makes sure the predecessor is a valid (non-cancelled) node; if the predecessor is cancelled, it moves on to the predecessor's predecessor
  10. In shared mode, setHeadAndPropagate is called when a thread acquires the lock. If locks are still available in the synchronizer at that point, doReleaseShared is called to wake up the next node; this is propagation
  11. In shared mode, doReleaseShared is called when a thread releases the lock. It wakes up the node after head. Once the woken node has acquired the lock by spinning, it calls setHeadAndPropagate, and if locks are still available in the synchronizer at that point, doReleaseShared is called to wake up the next node
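The three return-value cases of tryAcquireShared in item 5 can be made concrete with a small sketch. PermitSync and its tryOnce helper are hypothetical names invented for this illustration (they are not JDK classes); the sketch assumes state counts the permits still available, which is one common way to use shared mode.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo synchronizer (not a JDK class): state = permits still available.
class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) {
        setState(permits);
    }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // remaining < 0: fail without touching state.
            // Otherwise CAS the new value in and report how many permits are left.
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + releases))
                return true; // always let AQS try to wake queued waiters
        }
    }

    // Demo-only helper: calls the protected hook directly, without queuing.
    int tryOnce() {
        return tryAcquireShared(1);
    }
}

class PermitSyncDemo {
    public static void main(String[] args) {
        PermitSync sync = new PermitSync(2);
        System.out.println(sync.tryOnce()); // 1: acquired, more permits remain
        System.out.println(sync.tryOnce()); // 0: acquired, but it was the last permit
        System.out.println(sync.tryOnce()); // -1: failed
        sync.releaseShared(1);              // give one permit back
        System.out.println(sync.tryOnce()); // 0: acquired again
    }
}
```

With two permits, the calls return 1, 0, and -1 in that order, matching the greater-than-zero, equal-to-zero, and less-than-zero cases.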

The following uses CountDownLatch to explain AQS shared locks

Usage example

CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 80);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 50);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 60);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();

countDownLatch.await();
System.out.println("All Finished");

Output:

Thread-1 Finished
Thread-2 Finished
Thread-0 Finished
All Finished

CountDownLatch can be used, for example, to split a task into three smaller tasks and wait on the main thread for all of them to complete. We pass 3 to the constructor, so in AQS shared mode state == 3. After CountDownLatch#countDown has been called three times, state drops to 0 and the thread calling countDownLatch.await() no longer blocks. In the example above, each small task calls CountDownLatch#countDown and the main thread calls CountDownLatch#await, which achieves the desired effect.
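For readers who want to run the example, here is a self-contained sketch of the same idea. The sleep helper is not shown in the article, so the version below is only an assumption about its shape; LatchUsageDemo and runTasks are likewise names made up for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchUsageDemo {
    // Assumed shape of the article's sleep helper: sleep, and restore the
    // interrupt flag if the thread is interrupted while sleeping.
    static void sleep(TimeUnit unit, long duration) {
        try {
            unit.sleep(duration);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts one sub-task per delay and returns how many sub-tasks had
    // finished at the moment await() returned on the calling thread.
    static int runTasks(long... delaysMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(delaysMillis.length);
        AtomicInteger finished = new AtomicInteger();
        for (long delay : delaysMillis) {
            new Thread(() -> {
                sleep(TimeUnit.MILLISECONDS, delay);
                finished.incrementAndGet();
                latch.countDown(); // one sub-task done
            }).start();
        }
        latch.await(); // blocks until countDown has been called once per task
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(80, 50, 60) + " tasks finished");
    }
}
```

Because every sub-task increments the counter before calling countDown, the value read after await() equals the number of sub-tasks.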

CountDownLatch

CountDownLatch countDownLatch = new CountDownLatch(3);

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

  1. As with other synchronizers, CountDownLatch has an inner class Sync that extends the abstract class AbstractQueuedSynchronizer; this is the key to implementing the synchronizer
  2. The CountDownLatch constructor calls AbstractQueuedSynchronizer#setState with the count we passed in, so in the end the AQS state == count
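The relationship between the constructor argument and state can be observed from the outside through CountDownLatch#getCount, which reports the current count. The class and method names in this short sketch are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;

class LatchStateDemo {
    // Returns the count right after construction and after each countDown call.
    static long[] countsAfterEachCountDown(int initial) {
        CountDownLatch latch = new CountDownLatch(initial);
        long[] counts = new long[initial + 1];
        counts[0] = latch.getCount(); // equals the constructor argument
        for (int i = 1; i <= initial; i++) {
            latch.countDown();
            counts[i] = latch.getCount();
        }
        return counts;
    }

    // The constructor rejects a negative count, as in the source shown above.
    static boolean rejectsNegativeCount() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(countsAfterEachCountDown(3))); // [3, 2, 1, 0]
        System.out.println(rejectsNegativeCount()); // true
    }
}
```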

CountDownLatch#await

// CountDownLatch#await
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

As with the exclusive mode covered earlier, this is the template method pattern: to implement a synchronizer based on a shared lock, you only need to override the tryAcquireShared and tryReleaseShared methods.

  1. As the name acquireSharedInterruptibly suggests, this method responds to interrupts: if the thread has been interrupted, it throws InterruptedException
  2. The main job of tryAcquireShared is to return 1 if state == 0 and -1 otherwise. We know that after the CountDownLatch constructor completes, state in AQS is 3. When can state become 0? It is not hard to guess that calling CountDownLatch#countDown changes state, which we will verify next. So, if nothing else has happened, tryAcquireShared returns -1 at this point
  3. The if condition in acquireSharedInterruptibly is therefore true, so doAcquireSharedInterruptibly is executed
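The behaviours described in the three points above can be checked against the real CountDownLatch. The sketch below is only an illustration; the class and method names are invented here, and the timed await(long, TimeUnit) overload is used so that the blocked case returns instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitBehaviourDemo {
    // state == 0: tryAcquireShared returns 1, so await() returns without queuing.
    static boolean returnsImmediatelyAtZero() throws InterruptedException {
        CountDownLatch open = new CountDownLatch(0);
        open.await();
        return true;
    }

    // state != 0: tryAcquireShared returns -1, so the caller waits.
    // The timed variant gives up and returns false once the timeout elapses.
    static boolean timesOutWhileClosed() throws InterruptedException {
        CountDownLatch closed = new CountDownLatch(1);
        return !closed.await(20, TimeUnit.MILLISECONDS);
    }

    // A pending interrupt makes await() throw InterruptedException.
    static boolean throwsWhenInterrupted() {
        CountDownLatch closed = new CountDownLatch(1);
        Thread.currentThread().interrupt(); // set the interrupt flag on ourselves
        try {
            closed.await();
            return false;
        } catch (InterruptedException e) {
            // Thread.interrupted() cleared the flag before the exception was thrown.
            return !Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(returnsImmediatelyAtZero()); // true
        System.out.println(timesOutWhileClosed());      // true
        System.out.println(throwsWhenInterrupted());    // true
    }
}
```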

AbstractQueuedSynchronizer#doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Create a node in SHARED mode (note Node.SHARED) and add it to the tail of the queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // Spin, trying to acquire the lock
        for (;;) {
            final Node p = node.predecessor();
            // Only the node directly behind head may try to acquire the lock
            if (p == head) {
                // Try to acquire the lock
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Acquired: make this node the new head. This also propagates, which we focus on later
                    setHeadAndPropagate(node, r);
                    // Remove the previous head node from the queue
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // Set the predecessor's status to SIGNAL and block the current thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  1. Same as exclusive mode: a node is created and added to the tail, and when the lock cannot be acquired, the status of the predecessor node is changed to SIGNAL and the current thread is blocked
  2. Different from exclusive mode: in exclusive mode, if the thread is interrupted, only a flag is returned, whereas here an exception is thrown directly. Also, in exclusive mode a successful acquire just updates the head node and returns, whereas shared mode calls setHeadAndPropagate

So what is setHeadAndPropagate for? It is worth a closer look

AbstractQueuedSynchronizer#setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // Make the current node the head, the same as in exclusive mode
    setHead(node);

    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // Wake up the next node
            doReleaseShared();
    }
}

At first glance this code is a bit confusing, apart from the initial setHead, which, as in exclusive mode, makes the current node the head. To understand the rest, we have to start from a feature of the AQS shared mode.

  1. The return value of tryAcquireShared. Less than zero: the lock was not acquired. Equal to zero: the lock was acquired this time, but subsequent attempts will fail; this was the last lock available in shared mode, and nobody else can acquire it until someone releases. Greater than zero: the lock was acquired this time, and later attempts may also succeed
  2. propagate is the return value of tryAcquireShared. If propagate > 0, locks can still be acquired in shared mode. In that case, any nodes waiting in the queue should be notified; this is propagation. How is that done? By calling doReleaseShared
  3. What scenarios does head == null || head.waitStatus < 0 correspond to? We will come back to that
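The effect of propagation is easy to observe from outside, even though the queue itself is hidden: several threads wait on the same latch, a single countDown takes state to 0, and every waiter is released, not just the first one in the queue. The sketch below only demonstrates that observable effect; PropagationDemo and releasedWaiters are names invented for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PropagationDemo {
    // Parks `waiters` threads on one latch, opens it with a single countDown,
    // and returns how many of the waiters got past await().
    static int releasedWaiters(int waiters) throws InterruptedException {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(waiters);
        AtomicInteger released = new AtomicInteger();
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                try {
                    gate.await();               // queues in shared mode while state != 0
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
            t.setDaemon(true);
            t.start();
        }
        Thread.sleep(50);   // give the waiters time to park; not needed for correctness
        gate.countDown();   // one release; the wake-up then spreads from waiter to waiter
        done.await(5, TimeUnit.SECONDS);
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releasedWaiters(4)); // 4
    }
}
```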

That covers the CountDownLatch#await method for now, but some questions remain, so let's note them down

  1. What scenario does head == null || head.waitStatus < 0 correspond to?
  2. Why is the check made twice, that is, why (h = head) == null || h.waitStatus < 0?
  3. What does node.next == null || node.next.isShared() mean?

CountDownLatch#countDown

The CountDownLatch#countDown method should change the value of state. Again, this is the template method pattern

// CountDownLatch#countDown
public void countDown() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// Sync#tryReleaseShared
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

  1. tryReleaseShared attempts to release the lock. If the release succeeds, the successor of the head node has to be notified and unblocked. Sync#tryReleaseShared first checks whether state equals 0 and, if so, returns false; otherwise it sets state = state - 1 and returns whether state is now 0. Why this logic? It follows from what this synchronizer is for: passing 3 to the CountDownLatch constructor means that once CountDownLatch#countDown has been called three times, the thread that called CountDownLatch#await is unblocked. Three calls are enough; further calls do nothing, so they simply return false
  2. After CountDownLatch#countDown has been called in 3 threads, the if condition in AbstractQueuedSynchronizer#releaseShared is true and AbstractQueuedSynchronizer#doReleaseShared is called. It unblocks the successor of head and involves some propagation logic
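To see the template method pattern end to end, here is a minimal latch built directly on AQS. MiniLatch is a hypothetical class written for this illustration; its two hooks copy the Sync code quoted above, and countDown is changed to return the result of releaseShared so the true/false pattern is visible.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical re-implementation for illustration only; not the JDK class.
class MiniLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;      // already open: extra calls do nothing
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0; // true only on the transition to zero
            }
        }
    }

    private final Sync sync;

    MiniLatch(int count) {
        sync = new Sync(count);
    }

    // Returns what releaseShared returned, i.e. whether waiters were signalled.
    boolean countDown() {
        return sync.releaseShared(1);
    }

    // Timed wait; returns false if the latch did not open in time.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
}

class MiniLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        MiniLatch latch = new MiniLatch(3);
        System.out.println(latch.await(10, TimeUnit.MILLISECONDS)); // false: still closed
        System.out.println(latch.countDown()); // false (3 -> 2)
        System.out.println(latch.countDown()); // false (2 -> 1)
        System.out.println(latch.countDown()); // true  (1 -> 0, waiters are signalled)
        System.out.println(latch.countDown()); // false (already 0)
        System.out.println(latch.await(10, TimeUnit.MILLISECONDS)); // true: open
    }
}
```

Only the call that moves state from 1 to 0 makes releaseShared return true, which is the single point at which doReleaseShared runs.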

AbstractQueuedSynchronizer#doReleaseShared

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // head.waitStatus == Node.SIGNAL means its successor is blocked;
            // a node sets its predecessor's status to SIGNAL when it is added to the queue
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake up the head.next node, the same as in exclusive mode
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

This is easier to follow alongside setHeadAndPropagate, so here is that code again

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

Let's simulate an extreme case of the doReleaseShared method

(1). Suppose there are four nodes in the queue, A->B->C->D, where A is head and D is tail

(2). thread1 is the thread executing doReleaseShared; thread2, thread3, and thread4 are the threads of nodes B, C, and D. thread2, thread3, and thread4 are blocked

  1. thread1 enters the loop for the first time. h != null && h != tail holds and the head's status is SIGNAL, so thread1 resets the status to 0 with a CAS and calls unparkSuccessor to wake thread2. From this moment thread1 and thread2 run at the same time, and the status of head is 0

  2. thread1 continues. head.waitStatus is now 0, but the local variable ws was read as SIGNAL before compareAndSetWaitStatus(h, Node.SIGNAL, 0) ran, so the branch else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) is not taken. thread1 then checks whether h == head holds

    2.1 If thread2 has not yet acquired the lock, h == head holds and thread1 exits directly. thread2 then only needs to acquire the lock by spinning. But if the synchronizer still has locks available, only node B has been woken and thread1 has already exited, so who wakes the later nodes C and D? That falls to thread2, through the setHeadAndPropagate call it makes after acquiring the lock

    2.2 If thread2 has already acquired the lock, that is, node B has been promoted to head, then h == head does not hold and thread1 enters the next iteration. thread2 also executes setHeadAndPropagate, and calls doReleaseShared if the synchronizer still has locks available, so both threads may be executing doReleaseShared at the same time

  3. thread1 and thread2 execute doReleaseShared at the same time. The new head is node B and its status is SIGNAL, so only one of the two threads can succeed in compareAndSetWaitStatus(h, Node.SIGNAL, 0)

    3.1 If thread1 succeeds, it wakes the next node, node C. If, after node C has been promoted to head, thread3 finds that the synchronizer still has locks available, thread3 goes on to wake the next node, so three threads may be executing doReleaseShared at the same time

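    3.2 If thread2 succeeds instead, the same reasoning applies. Either way, every thread then checks if (h == head) and loops again when the head has changed, so thread1, thread2, and thread3 may be competing to execute compareAndSetWaitStatus(h, Node.SIGNAL, 0) at the same time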

  4. Once nodes B, C, and D have all been woken, only the head is left in the queue, namely node D. thread1 through thread4 may all still be executing doReleaseShared, but that does not matter: if (h != null && h != tail) is now false and h == head is true, so they all exit the loop

  5. But consider the following case: three of the threads exit normally, and just as thread4 is about to evaluate if (h != null && h != tail), a new node E is added to the queue. The condition now holds for thread4, and head (node D) is in one of two states

    5.1 Node E has been added to the tail and has already changed the status of its predecessor (node D) to SIGNAL, that is, head.waitStatus == SIGNAL. thread4 then wakes node E, and the rest of the process is the same as above, so we will not analyze it again

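    5.2 Node E has been added to the tail but has not yet changed the status of its predecessor (node D) to SIGNAL, that is, head.waitStatus == 0. thread4 then reaches else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))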

  6. When ws == 0, in what case does compareAndSetWaitStatus(h, 0, Node.PROPAGATE) fail?

    6.1 When several threads execute doReleaseShared at the same time, say three of them, the first thread succeeds and calls unparkSuccessor, and the others may then reach else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) together. We do not consider this case here

    6.2 The other case is that the status of the head node is changed between the check ws == 0 and the call compareAndSetWaitStatus(h, 0, Node.PROPAGATE). As we know, a node that is being added changes the status of its predecessor to SIGNAL, so the spinning thread of node E setting the status of head can make this CAS fail
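The race in 6.2 can be replayed deterministically on a single thread. This is a simplified model, not the AQS code: an AtomicInteger stands in for head.waitStatus, and the enqueuing thread's step is inlined at the point where it would interleave. The constants use the same numeric values as Node.SIGNAL and Node.PROPAGATE in the JDK 8 source; the class and method names are invented for this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

class WaitStatusRaceSketch {
    // Same numeric values as the Node constants in the JDK 8 AQS source.
    static final int SIGNAL = -1;
    static final int PROPAGATE = -3;

    // Returns whether the releaser's CAS from 0 to PROPAGATE succeeds.
    static boolean propagateCasSucceeds(boolean enqueuerRunsInBetween) {
        AtomicInteger waitStatus = new AtomicInteger(0); // stands in for head.waitStatus
        int ws = waitStatus.get();                        // releaser reads ws == 0
        if (enqueuerRunsInBetween) {
            // Node E's thread marks its predecessor (head) as SIGNAL before parking.
            waitStatus.compareAndSet(0, SIGNAL);
        }
        // Releaser's step: else if (ws == 0 && !CAS(h, 0, PROPAGATE)) continue;
        return ws == 0 && waitStatus.compareAndSet(0, PROPAGATE);
    }

    public static void main(String[] args) {
        System.out.println(propagateCasSucceeds(false)); // true: nobody interfered
        System.out.println(propagateCasSucceeds(true));  // false: status is already SIGNAL
    }
}
```

When the CAS fails, the real loop executes continue, re-reads head.waitStatus, now sees SIGNAL, and takes the unparkSuccessor branch instead.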

Although it is only a few lines of code, it is really hard to understand without knowing the author's intent. To summarize:

  1. The first thing to understand is what a shared lock means: a shared lock can be held by multiple threads at the same time, and the exact number of threads is decided by the user, whereas an exclusive lock can be held by only one thread at a time
  2. Given that multiple threads can acquire the lock at the same time, how do we wake up the other blocked nodes as quickly as possible? This is where the propagation of shared locks comes in
  3. setHeadAndPropagate is called when a thread acquires the lock. If locks are still available in the synchronizer at that point, doReleaseShared is called to wake up the next node; this is propagation
  4. doReleaseShared is called when a thread releases the lock. It wakes up the node after head. Once the woken node has acquired the lock by spinning, it calls setHeadAndPropagate, and if locks are still available in the synchronizer at that point, doReleaseShared is called to wake up the next node
  5. The two methods call each other to some extent, all to make sure that, while a lock is still available in the synchronizer, a blocked thread can get it as quickly as possible
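Point 1 of the summary can be illustrated with java.util.concurrent.Semaphore, which is not discussed in this article but is another JDK synchronizer built on AQS shared mode. In this sketch the number of permits plays the role of the user-chosen thread count, and one permit behaves like an exclusive lock. The class and method names are invented for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SharedVsExclusiveDemo {
    // Runs `threads` workers behind a semaphore with `permits` permits and
    // returns the largest number of workers seen holding a permit at once.
    static int maxConcurrentHolders(int permits, int threads) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    try {
                        int now = inside.incrementAndGet();
                        max.accumulateAndGet(now, Math::max); // remember the peak
                        Thread.sleep(20);                     // hold the permit briefly
                    } finally {
                        inside.decrementAndGet();
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        done.await();
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxConcurrentHolders(2, 6) <= 2); // true: never more than 2 holders
        System.out.println(maxConcurrentHolders(1, 4));      // 1: behaves like an exclusive lock
    }
}
```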