1. Talk about something else first

To be honest, I had planned to write a technical article about the design concept, implementation and use of AQS, but after writing the first draft, I found my grasp was still vague and ambiguous. Let go of the pain and start all over again. This time with Java 8 source code as the basis for interpretation.

2. Introduction to AQS

In Java. Util. Concurrent. The locks package, there are two such classes:

  • AbstractQueuedSynchronizer
  • AbstractQueuedLongSynchronizer

The only difference between these two classes is:

  • AbstractQueuedSynchronizer internal maintenancestateA variable isinttype
  • AbstractQueuedLongSynchronizer internal maintenancestateA variable islongtype

What we often call AQS refers to these two classes, abstract queue synchronizers.

Abstract queue synchronizer AbstractQueuedSynchronizer (hereinafter referred to as “AQS), is used to construct the lock or other skeleton of synchronous components, reduce the amount of code, the functional component implementations are solved in the realization of synchronizer involves a lot of the details, such as waiting thread using FIFO queue operations order. There are also flexible criteria that can be defined in different synchronizers to determine whether a thread should pass or wait.

AQS adopts the template method pattern, on the basis of internally maintaining n more template methods, subclasses only need to implement specific methods (not abstract methods! Not abstract methods! Not abstract methods! , you can implement the subclass’s own requirements.

Aqs-based components such as:

  • ReentrantLock ReentrantLock (fair and unfair lock acquisition)
  • Semaphore counts semaphores
  • ReentrantReadWriteLock read-write lock

AQS is one of Doug Lea’s great books, looking him up on Wikipedia, and accidentally discovering that pops likes red or light pink shirts?

3. Design ideas of AQS

AQS internally maintains an INT member variable to indicate the synchronization status, and controls the thread acquiring the shared resource through the built-in FIRST-in-first-out (FIFO) synchronization queue.

We can guess that AQS does a few things:

  • Maintenance management of synchronization state
  • Maintenance and management of wait queues
  • Thread blocking and wake up

ConditionObject, of course, maintains a ConditionObject inner class, which is used for collaboration and communication between threads.

The int state maintained internally by AQS can be used to indicate any state!

  • ReentrantLock is used to indicate the number of times that the lock holder thread has repeatedly acquired the lock. For non-lock holders, if state is greater than 0, it means that the lock cannot be acquired, and the thread is wrapped as Node and added to the synchronous wait queue.
  • Semaphore uses this to indicate the number of remaining licenses. When the number of licenses is zero, threads that have not received licenses but are trying to obtain them will enter a synchronous wait queue, block until some threads release their licenses (state+1), and then compete for the freed licenses.
  • FutureTask uses this to indicate the status of a task (not started, running, completed, cancelled).
  • When used by ReentrantReadWriteLock, state is a bit different. The first 16 bits of state are read locks, and the second 16 bits are write locks.
  • CountDownLatch uses state to indicate the number of counts. If state is greater than 0, it indicates that it needs to be added to the synchronous wait queue and blocked until state is equal to 0, the threads in the wait queue will be awakened one by one.

3.1 Pseudo-code lock acquisition:

boolean acquire(a) throws InterruptedException {

  while(Current state does not allow fetch operation) {

    if(need to block fetch request) {

If the current thread is not in the queue, it is inserted into the queue

Blocking the current thread

    }

    else

Returns the failure

  }

It is possible to update the state of the synchronizer

If a thread is in a queue, it is removed from the queue

Return to success

}

Copy the code

3.2 Pseudo-code lock release:

void release(a) {

Updates the status of the synchronizer

  if(The new state allows a blocked thread to succeed.)

Unblock one or more threads in a queue

}

Copy the code

This is probably the idea.

3.3 Methods provided

3.3.1 Common method

The following three methods, all protected final, can be called by any class that inherits AQS.

  • Protected final int getState() gets the synchronization state
  • Protected final void setState(int newState) Sets the synchronization state
  • Protected Final Boolean compareAndSetState(int expect, int update) If the current state value is equal to the expected value, atomically set the synchronization state to the given update value and return true; Otherwise return false
3.3.2 Methods that subclasses need to implement

The following five methods are not implemented within AQS, but are handed over to subclasses to implement, and then AQS calls the implementation methods of subclasses to complete the logical processing.

  • Protected Boolean tryAcquire(int) If tryAcquire(int) attempts to acquire an operation in exclusive mode, query the state of the object to see if it is allowed to acquire it in exclusive mode, and obtain it if so.
  • Protected Boolean tryRelease(int) Attempts to release the synchronization state
  • Protected int tryAcquireShared(int) shared to try to get the action
  • Protected Boolean tryReleaseShared(int) Try to release the shared mode
  • Protected Boolean isHeldExclusively() specifies whether the thread calling this method is the holder of an exclusive lock

Subclasses do not need to implement all of the above methods. They can override some of them, but keep the implementation logic intact. According to different implementation methods, it can be divided into exclusive lock policy implementation and shared lock policy implementation.

This is why the above methods are not defined as abstract methods. If defined as abstract methods, subclasses must implement all five methods, even if you don’t use them at all.

Exclusive locks:

  • ReentrantLock
  • ReentrantReadWriteLock. WriteLock implementation strategy:
  • tryAcquire(int)
  • tryRelease(int)
  • isHeldExclusively()

A Shared lock:

  • CountDownLatch
  • ReentrantReadWriteLock.ReadLock
  • Semaphore implementation strategy:
  • tryAcquireShared(int)
  • tryReleaseShared(int)

AQS has a lot of internal template methods, not an example, after the source code interpretation, will show a part of, and will be with SAO gas notes.

4. Internal attributes of AQS

4.1 the CLH queue

AQS controls the threads that acquire shared resources through a built-in FIRST-in-first-out (FIFO) synchronization queue. CLH queue is a bi-directional queue of FIFO, and the synchronization mechanism of AQS is based on this CLH queue. For each node in the queue, there are Pointers to the precursor node and Pointers to the successor node.

The head is not in the blocking queue!

AQS-Node.jpg

The Node source code:

static final class Node {

    // Wait for tags in shared mode

    static final Node SHARED = new Node();



    // Wait for tags in exclusive mode

    static final Node EXCLUSIVE = null;



    // Indicates that the current thread is cancelled

    static final int CANCELLED = 1;



    // Indicates that the current node's successors contain threads that need to be run, i.e., unpark

    static final int SIGNAL = -1;



    // Indicates that the current node is waiting on condition, i.e. in the condition queue

    static final int CONDITION = -2;



    // Indicates that subsequent acquireShared in the current scene can be executed

    static final int PROPAGATE = -3;

    / * *

* CANCELLED = 1 // Current thread CANCELLED due to timeout or interruption. This is a terminal state, which means we're done here.

* SIGNAL = -1 // Indicates that the successor thread of the current thread is blocked or about to be blocked and needs to wake up after the current thread releases or cancels the lock. This state is usually used by the successor node to set the precursor node

CONDITION = -2 // indicates that the current thread is in the CONDITION queue

PROPAGATE = -3 // For passing on the wake up successor thread, this state is introduced to perfect and enhance the wake up mechanism for shared locks

* 0 // indicates no state or terminal state!

* /


    volatile int waitStatus;



    // The precursor node

    volatile Node prev;



    // Subsequent nodes

    volatile Node next;



    // The thread of the current node, initialized to use, deactivated after use

    volatile Thread thread;



    // Stores successor nodes in the condition queue

    Node nextWaiter;



    // Returns true if the node is waiting in shared mode

    final boolean isShared(a) {

        return nextWaiter == SHARED;

    }

    // Return the precursor node of the current node, if null, throw a null pointer exception

    final Node predecessor(a) throws NullPointerException {

        Node p = prev;

        if (p == null)

            throw new NullPointerException();

        else

            return p;

    }



    Node() {    // Used to establish initial head or SHARED marker

    }



    // Specify thread and pattern constructors

    Node(Thread thread, Node mode) {     // Used by addWaiter

        // SHARED and EXCLUSIVE are used to indicate whether a node is SHARED or EXCLUSIVE

        this.nextWaiter = mode;

        this.thread = thread;

    }



    // Specify thread and node state constructors

    Node(Thread thread, int waitStatus) { // Used by Condition

        this.waitStatus = waitStatus;

        this.thread = thread;

    }

}

Copy the code

4.2 volatile state

Most important property, this integer can be used to represent any state! It says that up there.

4.2 volatile head & volatile tail

The head node is a virtual node that logically represents the thread node that holds the lock. The head node does not store thread information or precursor node information.

Tail Tail node, where each new node goes to the end of the queue. No information about successor nodes is stored.

  • These two attributes are lazily initialized. When the first thread holds the lock and the second thread fails to acquire the lock, the head and tail will be initialized. That is, when all threads can acquire the lock, the internal head and tail are null. Even if no thread later holds the lock, its internal head and tail still hold the last thread node to hold the lock! (Head and tail both point to a memory address)
  • When a thread fails to acquire the lock and is added to the synchronization queue, CAS is used to set tail to the Node corresponding to the current thread.
  • The Unsafe class was removed for cas operations within AQS, and since Java9, the Unsafe class has been replaced by the VarHandle class.

Both of these attributes are volatile (which guarantees order and visibility)

4.3 spinForTimeoutThreshold

The spin timeout threshold is used in methods like doAcquireSharedNanos().

  • If the user-defined wait time exceeds this threshold, the thread will block, returning true if it can wait for a wake up opportunity and tryAcquireShared succeeds during the block, false otherwise, and false if it timed out.
  • If the user-defined wait time is less than or equal to this threshold, the loop is infinite and the thread does not block until a thread releases synchronization or times out, and then returns the corresponding result.

4.4 exclusiveOwnerThread

This is AQS through inheritance AbstractOwnableSynchronizer class, obtain the properties of the synchronizer holders said monopolistic mode.

5. Specific implementation of AQS

5.1 Implementation of exclusive Lock

5.1.1 Obtaining the lock reentrantLock. lock()
/ * *

* Get exclusive lock, ignore interrupt.

* First attempt to acquire the lock, return true if successful; Otherwise, the current thread is wrapped as a Node and inserted at the end of the queue. In the queue, the thread is checked to see if it is the direct successor of the head and attempts to acquire the lock.

* If the lock fails, the current thread is blocked through LockSupport until the thread that released the lock wakes up or is interrupted, then attempts to acquire the lock again, and so on. Wake up and resume the previous code execution

* /


public final void acquire(int arg) {

    if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

        selfInterrupt();

}

---------------------------------------------------------------------------------------

The tryAcquire() method needs to be implemented by subclasses, and ReentrantLock implements fair and unfair locking by overwriting this method

---------------------------------------------------------------------------------------



/ * *

* Insert node in synchronous wait queue

* /


private Node addWaiter(Node mode) {

    Node node = new Node(Thread.currentThread(), mode);

    Node pred = tail;

    // Check whether the tail node is null

    if(pred ! =null) {

        node.prev = pred;

        // Insert the current node at the end of the queue through CAS

        if (compareAndSetTail(pred, node)) {

            pred.next = node;

            return node;

        }

    }

    // If the tail node is null, the new node is inserted at the end of the queue and initialized if necessary

    enq(node);

    return node;

}



/ * *

* Returns after successfully inserting a node into the queue through an infinite loop and CAS operation.

* Insert nodes into queues and initialize them if necessary

* /


private Node enq(final Node node) {

    for (;;) {

        Node t = tail;

        // Initialize head and tail

        if (t == null) {

            if (compareAndSetHead(new Node()))

                tail = head;

        } else {

            node.prev = t;

            / *

CAS Set tail to node

Connect next from tail to node.

If the synchronization queue head and tail nodes have just been initialized by this thread, the next of the head is actually connected to Node, and the old tail node is outlawed by Node.

Head <-> old_tail <-> tail <-> tail

* /


            if (compareAndSetTail(t, node)) {

                t.next = node;

                return t;

            }

        }

    }

}



/ * *

* Nodes in the queue acquire locks by this method, ignoring interrupts.

* This method is very important. If the lock is not obtained above, wrap the thread as a Node to the end of the synchronization queue, and then read the comments in the code

* /


final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            final Node p = node.predecessor();

            / *

* Check whether the current node precursor is head, which is to try to acquire the lock.

* If so, tryAcquire is called to try to acquire the lock,

* If successful, head is set to the current node. Next of the original head node is set to null for GC garbage collection

* /


            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null// help GC

                failed = false;

                return interrupted;

            }

            / *

