This article is part of the Java Concurrent Programming series. If you want to learn more, check out the Java Concurrent Programming Overview.

preface

As we learned in the previous article, “Locking Interfaces for Concurrent Java Programming”, Under the whole Lock under the Java interfaces by AQS is the locking mechanism (here we will AbstractQueuedSynchronizer or AbstractQueuedLongSynchronizer collectively known as AQS) with the Condition. Now let’s understand the internal details and implementation principle of AQS.

PS: this article will explain in AbstractQueuedSynchronizer to, interested in AbstractQueuedLongSynchronizer friend, can check the related information.

AQS profile

Abstract queue synchronizer AbstractQueuedSynchronizer (hereinafter referred to as “AQS), is used to construct the lock or other synchronous component based framework, it USES a int member variables to represent the synchronization state, Threads that acquire shared resources are controlled by a built-in FIRST-in-first-out (FIFO) synchronization queue.

This class is designed to be the base class for most synchronization components that rely on a single atomic value (int) to control synchronization state. Subclasses must define methods for getting and releasing synchronization state. AQS provides three methods getState(), setState(int newState) and compareAndSetState(int expect, int update) to operate. Subclasses should be for custom at the same time synchronization component of a static inner class, AQS itself does not implement any synchronization interfaces, it is only defines the number of state synchronization acquisition and release methods for use by the custom synchronous components, synchronizer can support to exclusive access to sync, can also support to Shared access to sync, This makes it easy to implement different types of synchronization components (ReentrantLock, ReentrantReadWriteLock, CountDownLatch, etc.).

Introduction to AQS class methods

The design of AQS is based on the template method pattern, that is, the consumer inherits the synchronizer and overwrites the specified methods, then combines the synchronizers in the implementation of a custom synchronization component and calls the template methods provided by the synchronizer, which will invoke the user-overwritten methods.

Method for modifying synchronization status

When subclasses implement custom synchronization components, they need to obtain and release synchronization state through the following three methods provided by AQS.

  • Int getState() : gets the current synchronization status
  • Void setState(int newState) : Sets the current synchronization status
  • Boolean compareAndSetState(int expect, int Update) Use CAS to set the current state.

A method that can be overridden in a subclass

  • Boolean isHeldExclusively() : Specifies whether the current thread has an exclusive lock
  • Boolean tryAcquire(int ARg) : Exclusive attempt to obtain synchronization status, set synchronization status through CAS operation, return true if successful, false otherwise
  • Boolean tryRelease(int arg) : Exclusive release synchronization state.
  • Int tryAcquireShared(int arg) : Shared synchronization status. If the value is greater than or equal to 0, the synchronization status is successfully obtained. Otherwise, the synchronization status fails.
  • Boolean tryReleaseShared(int arG) : Shared release synchronization status.

Methods for obtaining and releasing synchronization state

When we implement a custom synchronization component, we will call the methods provided by AQS to synchronize the state and release. Of course, these methods will call the template methods of their subclasses internally. Methods provided externally are divided into two categories, as follows:

  • Exclusive get and release synchronization state
  1. Void acquire(int arg) : Exclusive acquire synchronization status, if the current thread successfully obtain synchronization status, return, otherwise enter the synchronization queue wait, this method will call tryAcquire(int arg) method.
  2. Void acquireInterruptibly(int arg) : The same basic logic as void Acquire (int arg), but this methodIn response to interruptIf no synchronization status is currently obtained, it will be queued. If the current thread is interrupted (Thread().interrupt()), the method will throw InterruptedException. And return
  3. Boolean tryAcquireNanos(int arg, long nanosTimeout) :On the acquireInterruptibly(int ARg) basisIf the current thread did not obtain the synchronization status, then return fase, otherwise return true.
  4. Boolean release(int arg) : Exclusive release synchronization status
  • Shared get and release synchronization state
  1. Void acquireShared(int arg) : acquireShared(int arg) : acquireShared(int arg)The main difference from exclusive fetching is that multiple threads can acquire synchronous state at the same time.
  2. Void acquireSharedInterruptibly (int arg) :The basic logic in acquireShared(int arg) is the same, added a response interrupt.
  3. Boolean tryAcquireSharedNanos(int arg, long nanosTimeout) :On the basis of acquireSharedInterruptibly, added a timeout limit.
  4. Boolean releaseShared(int ARG) : Shared release synchronization status

