This is day 7 of my participation in the August More Text Challenge. For details, see: August More Text Challenge

I. Foreword

AbstractQueuedSynchronizer, abbreviated as AQS, is the abstract queued synchronizer.

It is the basic building block for synchronization in the JUC (java.util.concurrent) package.

AQS is an abstract class that implements a synchronization queue with built-in spinning, encapsulates the enqueue and dequeue operations, and provides methods for exclusive, shared, interruptible, and other modes of acquisition.

AQS defines a volatile int state variable to represent the shared resource:

  1. If a thread fails to acquire the resource, it enters the FIFO synchronization queue and waits
  2. If the resource is acquired successfully, the critical section code executes
  3. When the resource is released, the waiting thread in the synchronization queue is notified so that it can acquire the resource, leave the queue, and execute

In brief: AQS uses CAS to maintain state and LockSupport to operate on threads.
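
The CAS half of that sentence can be sketched in a few lines. This is only an illustration: AtomicInteger stands in for the AQS state field (AQS itself offers compareAndSetState to its subclasses), and the class name CasStateDemo is made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; AtomicInteger stands in for the AQS state field.
class CasStateDemo {
    private final AtomicInteger state = new AtomicInteger(0);

    // Atomically move state from 0 (unlocked) to 1 (locked); fails if it is not 0.
    boolean tryLock() {
        return state.compareAndSet(0, 1);
    }

    // Reset state to 0 (unlocked).
    void unlock() {
        state.set(0);
    }

    public static void main(String[] args) {
        CasStateDemo demo = new CasStateDemo();
        System.out.println(demo.tryLock()); // true: 0 -> 1 succeeded
        System.out.println(demo.tryLock()); // false: state is already 1
        demo.unlock();
        System.out.println(demo.tryLock()); // true again after the reset
    }
}
```

A failed CAS costs no context switch, which is why AQS tries it before queuing and parking a thread.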


Main steps for using AQS in your own class:

  1. Step 1: Create your own thread collaboration utility class and write a Sync class inside it that extends AbstractQueuedSynchronizer, i.e., AQS;
  2. Step 2: Think through the collaboration logic of the utility class and, in the Sync class, override the methods that match whether it is exclusive or not. If it is exclusive, override tryAcquire and tryRelease; if it is non-exclusive, override tryAcquireShared and tryReleaseShared;
  3. Step 3: Implement the acquire/release methods in your utility class and have them call the corresponding AQS methods: acquire, release, and so on for exclusive mode; acquireShared, releaseShared, or acquireSharedInterruptibly for non-exclusive mode.
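
The three steps above can be sketched as a small non-reentrant mutex. The names SimpleMutex, Sync, lock, unlock, tryLock, and isLocked are assumptions chosen for this illustration; only the AbstractQueuedSynchronizer methods come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex built on AQS in exclusive mode.
class SimpleMutex {
    // Step 1: an internal Sync class that extends AQS.
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Step 2: exclusive mode, so override tryAcquire and tryRelease.
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // Step 3: the public methods delegate to the AQS template methods.
    void lock() { sync.acquire(1); }
    void unlock() { sync.release(1); }
    boolean tryLock() { return sync.tryAcquire(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.isLocked()); // true
        mutex.unlock();
        System.out.println(mutex.isLocked()); // false
    }
}
```

Queuing, parking, and waking are all inherited from AQS; the subclass only decides what state means.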

How AQS works

The two most important things in AQS:

  • state: indicates the lock status. 0 means unlocked, and 1 means locked
  • Queue: where suspended threads are stored

  1. The initial state of AQS, as shown below:

  2. Thread 1 and thread 2 run concurrently and try to lock, as shown in the figure:

  3. Thread 1 finishes executing, as shown below:

  4. Thread 3 joins, under a fair lock and a non-fair lock:




II. Questions

(1) Why AQS

  1. Compared with synchronized, which is implemented inside the JVM, AQS is lighter and more controllable
  2. AQS relies on CAS under the hood, which in some cases is relatively fast and performs better than suspending threads (context switching)


(2) How to lock

Given the principle described above, locking comes down to two main steps:

  1. Update state to 1 (via CAS)

  2. Set the owner thread recorded in AQS to the current thread

    Any other thread that tries to lock will be queued and suspended
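
The queuing behaviour can be observed from the outside with ReentrantLock, which is built on AQS. The sketch below is illustrative only; the class name QueuedWaiterDemo and the method run are made up, while hasQueuedThread is part of the ReentrantLock API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a second thread is queued while the lock is held.
class QueuedWaiterDemo {
    // Returns true once the second thread has been seen in the wait queue.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // the calling thread becomes the owner
        Thread waiter = new Thread(() -> {
            lock.lock();   // cannot acquire, so it is enqueued and suspended
            lock.unlock();
        });
        waiter.start();
        // Busy-wait until the waiter shows up in the synchronization queue.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        boolean queued = lock.hasQueuedThread(waiter);
        lock.unlock(); // releasing wakes the queued thread
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queued;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```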


(3) How to release a lock

Releasing a lock is, of course, the reverse of locking:

  1. Update state
  2. Set the owner thread recorded in AQS to null
  3. Wake up the thread at the head of the queue

How is the thread at the head of the queue woken up?

With the LockSupport.unpark(thread) method.

How does that thread retry the lock after waking up?

  1. It first recovers from the blocked state
  2. It then continues the for(;;) loop and checks again from the top
private final boolean parkAndCheckInterrupt() {
    // 1. The thread blocks here
    LockSupport.park(this);
    // 2. After LockSupport.unpark(thread), execution resumes here
    return Thread.interrupted();
}
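
The two calls inside parkAndCheckInterrupt() can be tried in isolation. In this sketch (the class name ParkInterruptDemo is made up) the interrupt flag is set before parking, so park() returns immediately, and Thread.interrupted() reports the interrupt once and clears it.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the two calls used by parkAndCheckInterrupt().
class ParkInterruptDemo {
    // Returns {first Thread.interrupted() result, second result}.
    static boolean[] run() {
        // Set the interrupt flag first so that park() does not block.
        Thread.currentThread().interrupt();
        LockSupport.park(ParkInterruptDemo.class);
        boolean first = Thread.interrupted();  // true, and clears the flag
        boolean second = Thread.interrupted(); // false, the flag was cleared
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println(result[0] + " " + result[1]); // true false
    }
}
```

Because the flag is cleared, acquireQueued has to remember the interrupt in a local variable and re-assert it later with selfInterrupt().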


(4) Can state be greater than 1?

Yes.

The value of state increases each time the lock is re-entered.

Correspondingly, it must be decremented on each release.
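
Re-entrancy can be observed with ReentrantLock, whose getHoldCount() reports how many times the current thread holds the lock (in ReentrantLock this count is kept in the AQS state). The class name ReentrantCountDemo is made up for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the hold count rises on re-entry and falls on release.
class ReentrantCountDemo {
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // 1
        lock.lock();                     // re-enter on the same thread
        counts[1] = lock.getHoldCount(); // 2
        lock.unlock();
        counts[2] = lock.getHoldCount(); // 1
        lock.unlock();
        counts[3] = lock.getHoldCount(); // 0, fully released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [1, 2, 1, 0]
    }
}
```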




III. Design idea

Main actions:

  1. CAS maintains state
  2. LockSupport operates on threads

In addition, the template method pattern is used.

Subclasses customize how state is manipulated by implementing the protected methods.

Here are the common methods that subclasses need to implement:

// Exclusive mode: try to acquire
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// Exclusive mode: try to release
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

// Shared mode: try to acquire; a negative return value means failure
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// Shared mode: try to release
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
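
As an illustration of the shared-mode pair, here is a sketch of a one-shot latch. OneShotLatch and its method names are made up for this example, and the meaning given to state (0 = closed, 1 = open) is an assumption of this sketch, not something AQS prescribes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch built on AQS in shared mode.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success: succeed once the latch is open.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1); // open the latch
            return true; // tell AQS to wake the waiting threads
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void open() { sync.releaseShared(1); }
    void await() { sync.acquireShared(1); } // blocks until the latch is open
    boolean isOpen() { return sync.isOpen(); }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatch latch = new OneShotLatch();
        Thread worker = new Thread(() -> {
            latch.await();
            System.out.println("worker released");
        });
        worker.start();
        latch.open();
        worker.join();
    }
}
```

Any number of threads can pass await() once the latch is open, which is the difference from the exclusive methods.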




IV. Source code analysis

There are four main aspects: Node, exclusive lock, shared lock, and LockSupport.

The difference between a shared lock and an exclusive lock is whether the lock can be held by multiple threads at once.

(0) Node: the queue node

Each Node holds one thread

static final class Node {
    /*
     * Wait state of this Node. Note that the state mostly describes not this node itself but its successor:
     * it is used to decide whether the successor needs to be woken up.
     *   a. CANCELLED = 1: the node was cancelled because of a timeout or an interrupt. A cancelled node
     *      must not compete for the lock. Once cancelled, a node never changes to another state; it is
     *      removed from the queue and collected by the GC.
     *   b. SIGNAL = -1: the node's successor is blocked and needs to be notified;
     *   c. CONDITION = -2: the node is in a condition queue, blocked while waiting on a Condition;
     *   d. PROPAGATE = -3: used in shared mode; when the head node is in this state, the next shared
     *      acquisition should propagate unconditionally;
     *   e. 0: none of the above; a new node starts in this state.
     *
     * Non-negative values mean the node does not need to signal (wake up) its successor.
     */
    volatile int waitStatus;

    // Predecessor node; null if this node is the head
    volatile Node prev;

    // Successor node
    volatile Node next;

    // The Thread object bound to this Node
    volatile Thread thread;

    // Conditions are only used in exclusive mode, so a simple linked queue is enough
    // to hold the nodes waiting on a Condition
    Node nextWaiter;
}

(1) Exclusive lock

The process is shown in the figure:

  1. acquire(): acquiring the lock
public final void acquire(int arg) {
    // 1. Try to acquire the synchronization state; return immediately on success
    // 2. On failure, add the thread to the tail of the queue and wait
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    // 1. Wrap the current thread in a Node
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 2. If the tail is null (the current thread is the first to join the synchronization queue)
    //    or the fast-path CAS above failed, fall back to enq()
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 1. Construct the head node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 2. Insert at the tail; if the CAS fails, spin and retry
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;

        // Spin loop
        for (;;) {
            // 1. Get the predecessor of the current node
            final Node p = node.predecessor();
            // 2. Check whether the current node can acquire the exclusive lock
            // 2.1 It can if its predecessor is the head node and the synchronization state is acquired
            if (p == head && tryAcquire(arg)) {
                // The head pointer now points to the current node
                setHead(node);
                // Unlink the old head
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 2.2 Acquisition failed: the thread parks and waits for the exclusive lock
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// Decide whether to park after a failed acquisition
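private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // The predecessor is already set to signal this node on release, so it is safe to park
        return true;
    if (ws > 0) {
        // The predecessor was cancelled: skip over cancelled predecessors and retry
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // waitStatus is 0 or PROPAGATE: ask the predecessor for a signal, but do not park yet
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // Block the current thread
    LockSupport.park(this);
    // Return whether the thread was interrupted (this also clears the interrupt flag)
    return Thread.interrupted();
}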
  2. release(): releasing the lock
public final boolean release(int arg) {
    // tryRelease() is the method each implementation class needs to implement
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// Each release wakes the thread referenced by the successor of the head node,
// which further confirms that lock acquisition is a FIFO (first in, first out) process.
private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // The successor of the head node
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // The successor is missing or cancelled: scan backward from the tail
        // for the non-cancelled node closest to the head
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread when the successor node is not null
        LockSupport.unpark(s.thread);
}
  3. acquireInterruptibly(): interruptible lock acquisition
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        // The thread failed to acquire the lock
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    // Insert the node into the synchronization queue
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // The lock was acquired: leave the queue
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // The thread was interrupted: throw an exception
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
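
From the caller's side, this path is reached through ReentrantLock.lockInterruptibly(). The sketch below is illustrative only (InterruptibleAcquireDemo and run are made-up names): a thread waiting in the queue is interrupted and gives up with an InterruptedException instead of continuing to wait.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: interrupting a thread that waits in lockInterruptibly().
class InterruptibleAcquireDemo {
    // Returns true if the waiting thread received an InterruptedException.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        lock.lock(); // the calling thread holds the lock for the whole demo
        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly(); // queued, because the lock is taken
                lock.unlock();            // not reached in this demo
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        // Wait until the waiter is in the synchronization queue, then interrupt it.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.unlock();
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```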
  4. tryAcquireNanos(): lock acquisition with a timeout

This is the method behind lock.tryLock(timeout, TimeUnit)

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        // Implements the timed wait
        doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 1. Compute the deadline from the timeout and the current time
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 2. The current thread acquired the lock: leave the queue
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 3.1 Recompute the remaining timeout
            nanosTimeout = deadline - System.nanoTime();
            // 3.2 Return false once the timeout has elapsed
            if (nanosTimeout <= 0L)
                return false;
            // 3.3 The thread blocks and waits
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 3.4 If the thread was interrupted, throw InterruptedException
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
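
A caller-side sketch of the timed path, using ReentrantLock.tryLock(long, TimeUnit). The class name TimedAcquireDemo and the 50 ms timeout are arbitrary choices for this illustration; the lock stays held by another thread, so the timed attempt gives up and returns false.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a timed tryLock that expires while the lock is held.
class TimedAcquireDemo {
    // Returns whether the second thread got the lock within the timeout.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(true);
        lock.lock(); // held until the waiter has finished
        Thread waiter = new Thread(() -> {
            try {
                boolean ok = lock.tryLock(50, TimeUnit.MILLISECONDS);
                acquired.set(ok);
                if (ok) {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.unlock();
        return acquired.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // false: the timeout elapsed first
    }
}
```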


(2) Shared lock

The process is shown as follows:

Compared with the exclusive lock, the method names simply have Shared added.

  1. acquireShared(): acquiring a shared lock
public final void acquireShared(int arg) {
    // A negative result means the shared acquisition failed
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // The predecessor is the head node and the synchronization state was acquired
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  2. releaseShared(): releasing a shared lock
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
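
Semaphore is one JDK class built on the shared mode of AQS, and it gives a quick way to see shared acquisition from the outside. In the sketch below (SharedPermitsDemo is a made-up name) two permits can be held at once, a third attempt fails, and a release makes a permit available again.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo: several holders at once, up to the number of permits.
class SharedPermitsDemo {
    static boolean[] run() {
        Semaphore permits = new Semaphore(2);
        boolean first = permits.tryAcquire();  // true: 2 -> 1 permits left
        boolean second = permits.tryAcquire(); // true: 1 -> 0 permits left
        boolean third = permits.tryAcquire();  // false: no permit left
        permits.release();                     // 0 -> 1 permits left
        boolean fourth = permits.tryAcquire(); // true again after the release
        return new boolean[] {first, second, third, fourth};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [true, true, false, true]
    }
}
```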

(3) LockSupport

// Look at the unparkSuccessor() method
private void unparkSuccessor(Node node) {

    // Omit irrelevant code
    LockSupport.unpark(s.thread);
}

public static void unpark(Thread thread) {
    // Omit irrelevant code
    if (thread != null)
        // Delegates to the UNSAFE#unpark method, which is a native method
        UNSAFE.unpark(thread);
}
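
A minimal sketch of the park/unpark pair used directly, outside AQS. The class name ParkUnparkDemo and the ready flag are made up for this example. The worker re-checks its condition in a loop because park() is allowed to return without a matching unpark().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: one thread parks, another wakes it with unpark().
class ParkUnparkDemo {
    // Returns true once the worker has resumed after being unparked.
    static boolean run() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            // park() may return spuriously, so re-check the condition in a loop.
            while (!ready.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        worker.start();
        ready.set(true);
        // Safe even if the worker has not parked yet: the permit is remembered.
        LockSupport.unpark(worker);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```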