* If the lock is not successfully acquired, it is determined whether to block based on the precursor node.

* Set the interrupt flag to true if the blocking process is interrupted.

* shouldParkAfterFailedAcquire method under the condition of the precursor state not for SIGNAL cycle retry acquiring a lock.

* if shouldParkAfterFailedAcquire returns true, then the current thread will be blocked and check whether is interrupted

* /


            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}



/ * *

* Determine whether the current thread needs to be blocked based on waitStatus in the precursor node.

* /


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL)

        / *

* The precursor node is set to SIGNAL, which wakes up the successor node when the lock is released.

* So the successor node (that is, the current node) can now block itself.

* /


        return true;

    if (ws > 0) {

        / *

* The precursor node status is cancelled, traverses forward, updates the precursor of the current node to the previous non-cancelled node.

* The current thread will then return to the loop again and try to acquire the lock.

* /


        do {

            node.prev = pred = pred.prev;

        } while (pred.waitStatus > 0);

        pred.next = node;

    } else {

         / * *

For PROPAGATE(-3), set the wait state of the precursor to SIGNAL,

* and then goes back to the loop and tries again to get the lock.

* /


        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    }

    return false;

}



/ * *

* This method unlocks a node.

* /


private void cancelAcquire(Node node) {

   if (node == null)

       return;



   node.thread = null;



   // Iterate over and update the node precursor, pointing the node prev to the first non-canceled node in front.

   Node pred = node.prev;

   while (pred.waitStatus > 0)

       node.prev = pred = pred.prev;



   // Record that the pred node is followed by predNext, which will be used later by CAS.

   Node predNext = pred.next;



   CancelAcquire can be skipped by subsequent nodes when cancelAcquire is called

   node.waitStatus = Node.CANCELLED;



   // If the current node is the tail node, set the tail node to the precursor node of the current node

   if (node == tail && compareAndSetTail(node, pred)) {

       compareAndSetNext(pred, predNext, null);

   } else {

       // If the node has a successor node, what you need to do in this case is to combine the pred with the successor non-cancelable node.

       int ws;

       if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! =null) {

           Node next = node.next;

           / *

* CAS is used to try to set pred's successor to Node's successor if next is not cancelled

* It does not matter if the if condition is false or the CAS fails. This means that multiple threads may be aborting and one of them will succeed.

* /


           if(next ! =null && next.waitStatus <= 0)

               compareAndSetNext(pred, predNext, next);

       } else {

           unparkSuccessor(node);

       }



       / *

* At the GC level, and setting to NULL has the same effect

* /


       node.next = node; 

   }

}