AQS concrete implementation and internal principles

After understanding the AQS in different ways to obtain and release the synchronization state (exclusive and shared) and modify the synchronization state method, now we come to understand the specific implementation of AQS and its internal principle.

FIFO queues in AQS

In the previous article, we mentioned that AQS mainly controls thread synchronization through a FIRST-in-first-out (FIFO). So in a real program, AQS would construct a Node Node from the thread that gets the synchronization status and add that Node to the queue. The thread is blocked if it fails to acquire the synchronization state, and when the synchronization state is released, the thread in the head node is woken up to try to acquire the synchronization state.

Node Structure

Let’s look at the information stored in a Node Node in real code. Node is implemented as follows:

    static final class Node {
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
    }
Copy the code

The Node Node is a static inner class in the AQS. Its attributes (volatile) are described below.

  • Int waitStatus: indicates the waitStatus
  1. SIGNAL = -1: If the thread of the current node releases or cancels the synchronization status, it will SINGAL the status of the current node to notify the next node of its readiness to obtain the synchronization status.
  2. CANCELLED = 1: Threads that have been CANCELLED or whose synchronization status has timed out will be returned to their current state and will no longer be blocked.
  3. CONDITION = -2: The current node is on the wait queue in CONDITION (CONDITION is described in the next article). After another thread calls CONDITION’s singal() method, the node is moved from the wait queue to the AQS synchronization queue, waiting to acquire the synchronization lock.
  4. PROPAGATE = -3: Is related to the shared acquisition synchronization state, which identifies that the thread corresponding to the node is in a runnable state.
  5. 0: indicates initialization.
  • Node prev: indicates the last Node of the current Node in the synchronization queue.
  • Node Next: Next Node of the current Node in the synchronization queue.
  • Thread Thread: current Thread converted to Node.
  • Node nextWaiter: The current Node waits for the next Node on the queue in the Condition (Condition is covered in the next article).

AQS synchronization queue implementation structure

From the above description, we have a general understanding of the data and information stored in the Node Node. Now let’s take a look at the structure of the entire AQS synchronization queue. The details are shown in the figure below:

head
tail

AQS adds tail nodes

When a thread successfully gained the synchronization state (or lock), other threads can’t get to the synchronous state, this time the thread will be constructed as the Node Node, and add to the synchronous queue, and this process to join the queue must be thread-safe, so in AQS provides a method based on CAS set tail Node: CompareAndSetTail (Node expect,Nodeupdate), which needs to pass the tail Node and the current Node that the current thread “thinks” of. The current Node is not formally associated with the previous tail Node until the setting is successful. The specific process is shown in the figure below:

AQS adds a head node

In the synchronization queue of AQS, the head node is the node that has successfully obtained the synchronization state. When the thread of the head node releases the synchronization state, it will wake up its next node, and the next node will set itself as the head node when it has successfully obtained the synchronization state, as shown in the following figure:

In the figure above, the dotted line is the node that head pointed to before. Setting the head node is done by the thread that succeeded in obtaining the synchronization state. Since only one thread can successfully obtain the synchronization state, the method of setting the head node does not require CAS to ensure that the next point of the original head node is disconnected.

Now we have seen how to set up the head and tail nodes of the AQS synchronization queue. Now let’s look at the actual code, because it involves different states getting synchronous states (exclusive versus shared), so we’ll look at both states separately.

Exclusive synchronization state acquisition and release

Exclusive synchronization status acquisition

Acquire (int arg) can acquire synchronization state, but it is important to note that this method does not respond to thread interrupt and timeout mechanism to acquire synchronization state. Nodes put into the synchronization queue by this method will not be removed from the synchronization queue, even if the current thread has been interrupted. The specific code is as follows:

  public final void acquire(int arg) {
        if(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }Copy the code

In this method, tryAcquire(ARG) is subclassed to obtain the synchronization state. If it fails to obtain the synchronization state, the request thread is constructed into a Node (Node.exclusive), and the thread is added to the end of the synchronization queue (because the queue in AQS is FIFO type). Next let’s look at the details of the addWaiter(Node mode) method:

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);// Construct the thread as a Node Node
  
        Node pred = tail;
        if(pred ! =null) {// Try to point the tail pointer to the Node Node constructed by the current thread
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
              // If successful, next of the Node pointed to before the tail pointer points to the Node constructed by the current thread
                pred.next = node;
                return node;
            }
        }
        enq(node);// If the current tail pointer is null, the final Node (enq) method is called
        return node;
    }
