This chapter analyzes how AQS acquires and releases the synchronization state in exclusive mode. AQS provides the following methods for exclusive acquisition and release:

  1. acquire(int arg)

  2. acquireInterruptibly(int arg)

  3. tryAcquireNanos(int arg, long nanosTimeout)

  4. release(int arg)

  5. tryRelease(int arg), the hook that release(int arg) calls and that the custom synchronizer must implement

The rest of this chapter covers the use of these methods in detail.

1 acquire

acquire(int arg) acquires the synchronization state in exclusive mode. This method is insensitive to interrupts: if a thread fails to acquire the synchronization state and enters the CLH queue, it is not removed from the queue when it is later interrupted. The source code is as follows:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The code above attempts to acquire the synchronization state, builds a node, appends it to the CLH queue, and then spins. The individual calls are:

  • tryAcquire(arg): attempts to acquire the synchronization state, returning true on success and false otherwise. This method is implemented by the custom synchronizer itself and must be thread-safe.

  • addWaiter(Node.EXCLUSIVE): creates a node in EXCLUSIVE mode and appends it to the tail of the CLH queue.

  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg): acquires the synchronization state in an infinite loop.
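
To make the role of tryAcquire(arg) concrete, here is a minimal sketch of a custom synchronizer. The class name SimpleMutex and the convention that state 0 means free and 1 means held are assumptions made for this illustration; only the AbstractQueuedSynchronizer methods it calls come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
// Assumed convention: state 0 = free, state 1 = held.
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // The CAS keeps the attempt thread-safe: only one thread can move 0 -> 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write that publishes the release
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }            // template method, calls tryAcquire
    boolean tryLock()  { return sync.tryAcquire(1); }  // single attempt, never queues
    void unlock()      { sync.release(1); }            // template method, calls tryRelease
    boolean isLocked() { return sync.isLocked(); }
}
```

The subclass only decides whether the state can be taken or given back; queuing, parking and waking are left entirely to the AQS template methods.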

tryAcquire(arg) is left to the custom synchronizer, while addWaiter(Node mode) and acquireQueued(final Node node, int arg) are implemented by AQS as follows:

private Node addWaiter(Node mode) {
    // Build a node for the current thread in the given mode
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try to append the node to the tail directly
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The queue is empty or the CAS was lost: fall back to enq
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // When the CLH queue is empty, build an empty node as the head node
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Append the node to the tail of the CLH queue
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// Acquire the synchronization state in an infinite loop
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // Only a node whose predecessor is the head may try to acquire the synchronization state
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
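
The CAS-based tail append used by addWaiter and enq can be tried out in isolation. The sketch below is a simplified model rather than the AQS code: the class CasTailQueue is hypothetical, it uses an AtomicReference in place of the internal compareAndSetTail, and it creates the dummy head eagerly instead of lazily.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the CAS tail append (not the AQS source).
final class CasTailQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final Node head = new Node(); // dummy head, created eagerly here
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    void append(Node node) {
        for (;;) {
            Node t = tail.get();
            node.prev = t;                   // link backwards before publishing
            if (tail.compareAndSet(t, node)) {
                t.next = node;               // forward link is set after the CAS wins
                return;
            }
            // CAS lost: another thread appended first, so retry against the new tail
        }
    }

    // Counts nodes by walking prev links from the tail back to the dummy head.
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != head; p = p.prev) {
            n++;
        }
        return n;
    }

    // Appends from several threads at once and returns the resulting queue length.
    static int appendConcurrently(int threads, int perThread) {
        CasTailQueue q = new CasTailQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    q.append(new Node());
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return q.size();
    }
}
```

Because prev is assigned before the CAS publishes the node, a backward walk from the tail always sees a complete chain, which is why this model (like several AQS methods) traverses the queue from the tail.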


addWaiter and enq use compareAndSetTail(pred, node) to append nodes to the tail of the CLH queue in a thread-safe manner. This matters because several threads can fail to acquire the synchronization state at the same time and then try to enqueue concurrently; without the CAS, concurrent appends could overwrite each other and the queue could no longer guarantee correct data. acquireQueued(final Node node, int arg) shows that the current thread tries to acquire the synchronization state in an “infinite loop”, and that only the node whose predecessor is the head (the first waiting node) may make the attempt. If the attempt is not allowed or fails, shouldParkAfterFailedAcquire(p, node) is called; if it returns true, the thread blocks and continues only after it has been woken up. The source of shouldParkAfterFailedAcquire(p, node) is as follows:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the wait status of the predecessor node
    int ws = pred.waitStatus;
    /*
     * The predecessor's status is SIGNAL (-1): it will wake up the current
     * node when it releases the synchronization state, so parking is safe.
     */
    if (ws == Node.SIGNAL)
        return true;
    /*
     * ws > 0 means Node.CANCELLED: the predecessor was cancelled because of
     * a timeout or an interrupt, so skip it and any cancelled nodes before it.
     */
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // Use CAS to change the predecessor's status to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
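
Once shouldParkAfterFailedAcquire returns true, acquireQueued blocks the thread in parkAndCheckInterrupt(). The sketch below mirrors the shape of that method (park, then report and clear the interrupt flag) in a standalone class. ParkDemo and its helper methods are hypothetical names for this illustration; LockSupport.park, LockSupport.unpark and Thread.interrupted are the real JDK calls. To stay deterministic, each helper arranges the wake-up before parking, so park returns immediately.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class showing how a parked thread learns why it woke up.
final class ParkDemo {
    // Same shape as AQS's parkAndCheckInterrupt: block, then read and clear the flag.
    static boolean parkAndCheckInterrupt(Object blocker) {
        LockSupport.park(blocker);
        return Thread.interrupted();
    }

    // A permit granted by unpark makes the next park return without an interrupt.
    static boolean wakeByUnpark() {
        LockSupport.unpark(Thread.currentThread());
        return parkAndCheckInterrupt(ParkDemo.class);
    }

    // A pending interrupt also makes park return, and the flag is reported once.
    static boolean wakeByInterrupt() {
        Thread.currentThread().interrupt();
        return parkAndCheckInterrupt(ParkDemo.class);
    }
}
```

Since Thread.interrupted() clears the flag, acquireQueued has to remember the interrupt in its local interrupted variable, and acquire re-asserts it with selfInterrupt() after the synchronization state has been obtained.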

This completes the analysis of acquire(int arg). Its overall flow is summarized in the following figure:

Figure: The process of the acquire method

2 acquireInterruptibly

As the name suggests, acquireInterruptibly(int arg) differs from acquire(int arg) in that it responds to interrupts. If the thread is interrupted, it responds immediately by throwing InterruptedException. The source code is as follows:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}


acquireInterruptibly(int arg) first checks whether the thread has been interrupted and, if so, throws InterruptedException. Otherwise it calls tryAcquire(arg) to acquire the synchronization state, and if that fails it calls doAcquireInterruptibly(arg), which is defined as follows:

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // Build a node in exclusive mode and append it to the tail of the CLH queue
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head, try to acquire the synchronization state
            if (p == head && tryAcquire(arg)) {
                // Set the current node as the head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


The doAcquireInterruptibly(int arg) method implements almost the same logic as acquire(int arg), with only two differences:

  • doAcquireInterruptibly(int arg) declares InterruptedException in its method signature;

  • when an interrupt is detected, it throws InterruptedException immediately instead of recording an interrupt flag and returning it.
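
The two differences are easiest to observe through a lock built on AQS. The sketch below uses ReentrantLock from the JDK, whose lock() goes through acquire and whose lockInterruptibly() goes through acquireInterruptibly. The class InterruptModesDemo and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class contrasting the two acquisition modes.
final class InterruptModesDemo {
    // A waiter using lockInterruptibly() is released by InterruptedException.
    static boolean interruptibleWaiterThrows() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] threw = new boolean[1];
        lock.lock(); // the calling thread holds the lock for the whole test
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not reached while the caller holds the lock
                } catch (InterruptedException e) {
                    threw[0] = true;
                }
            });
            waiter.start();
            waiter.interrupt();
            join(waiter);
        } finally {
            lock.unlock();
        }
        return threw[0];
    }

    // A waiter using lock() keeps waiting; the interrupt only survives as a flag.
    static boolean plainWaiterKeepsFlag() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagAfterLock = new boolean[1];
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock(); // no exception here even though the thread is interrupted
            try {
                flagAfterLock[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waiter.interrupt();
        lock.unlock(); // only now can the waiter acquire the lock
        join(waiter);
        return flagAfterLock[0];
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In the first method the waiter never obtains the lock and leaves through the exception. In the second it does obtain the lock and can still inspect the interrupt flag afterwards.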

3 tryAcquireNanos

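tryAcquireNanos(int arg, long nanosTimeout) acquires the synchronization state with a timeout: besides responding to interrupts, it returns false if the synchronization state has not been acquired within nanosTimeout nanoseconds. The source code is as follows: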

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}


If the thread has been interrupted, InterruptedException is thrown; otherwise tryAcquire(arg) is attempted, and if that fails doAcquireNanos(arg, nanosTimeout) is called. doAcquireNanos(arg, nanosTimeout) is defined as follows:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // Deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    // Append a node in exclusive mode to the tail of the CLH queue
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // Spin to acquire the synchronization state
        for (;;) {
            final Node p = node.predecessor();
            // The predecessor is the head and the synchronization state was acquired
            if (p == head && tryAcquire(arg)) {
                // Set the current node as the head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // Calculate the remaining time to wait
            nanosTimeout = deadline - System.nanoTime();
            // Return false if the timeout has elapsed
            if (nanosTimeout <= 0L)
                return false;
            // Otherwise park for nanosTimeout nanoseconds, unless it is below the spin threshold
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // Check whether the thread was interrupted
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


doAcquireNanos(int arg, long nanosTimeout) first checks whether the timeout is less than or equal to 0 and returns false if so. It then computes the deadline (final long deadline = System.nanoTime() + nanosTimeout;) and enqueues the node. If the predecessor of the current node is not the head, or the attempt to acquire the synchronization state fails, the remaining wait time is recalculated (nanosTimeout = deadline - System.nanoTime();). If the remaining time is less than or equal to 0, the method returns false; otherwise, if it is greater than spinForTimeoutThreshold (1000L nanoseconds), the thread parks for nanosTimeout nanoseconds, and if it is not, the thread simply spins again. spinForTimeoutThreshold is a constant defined by AQS. Why define such a threshold? Switching a thread from the timed-waiting state (TIMED_WAITING) back to a running state involves a context switch, and if the timeout is very short, parking causes too many context switches and wastes resources. The whole process of timeout control is as follows:
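
As a rough sketch of the timeout control described above, the following loop keeps the deadline, the recalculated remaining time, and the spin threshold, but replaces the queue and tryAcquire with a plain condition. The class TimedWaitSketch, the method awaitNanos and the BooleanSupplier parameter are assumptions made for this illustration; the threshold value 1000L matches the one quoted in the text.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the deadline loop, with the AQS queue left out.
final class TimedWaitSketch {
    // Same value as the spinForTimeoutThreshold quoted in the text.
    static final long SPIN_THRESHOLD_NANOS = 1000L;

    // Returns true if the condition held before the timeout elapsed, false otherwise.
    static boolean awaitNanos(BooleanSupplier condition, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L) {
            return false;
        }
        final long deadline = System.nanoTime() + nanosTimeout;
        for (;;) {
            if (condition.getAsBoolean()) {
                return true; // stands in for "p == head && tryAcquire(arg)"
            }
            // Recompute the remaining time on every pass
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                return false; // timed out
            }
            // Park only when the remaining time is worth a context switch
            if (nanosTimeout > SPIN_THRESHOLD_NANOS) {
                LockSupport.parkNanos(TimedWaitSketch.class, nanosTimeout);
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
    }
}
```

Working from an absolute deadline rather than subtracting the requested park time means that early or spurious returns from parkNanos cannot extend the total wait.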

4 release

After the current thread has obtained the synchronization state and executed the related logic, it needs to release the synchronization state and wake up the subsequent node to obtain the synchronization state. The release(int arg) method is defined as follows:

public final boolean release(int arg) {
    // Try to release the synchronization state
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // Wake up the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}


tryRelease(arg) is implemented by the custom synchronizer. If it releases the synchronization state successfully and the head node's waitStatus is not 0, release wakes up the successor node through unparkSuccessor(h). The use and definition of unparkSuccessor will be described later.
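
The hand-over performed by release(int arg) can be observed from outside with the public AQS methods getQueueLength(), acquire and release. The sketch below is a minimal experiment under assumed names: ReleaseDemo and its inner Sync are hypothetical, and Sync deliberately skips owner checks to stay short.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo: one thread holds the state, a second queues up, release hands over.
final class ReleaseDemo {
    // Bare-bones exclusive synchronizer: 0 = free, 1 = held (no owner tracking).
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    static boolean releaseHandsOver() {
        Sync sync = new Sync();
        AtomicBoolean waiterAcquired = new AtomicBoolean(false);
        sync.acquire(1); // the calling thread takes the state first

        Thread waiter = new Thread(() -> {
            sync.acquire(1); // fails tryAcquire, so the thread is queued
            waiterAcquired.set(true);
            sync.release(1);
        });
        waiter.start();

        // Wait until the waiter's node is actually in the synchronization queue
        while (sync.getQueueLength() == 0) {
            Thread.yield();
        }
        boolean blockedWhileHeld = !waiterAcquired.get();

        sync.release(1); // tryRelease succeeds, then the queued thread can proceed
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return blockedWhileHeld && waiterAcquired.get();
    }
}
```

The method returns true only if the second thread was still waiting while the state was held and then acquired it after the release.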

Finally, to summarize the process of acquiring and releasing the synchronization state in exclusive mode:

When multiple threads try to acquire the synchronization state at the same time, the synchronizer maintains a synchronization queue. A thread that calls acquire(int arg) first calls tryAcquire(int arg). Because tryAcquire(int arg) is a thread-safe method implemented by the custom synchronizer, only one thread can acquire the synchronization state; the threads that fail are wrapped in nodes and added to the synchronization queue. Every node in the queue then spins, checking whether its predecessor is the head node. If it is, the node tries to acquire the synchronization state, and on success it exits the synchronization queue. After the thread has executed its logic, it releases the synchronization state, and the release wakes up the successor node.