Copy the code

The process of acquiring an exclusive lock is roughly as follows: Assuming that the lock has been held by thread A for A long enough time, thread B and thread C fail to acquire the lock.

Thread B:

  • 1. Wrap thread B into Node Node (BN for short) and join the synchronous wait queue. At this time, waitStatus of BN =0
  • 2. Set the tail node to BN and connect it to the head node to form a linked list
  • 3. The head node is a virtual node, that is, the thread that holds the lock (but does not contain thread information), and the tail node is BN
  • 4. Thread B enters an “infinite loop”, checks whether the first node is the head node (true) and tries again to acquire the lock (false, failed to acquire the lock)
  • 5, thread B will enter shouldParkAfterFailedAcquire method, inside the method, the BN precursors node (that is, the head node) waitStatus set to 1, this method returns false
  • 6, because it is an infinite loop, so the thread B again into shouldParkAfterFailedAcquire method, as a result of the BN precursors node (that is, the head node) waitStatus to 1, so direct return true
  • Call parkAndCheckInterrupt and the current thread B is blocked waiting to wake up.

Thread C:

  • 1. Wrap thread C into Node Node (CN for short) and join the synchronous waiting queue. At this time, waitStatus of CN =0
  • 2. Set the tail node to CN and connect it to the original tail node (BN node)
  • 3, thread C enters “infinite loop”, check whether the first node is the head node (false)
  • 4, thread C will enter shouldParkAfterFailedAcquire method, inside the method, the CN precursor node node (BN) waitStatus is set to 1, this method returns false
  • 5, because is an infinite loop, so the thread C enter shouldParkAfterFailedAcquire method again, as a result of CN precursor node node (BN) waitStatus to 1, so direct return true
  • Call parkAndCheckInterrupt and thread C is blocked waiting to wake up.