Copy the code

In this method, there are two steps:

  • If the current tail pointer is not null, attempt to point tail to the Node constructed by the current thread. If successful, point next of the Node previously pointed to by the tail pointer to the Node constructed by the current thread, and return the current Node.
  • Instead, the final Node Node (ENQ) method is called to add the Node constructed by the current thread to the synchronization queue.

Let’s move on to the ENQ (Final Node Node) method.

  private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {// If the current tail pointer is null, then try to point the head pointer to the Node Node constructed by the current thread
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {// If the current tail pointer is not null, try pointing tail to the Node Node constructed by the current thread
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    returnt; }}}}Copy the code

In the ENQ (Final Node Node) method, an infinite loop (which you can also call a spin) ensures that nodes are added correctly. Next, we continue to look at the processing of the acquireQueued(Final Node Node, int arg) method. This method is the whole multi-thread competition synchronization state key, we must pay attention to look!!

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();// Get the last node of this node
                // If the previous node is the node pointed to by the head lock and the synchronization status of the node is successfully obtained
                if (p == head && tryAcquire(arg)) {
		            // Set head to point to the node,
                    setHead(node);
                    p.next = null; // Disconnect the next pointer from the previous node
                    failed = false;
                    return interrupted;
                }
                // Determine whether the thread that failed to obtain synchronization status needs to be blocked
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())// Block and determine whether the current thread has been interrupted
                    interrupted = true; }}finally {
            if (failed)
            // If the thread is interrupted, it is removed from the synchronization queue and the next node is woken upcancelAcquire(node); }}Copy the code

This method is mainly divided into three steps:

  • throughAn infinite loop (you could call it spin)To get the synchronization status if the current nodeThe previous node is the node to which the head pointsandThe synchronization status of the node is successfully obtained, head is set to point to the node and next is disconnected from the previous node.

  • If the previous node of the current node is not the node pointed to by head, or if the synchronization status of the current node fails to be obtained, the first call is madeshouldParkAfterFailedAcquire(Node pred, Node node)Method to determine whether to block the current thread if the method returnstrueThe callparkAndCheckInterrupt()Method to block the thread. If the method returnsfalse, the method internally changes the state of the previous Node of the current Node to Node.singal.
  • In the finally statement block, determine whether the current thread has been interrupted. If interrupted, pass thencancelAcquire(Node node)Method removes the thread (corresponding Node Node) from the synchronization queue and wakes up the next Node.

We went on to see below shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) method, take a look at specific logic, concrete block code is as follows:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
	        // The last node was set to release the status request signal, so the current node can safely block
            return true;
        if (ws > 0) {
	        // The last Node has been interrupted or timed out, so skip all states as node. CANCELLED
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
	        // The cas operation is invoked to set the state to Node.singal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
Copy the code

In this method, the state of the previous node (waitStatus) is obtained and the following three steps are performed.

  • If the state of the previous Node is Node.SIGNAL, subsequent threads are blocked(Function return true).
  • If the status of the last Node is greater than 0 (only Node.CANCELLED is greater than 0, as we can see from waitStatus above), all Node whose status is Node.CANCELLED will be skipped.(Function return false).
  • If the previous Node is in another state, the CAS operation is invoked to set its state to Node.singal.(Function return false).
Blocking the implementation

When shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) method to return true, then invokes the parkAndCheckInterrupt () method to block the current thread. The return value of this method is whether the current thread is interrupted.

 private final boolean parkAndCheckInterrupt(a) {
        LockSupport.park(this);
        return Thread.interrupted();
    }
Copy the code

In this method, the main way to block a thread is to block the current thread through the park of LockSupport (more on that in a later article).