The final queue looks like this:

+------+        +------+        +------+

| |  <---  | |  <---  | |

| head |        | BN |        | tail |

| AN |  --->  | |  --->  | (CN) |

+------+        +------+        +------+

Copy the code
5.1.2 Unlocking ReentrantLock.unlock()

To release an exclusive lock, the tryRelaes(int) method is called. This method is implemented by subclasses. After the lock is fully released, the thread that released the lock wakes up its successor, which contests the lock (unfair lock).

public final boolean release(int arg) {

    if (tryRelease(arg)) {

        Node h = head;

        // The header is not null and subsequent nodes need to be woken up

        if(h ! =null&& h.waitStatus ! =0)

            unparkSuccessor(h);

        return true;

    }

    return false;

}

Copy the code

The execution of releasing an exclusive lock is roughly as follows (assuming there are successor nodes to wake up) :

  • Add the head nodewaitStatusSet to zero
  • Wake up the successor node
  • When the thread of the successor node is awakened, the successor node is set to head and the prev and Thread properties within the successor node are set to NULL
  • Set the next pointer to null for the original head node and wait for GC to reclaim the original head node.
+------+        +------+        +------+

| old |  <-X-  | new |  <---  | |

| head |        | head |        | tail |

| AN |  -X->  | BN |  --->  | (CN) |

+------+        +------+        +------+

Copy the code

As shown above, the AN node (formerly the head node) is waiting to be collected by the GC garbage.

5.2 Shared Lock Implementation Roadmap

5.2.1 acquiring a lock

Unlike acquiring an exclusive lock, the key is that a shared lock can be held by multiple threads.

If AQS is required to implement shared locks, when implementing the tryAcquireShared() method:

  • Return a negative number, indicating that the fetch failed
  • Returns 0, indicating success, but subsequent contended threads will not succeed
  • Returns a positive number, indicating success, indicating that subsequent contended threads may also succeed
public final void acquireShared(int arg) {

    if (tryAcquireShared(arg) < 0)

        doAcquireShared(arg);

}



private void doAcquireShared(int arg) {

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            final Node p = node.predecessor();

            if (p == head) {

                int r = tryAcquireShared(arg);

                // Once the share succeeds, set up a new header and wake up subsequent threads

                if (r >= 0) {

                    setHeadAndPropagate(node, r);

                    p.next = null// help GC

                    if (interrupted)

                        selfInterrupt();

                    failed = false;

                    return;

                }

            }

            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}



/ * *

* This function does two things:

* 1. After obtaining the share lock, set the head node

* 2. Use the state returned by the call to tryAcquireShared and the wait state of the node itself to determine whether to wake up subsequent threads

* /