Cancels state retrieval and wakes up the next node

The acquireQueued(Final Node Node, int arg) method finally executes the code in the finally block to determine if the current thread has been interrupted. If interrupted, the state acquisition of that thread is cancelled and the next thread Node is awakened by the cancelAcquire(Node Node) method. So let’s look at a concrete implementation of this method. The specific code is as follows:

   private void cancelAcquire(Node node) {
        // Return if the current node does not exist
        if (node == null)
            return;
		// (1) Set the thread corresponding to this node to null
        node.thread = null;

        // (2) Skip the node that has been canceled before the current node
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

		// Get the next node of the node after operation (2)
        Node predNext = pred.next;

	    // (3) Set the status of the node corresponding to the current interrupted thread to CANCELLED
        node.waitStatus = Node.CANCELLED;

        // (4) If the current interrupted node is the tail node, then the tail node is pointed to again
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // (5) If the state of the last node of the interrupted node is SINGAL or will be SINGAL,
            // Remove the current interrupt node
            int ws;
            if(pred ! = head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <=0&& compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread ! =null) {
                Node next = node.next;
                if(next ! =null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);// (6) Remove this node and wake up the next node
            }

            node.next = node; // help GC}}Copy the code

Looking at the appeal code, we can see that this method does several things

  • (1) Set the thread corresponding to the node corresponding to the interrupt thread to NULL

  • (2) Skip nodes that have been CANCELLED before the current Node (as we already know in the enumeration of Node.waitStatus, only CANCELLED greater than 0)

  • (3) Set the node state corresponding to the current interrupted thread to CANCELLED

  • (4) Under the premise of (2), if the current interrupted node is the tail node, then the tail node is pointed to the node after the operation of (2) through CAS operation.

  • (5) If the current interrupt nodeNot tail node, and the state of the last node of the current interrupted node is SINGAL or will be SINGAL, then the current interrupted node is removed.
  • (6) If the condition (5) is not met, then it will be calledunparkSuccessor(Node node)Method to wake up the next node. The specific code is as follows:
    private void unparkSuccessor(Node node) {
         // Reset the node to its initial state
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // Get the next node of the interrupt node
        Node s = node.next;
        // Check the status of the next Node. If the status is Node.CANCELED
        if (s == null || s.waitStatus > 0) {
            s = null;
            // The nearest waitStatus<=0 is traversed through the tail node
            for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                    s = t;
        }
        // If the node is not null, the thread in the node is woken up.
        if(s ! =null)
            LockSupport.unpark(s.thread);
    }
Copy the code

Here for the convenience of everyone to understand, I will add the figure, (the picture may not be very clear, I suggest you click to browse the larger picture),

The exclusive synchronization state is released

After the thread successfully obtains the synchronization state and executes the corresponding logic, it needs to release the synchronization state so that the subsequent thread nodes can continue to obtain the synchronization state. To release the synchronization state, call relase(int arg) method of AQS. The specific code is as follows:

 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if(h ! =null&& h.waitStatus ! =0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
Copy the code

In this method, the template method tryRelease(int arg) is called, which means that the synchronization state release logic is user-defined. After the tryRelease(int arg) method returns true, if the current head is not null and the head waitStatus! =0, then the unparksucceeded (Node Node) method is called to wake up the next Node (to try to get the synchronization state). The methods of unparksucceeded (Node Node), which have been analysed above, will not be described here.

Shared synchronization state acquisition and release

The main difference between shared and exclusive acquisition is whether more than one thread can obtain the synchronization state at the same time. Take file reading and writing as an example. If a program is reading a file, all write operations on the file will be blocked at that moment. Other read operations can be performed simultaneously. If a file is written, all other reads and writes are blocked at that moment. Writes require exclusive access to the resource, while reads can be shared access.

Shared synchronization status acquisition

Now that we know the difference between shared and exclusive synchronized state acquisition, let’s take a look at the related methods of shared state acquisition. In AQS by acquireShared(int ARg) method to achieve. The specific code is as follows:

  public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
Copy the code

Inside this method, the template method tryAcquireShared(int arg) is called. The same as obtaining the synchronous state exclusively, this method also needs to be customized. If the return value of tryAcquireShared(int arg) is less than 0, the synchronization status is not obtained. In this case, call doAcquireShared(int arg) to obtain the synchronization status. Otherwise, no operation is performed if the synchronization status is successfully obtained. DoAcquireShared (int ARg)

   private void doAcquireShared(int arg) {
	    // (1) add a shared node to the FIFO queue in AQS
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //(2) Spin the synchronization state
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
	                    // When the synchronization status is successfully obtained, set the head pointer
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return; }}//(3) determine whether the thread needs to block
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; }}finally {
	        //(4) Wake up the next node if the thread has been interrupted
            if(failed) cancelAcquire(node); }}Copy the code