private void setHeadAndPropagate(Node node, int propagate) {

    // Enclose the current head on the method stack for the following condition check

    Node h = head;

    setHead(node);

    / *

Propagate is the return value for tryAcquireShared, which is one of the criteria for deciding whether to propagate the wake up

* H. waitStatus for SIGNAL or PROPAGATE also depends on the next node share of the node

* /


    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {

        Node s = node.next;

        if (s == null || s.isShared())

            doReleaseShared();

    }

}



/ * *

* This is the core wake up function in the shared lock. The main thing to do is wake up the next thread or set propagation state.

* After the subsequent thread is woken up, it will try to acquire the shared lock. If successful, it will call setHeadAndPropagate again to propagate the wake up.

* This function is used to ensure that a waiting node in the queue can be awakened in case of a acquire and release contention.

* /


private void doReleaseShared(a) {

    / *

* What the following loop does is wake up a successor thread if there are any on the queue;

* If the wait state of the head node is 0,

The node state is set to PROPAGATE although it cannot be unparksucceeded, in order to ensure that the wake can be successfully and solidly passed on.

In this way, the thread that gets the lock can read the PROPAGATE when executing the setHeadAndPropagate, so that the thread that gets the lock can release the subsequent waiting thread.

* /


    for (;;) {

        Node h = head;

        // If there is a successor thread in the queue.

        if(h ! =null&& h ! = tail) {

            int ws = h.waitStatus;

            if (ws == Node.SIGNAL) {

                if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))

                    continue;

                unparkSuccessor(h);

            }

            // If the state of the h node is 0, it needs to be set to PROPAGATE for the propagation of the wake up.

            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                continue;

        }

        // Check if h is still head. If not, loop again.

        if (h == head)

            break;

    }

}

Copy the code
5.2.1 releases the lock

DoReleaseShared (int) is used for both releasing and acquiring shared locks

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        // The implementation of doReleaseShared for obtaining shared locks is described above

        doReleaseShared();

        return true;

    }

    return false;

}

Copy the code

I think everyone should be able to understand, but let’s briefly say (manual dog ~) :

In a synchronous wait queue, after waking up the successor node threads that blocked because they failed to acquire the shared lock, the successor node threads in turn wake up their successor nodes! And so on.

Another way to say it?

This may be the case where a write lock causes some of the threads that acquired the read lock to block. When the write lock is released, the subsequent node thread will wake up the subsequent node thread. If the subsequent node is blocked because of the failure to acquire the read lock, the subsequent node thread will wake up the subsequent node thread. Until the read lock is obtained for all nodes or the write lock is obtained for a node.

6, expand

PROPAGATE has to be said

There is a bug about AQS that is really worth checking out

In the shared lock acquisition and release operation, there is one particular waitStatus value that I think is very important to talk about, which is PROPAGATE. The attribute value PROPAGATE means that it is used to pass on the wake up subsequent thread. This state is introduced to perfect and enhance the wake up mechanism of the shared lock.

I have read a lot of articles about AQS before, but there is very little about this state value, even in the book “Java Concurrent Programming Practice”, it was not mentioned. Finally, I saw a blogger who explained this PEOPAGATE state in detail, which also gave me a lot of inspiration.

Yes, when I first looked at the AQS source code, I even ignored the PROPAGATE state value directly. In fact, not just read the source code, easy to ignore the PROPAGATE state values, even Doug Lea he himself, also didn’t realize when development, if not the status value will lead to what kind of consequences, until the bug link above, he just added this state, completely repair the bug.

Code to reproduce this bug:

import java.util.concurrent.Semaphore;



public class TestSemaphore {



    private static Semaphore sem = new Semaphore(0);



    private static class Thread1 extends Thread {

        @Override

        public void run(a) {

            sem.acquireUninterruptibly();

        }

    }



    private static class Thread2 extends Thread {

        @Override