On the whole, the logic of shared acquisition is almost the same as that of exclusive acquisition, with the following steps:

  • (1) Add a SHARED Node to the FIFO queue in AQS. Note that the Node is constructed as addWaiter(Node.shared), where node.shared is the static final Node SHARED = new Node()), and the state of the Node constructed by the addWaiter (node.shared) method is the initial state, which is waitStatus= 0.

  • (2) Spin gets synchronization state. If the previous node of the current node is head node, its synchronization state is successfully obtained, then setHeadAndPropagate(node, r) will be called; , resets head to point to the current node. Also reset the Node state waitStutas = PROPAGATE(shared state) and exit the doAcquireShared(int arg) method directly. The specific situation is shown in the figure below:

  • (3) If condition (2) is not met, it will judge that the previous node of the current node is not the node pointed by head, or the synchronization status of the current node fails to be obtained, so it will be called firstshouldParkAfterFailedAcquire(Node pred, Node node)Method to determine whether to block the current thread if the method returnstrueThe callparkAndCheckInterrupt()Method to block the thread. If the method returnsfalse, the method internally changes the state of the previous Node of the current Node to Node.singal. The specific situation is shown in the figure below:

  • (4) Wake up the next node if the thread has been interrupted

Propagate(Node propagate, int propagate); propagate (Node propagate, int propagate); propagate (Node propagate, int propagate) The specific code is as follows:

    private void setHeadAndPropagate(Node node, int propagate) {
	    //(1) set the head pointer to this node
        Node h = head; // Record old head for check below
        setHead(node);
        
        //(2) Execute doReleaseShared();
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // The doReleaseShared () method is called if the next node of the current node is a shared get synchronization state node
            if (s == null|| s.isShared()) doReleaseShared(); }}Copy the code

In the setHeadAndPropagate(Node Node, int propagate) method there are two arguments. The first parameter, node, is the thread node that is currently sharing the synchronization state. The second parameter propagate (Chinese meaning propagate, propagate) is the number of thread nodes for sharing the synchronization state.

Its main logical steps are divided into the following two steps:

  • (1) Set the head pointer pointing to the node.We can see that in shared fetching, the Head node always points to the thread node with the most successful fetching.
  • (2) To determine whether to execute doReleaseShared(), we can conclude from the code, mainly through this conditionif (s == null || s.isShared())Where S is the next node of the current node (that is, multiple threads may access the node at the same time). When this condition is true, the doReleaseShared() method is called. As for how to determine whether the next node is a shared thread node, the specific logic is as follows:
   // In SHARED access, the current node is of SHARED type
   final Node node = addWaiter(Node.SHARED);
   
   // The Node constructor is called inside the addWaiter call, which sets nextWaiter to Node.shared.
   Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
   //SHARED is a static Node class
   final boolean isShared(a) {
            return nextWaiter == SHARED;
        }
        
Copy the code

Let’s move on to the implementation of the doReleaseShared () method as follows:

 private void doReleaseShared(a) {
        for (;;) {
            Node h = head;
            if(h ! =null&& h ! = tail) {int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
	                (1) From the figure above, we can know that in the shared synchronization queue, if there are blocked nodes,
	                // Then the Node to which head points must be in node.singal state.
	                // The CAS operation is used to set the node state pointed by the head to its initial state. If successful, the next blocked thread of the head is woken up
                    if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);// Wake up the next node thread
                }
				//(2) indicates that the thread has successfully obtained the shared state, so the thread Node state is set to node.propagate through CAS operation
				// From the above figure, we can see that in a shared synchronization queue,
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // If the head pointer changes, keep loop, otherwise break loop
                break; }}Copy the code