        public void run(a) {

            sem.release();

        }

    }



    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i < 10000000; i++) {

            Thread t1 = new Thread1();

            Thread t2 = new Thread1();

            Thread t3 = new Thread2();

            Thread t4 = new Thread2();

            t1.start();

            t2.start();

            t3.start();

            t4.start();

            t1.join();

            t2.join();

            t3.join();

            t4.join();

            System.out.println(i);

        }

    }

}

Copy the code

An occasional thread hangs while the program is executing.

Let’s look at what the propagate method looks like earlier.

private void setHeadAndPropagate(Node node, int propagate{

    setHead(node);

    if (propagate > 0&& node.waitStatus ! =0) {

        Node s = node.next;

        if (s == null || s.isShared())

            unparkSuccessor(node);

    }

}

Copy the code

Semaphore.release() then calls AQS releaseShared.

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        Node h = head;

        if(h ! =null&& h.waitStatus ! =0)

            unparkSuccessor(h);

        return true;

    }

    return false;

}

Copy the code

Take a look at Node back then:

static final class Node {

    // Ignore extraneous code and display only the status value of waitStatus



    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;

    static final int CONDITION = -2;

}

Copy the code

The sethead propagate method and releaseShared method are also very simple in design.

The waitStatus of a Node does not PROPAGATE=-3.

I have also, for your convenience, shown the source code of the then unparksucceeded methods:

private void unparkSuccessor(Node node{



    // Set node waitStatus to 0

    compareAndSetWaitStatus(node, Node.SIGNAL, 0);



    Node s = node.next;

    if (s == null || s.waitStatus > 0) {

        s = null;

        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)

            if (t.waitStatus <= 0)

                s = t;

    }

    if(s ! =null)

        LockSupport.unpark(s.thread);

}

Copy the code

Next, let’s talk slowly ~

Ps: Seriously, the boss is not far from my position, although my work has been completed many days in advance, but, still a little panic ~, take the risk to continue to write!

In the operation of AQS to acquire the shared lock, the thread that enters the synchronous wait (blocked) can be woken up in two ways:

  • The other threads, after releasing the semaphore, call the unparkprecursor (releaseSharedMethod)
  • After other threads successfully acquire the shared lock, they wake up the subsequent nodes through propagation mechanism (i.esetHeadAndPropagateMethod).

An example of a recurring bug is simply instantiating four threads repeatedly in a loop, with the first two threads fetching the semaphore, the second releasing the semaphore, and the main thread waiting for all four threads to execute before printing.

When the last two threads do not release the semaphore, the synchronous wait queue inside the AQS looks like this:

+------+        +------+        +------+

| |  <---  | |  <---  | |

| head |        | t1 |        | t2 |

| |  --->  | |  --->  | |

+------+        +------+        +------+

Copy the code
  • 1, T3 release semaphore, callreleaseSharedAt the same time, the head waitStatus changes to 0
  • 2, t1 is woken up, call Semaphore.NonfairSync’s tryAcquireShared method and return 0
  • T4 release semaphore and callreleaseSharedIn thereleaseSharedThe head read from the head method is the same as the original head, but the head waitStatus has changed to 0, so it will not be calledunparkSuccessormethods
  • T1 was woken up and will not be called because the Semaphore.NonfairSync tryAcquireShared method in Step 2 returned 0unparkSuccessormethods

At this point, both paths are blocked and there are no threads left to wake up T2, the thread is hung…

Ps: Doug Lea black question mark face, haha ~

To fix this bug, Pops made the following improvements:

  • 1. Add a waitStatus state, i.ePROPAGATE
  • 2, inreleaseSharedExtracted from the methoddoReleaseShared()It is shown abovedoReleaseSharedIn the method, if the state of the head node is 0, it needs to be set to PROPAGATE to ensure the propagation of the wake up.
  • 3, insetHeadAndPropagateThere are also some more judgments in the method, where the waitStatus of the head node is less than 0, the subsequent node is woken up (PROPAGATE = -3).