From the code we can see that this method is divided into two main steps:

  • (1) From the figure above, we can know that in the shared synchronization queue, if there are blocked nodes,The Node to which head points must be in node.singal stateThe CAS operation sets the node state pointed by the head to its initial state. If the CAS operation succeeds, the next blocked thread node under the head is woken up. Otherwise, the loop continues.
  • (2) If the condition (1) is not met, it indicates that the node has successfully obtained the synchronization state. Then, the state of the thread node is set as through CAS operationwaitStatus = Node.PROPAGATEIf the CAS operation fails, the loop continues.
Shared synchronization is released

When the thread has successfully acquired the synchronization state and executed the corresponding logic, it needs to release the synchronization state so that the subsequent thread nodes can continue to acquire the synchronization state. This can be done by calling AQS releaseShared(int arg) method. The specific code is as follows:

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
Copy the code

Exclusive and shared timeouts get synchronization status

Because the logic of exclusive and shared timeout obtaining synchronization state is almost the same as that of non-timeout obtaining synchronization state itself. So here’s the logic for getting the synchronization state with an exclusive timeout.

TryAcquireNanos (int arg, Long nanosTimeout) is called as shown in the following code:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
Copy the code

Looking at the code, InterruptedException is thrown if the current thread is interrupted, and if tryAcquire(ARG) is available, it returns InterruptedException. If the current thread fails to obtain the synchronization status, The doAcquireNanos(int arg, Long nanosTimeout) method is called to timeout to obtain the synchronization status. Now let’s look at the specific code implementation of this method, as shown in the figure below:

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //(1) Calculate the end time of the timeout wait
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //(2) Return if the synchronization status is obtained successfully
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // The remaining time of the calculation if the synchronization status fails to be obtained
                nanosTimeout = deadline - System.nanoTime();
                //(3) Exit if timeout occurs
                if (nanosTimeout <= 0L)
                    return false;
                //(4) If there is no timeout and nanosTimeout is greater than spinForTimeoutThreshold (1000 nanoseconds),
                // Let the thread wait nanosTimeout (the remaining time, in nanoseconds).
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //(5) Throw an exception if the current thread is interrupted
                if (Thread.interrupted())
                    throw newInterruptedException(); }}finally {
            if(failed) cancelAcquire(node); }}Copy the code

The whole method consists of the following steps:

  • (1) Before the thread obtains the synchronization state, calculate the end time of waiting when the timeout occurs. (Accurate to nanoseconds)
  • (2) Obtain synchronization status through spin operation, if successful, directly return
  • (3) If obtaining synchronization fails, the remaining time is calculated. If you have run out of time, exit.
  • (4) If there is no timeout, judge the current remaining timeWhether nanosTimeout is greater thanSpinForTimeoutThreshold (1000 nanoseconds), if greater than, passLockSupport.parkNanos(this, nanosTimeout)Method makes the thread wait for the appropriate time. The method will be changed in accordance with the incomingnanosTimeoutTime, wait for the corresponding time to return. .If nanosTimeout is less than or equal toSpinForTimeoutThreshold will not cause the thread to wait out of time, but to enter a fast spin process. The reason is that very short timeout waits cannot be very precise, and if timeout waits are made at this point, nanosTimeout timeouts as a whole appear imprecise. Therefore, in the case of very short timeouts, the thread will enter unconditional fast spin.
  • (5) If step (4) is not taken, the current thread has been interrupted, thenInterruptedException is thrown directly.

The last

Up to now, we have a basic understanding of the internal structure of the whole AQS and the realization of exclusive and shared access to synchronization state, but we have not introduced the blocking, waiting, wake up (related to LockSupport tool class) related knowledge points. Future articles will cover the LockSupport tool and the Condition interface for lock-related wait/notification patterns. I hope you continue to keep learning motivation ~~.

conclusion

  • The whole AQS is based on its internal FIFO queue to achieve synchronous control. The requested thread is encapsulated as a Node Node.
  • AQS is divided into the whole into exclusive and shared to obtain synchronization status. It supports thread interrupts and timeout fetching.