With the improved code, let’s go over it again:

  • 1, T3 release semaphore, callreleaseSharedAt the same time, the head waitStatus changes to 0
  • 2, t1 is woken up, call Semaphore.NonfairSync’s tryAcquireShared method and return 0
  • 3. This step occurs at the same time as step 2, t4 releases the semaphore, calledreleaseSharedIn thedoReleaseSharedThe head read in the method is the same as the original head, but now the waitStatus of the head has changed to 0, set the waitStatus of the head to PROPAGATE (-3).
  • T1 is woken up and calledsetHeadAndPropagateMethod, set T1 as head, meet the criteria, enter the branch statement, calldoReleaseSharedMethod, which then wakes up the T2 node thread.

6.2 A few thoughts on the unparksucceeded

private void unparkSuccessor(Node node{

    int ws = node.waitStatus;

    if (ws < 0)

        compareAndSetWaitStatus(node, ws, 0);



    / *

* In general, the thread to wake up is the successor thread of the current node

* However, if the successor of the current node is cancelled, it traverses forward from the end of the queue until it finds the successor that has not been cancelled

* /


    Node s = node.next;

    if (s == null || s.waitStatus > 0) {

        s = null;

        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)

            if (t.waitStatus <= 0)

                s = t;

    }

    if(s ! =null)

        LockSupport.unpark(s.thread);

}

Copy the code

In the unparksucceeded method, if the successor of the current node is cancelled, one goes forward from the end of the queue until one that has not been cancelled is found.

Why do I start at the tail node and walk forward?

Suppose the CLH queue looks like the following figure:

+------+        +------+        +------+

| |  <---  | |  <---  | |

| head |        | t1 |        | tail |

| |  --->  | |  --->  | |

+------+        +------+        +------+

Copy the code

T1.waitstatus = 1 and tail.waitStatus = 1

Head tries to wake up the successor node T1, and finds that T1 is in the cancelled state. Then find tail, which is also in the cancelled state, but tail. Next == null.

At the same time, a new node is added to the end of the queue, but the old tail.next has not been pointed to the new node.

That is, if tail.next happens to be halfway between steps 1 and 2, the traversal will break.

Excerpt the addWaiter section of the code:

node.prev = pred;

//Insert the current node at the end of the queue through CAS

if (compareAndSetTail(pred, node)) { //steps1

    pred.next = node; //steps2

    return node;

}

Copy the code

6.3 Why tryAcquire again in acquireQueued?

In exclusive mode, here’s how I think about it:

Time 1: Thread B tries to acquire the lock, but since the lock is held by thread A, thread B is ready to call addWaiter to queue itself (but without A pointer connection to the head node).

Point 1: At the same time, thread A tries to release the lock, goes into the release method, calls the tryRelease() of the subclass, sets the state of the number of locks held to 0 (indicating that the lock was not held by any thread), goes into the unparksucceeded method, finds that there are no successor nodes (because the new one is not yet in the team), So it doesn’t wake up any threads, so at this point, thread A is done releasing the lock.

Time 2: Thread B has finished calling the addWaiter method, has joined the queue, and has a pointer connection to the head node

Time 3: Thread B calls the acquireQueued method (shown in the code below). If tryAcquire is not called in this method, it will happen that the lock can be acquired, but the thread is asleep, making the entire synchronization queue unavailable

So, tryAcquire is called again in case the whole synchronization queue crashes because the head node has released the lock before the new node is enqueued.

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;

    try {

        boolean interrupted = false;

        for (;;) {

            final Node p = node.predecessor();

            // The new node has not joined the queue, but the head node has released the lock, causing the whole synchronization queue to break down

            if (p == head && tryAcquire(arg)) {

                setHead(node);

                p.next = null// help GC

                failed = false;

                return interrupted;

            }

            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

                interrupted = true;

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

Copy the code

The end of the

Through reading AQS source code, it is of great help for us to learn and master components based on AQS.

Especially its design concept and thought, is the focus of our study!

Doug Lea’s AQS paper, good English friends, might as well read