0 Overview

The queue synchronizer AbstractQueuedSynchronizer (hereafter "AQS") is the basic framework for implementing locks and other synchronizers.

In JDK 5, Doug Lea added a number of synchronization tools, such as ReentrantLock, ReentrantReadWriteLock, Semaphore, and CountDownLatch, all of which are built on AQS.

Internally, the synchronization state shared between threads is held in a variable named state, which is declared volatile. Through AQS, threads can hold a resource either exclusively or in shared mode.

Building on AQS makes it easy to implement synchronization features that Java's built-in synchronized keyword does not offer.

For example, in the case of locks, Java provides the synchronized keyword, which can be used to easily synchronize multiple threads. However, this keyword also has many drawbacks, such as:

  • It does not support timed lock acquisition. Once a thread fails to acquire a synchronized lock, it blocks indefinitely with no way out, so deadlocks caused by synchronized generally cannot be recovered from.
  • It does not respond to interrupts while waiting for the lock.
  • It cannot merely try to acquire the lock. There is no way to attempt the acquisition and return immediately if the lock is unavailable.

ReentrantLock provides all of these capabilities, and it implements them on top of AQS.
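The three capabilities listed above can be observed directly. The following is a minimal sketch, not a recommended usage pattern: the class name LockFeaturesDemo and the method observeWhileLocked are made up for illustration, while tryLock, the timed tryLock, lockInterruptibly, and hasQueuedThread are ReentrantLock methods. One thread holds the lock while a second thread tries each acquisition style in turn.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock and its methods come from the JDK.
class LockFeaturesDemo {

    /** Returns what a second thread observes while the calling thread holds the lock. */
    static String observeWhileLocked() {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch timedAttemptDone = new CountDownLatch(1);
        StringBuilder seen = new StringBuilder();

        Thread other = new Thread(() -> {
            // 1. Non-blocking attempt: returns false at once because the lock is held.
            seen.append("tryLock=").append(lock.tryLock());
            // 2. Timed attempt: gives up after 50 ms.
            try {
                seen.append(" timed=").append(lock.tryLock(50, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                seen.append(" timed=interrupted");
            }
            timedAttemptDone.countDown();
            // 3. Interruptible attempt: blocks until acquired or interrupted.
            try {
                lock.lockInterruptibly();
                lock.unlock();
                seen.append(" interruptible=acquired");
            } catch (InterruptedException e) {
                seen.append(" interruptible=interrupted");
            }
        });

        lock.lock(); // the calling thread holds the lock for the whole experiment
        try {
            other.start();
            timedAttemptDone.await();
            // Wait until the other thread is queued inside lockInterruptibly().
            while (!lock.hasQueuedThread(other)) {
                Thread.yield();
            }
            other.interrupt();
            other.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(observeWhileLocked());
    }
}
```

With synchronized, the second thread would simply block at the first attempt until the lock was released.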

1 Core Structure

As the name AbstractQueuedSynchronizer suggests, AQS is built on a queue. Internally, the queue is implemented as a linked list whose elements are instances of the inner class Node. AQS points to the head of the queue through the instance variable head and to the tail through the instance variable tail.

The source code definition is as follows:

/**
 * Head of the wait queue, lazily initialized. Except for
 * initialization, it is modified only via method setHead. Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized. Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;


static final class Node {

    /** Marker indicating that a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker indicating that a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** The thread waiting in the synchronization queue timed out or was interrupted, so its wait is cancelled. A node in this state never changes state again */
    static final int CANCELLED =  1;

    /** The successor of this node is (or will soon be) blocked, so when this node releases the synchronization state or is cancelled, it must unpark the successor so that it can continue */
    static final int SIGNAL    = -1;

    /** The node is waiting on a Condition. When another thread calls the Condition's signal method, the node is transferred from the condition queue to the synchronization queue, where it gets a chance to acquire the synchronization state */
    static final int CONDITION = -2;

    /** Indicates that the next shared acquisition should propagate unconditionally */
    static final int PROPAGATE = -3;

    /* The wait status of this node: one of the constants above, or 0, the initial state */
    volatile int waitStatus;

    /* Predecessor node */
    volatile Node prev;

    /* Successor node */
    volatile Node next;

    /* The thread waiting to acquire the synchronization state */
    volatile Thread thread;

    /* Next node waiting on the condition queue, or the special value SHARED */
    Node nextWaiter;

    // ...
}

When a thread fails to acquire the synchronization state, AQS wraps the current thread in a Node and appends it to the queue. Therefore, when multiple threads compete for the synchronization state concurrently, AQS holds a doubly linked queue: head points to the first node, tail points to the last, and neighboring nodes are linked through prev and next.

Based on this queue model, the following describes how threads acquire the synchronization state in AQS.

2 Implementation Principles

As the Abstract in its name suggests, AQS is designed to serve as a base class, so it is normally used through inheritance.

AQS follows the template method pattern: it defines several hook methods that implementation classes override to provide their own behavior.

These methods are:

  • boolean tryAcquire(int arg): Acquires the synchronization state in exclusive mode, usually by changing the value of state with CAS.
  • boolean tryRelease(int arg): Releases the synchronization state in exclusive mode, again usually by changing state.
  • int tryAcquireShared(int arg): Acquires the synchronization state in shared mode. A return value >= 0 indicates success; a negative value indicates failure.
  • boolean tryReleaseShared(int arg): Releases the synchronization state in shared mode, also by changing the value of state.
  • boolean isHeldExclusively(): Indicates whether the synchronizer is held exclusively by the current thread.

The default implementation of each of these methods throws an UnsupportedOperationException.
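The default behavior is easy to confirm. In this small sketch, the class DefaultHooksDemo and its method are hypothetical names; the only JDK pieces used are AbstractQueuedSynchronizer and its public acquire method. Since AQS is abstract but declares no abstract methods, an empty anonymous subclass compiles and simply inherits the throwing defaults.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class showing the default behavior of the hook methods.
class DefaultHooksDemo {

    /** Returns true if acquire() fails with UnsupportedOperationException when nothing is overridden. */
    static boolean acquireIsUnsupportedByDefault() {
        // An empty subclass: tryAcquire and the other hooks keep their default bodies.
        AbstractQueuedSynchronizer sync = new AbstractQueuedSynchronizer() { };
        try {
            sync.acquire(1); // calls the inherited tryAcquire first
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(acquireIsUnsupportedByDefault());
    }
}
```

A subclass therefore only needs to override the hooks for the mode it supports, exclusive or shared.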

We don’t have to worry about these methods for now, just understand that they manage synchronization state internally by controlling the value of state.
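To make the hook methods concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. It is an illustration under simple assumptions, not how ReentrantLock is actually written: the names SimpleMutex, Sync, and countWithMutex are invented for this example, state 0 means "free" and state 1 means "held", and the owner-tracking helpers come from AQS's superclass AbstractOwnableSynchronizer.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex: state 0 = free, state 1 = held.
class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Only the thread whose CAS moves state from 0 to 1 becomes the owner.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock()     { sync.acquire(1); } // queues and parks on contention
    void unlock()   { sync.release(1); } // wakes the next queued thread, if any
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }

    /** Increments a plain int from several threads, guarded by the mutex. */
    static int countWithMutex(int threads, int incrementsPerThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 10_000)); // no lost updates expected
    }
}
```

The subclass only decides what the state values mean. Queueing, parking, and waking are all inherited from AQS.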

2.1 Obtaining synchronization status

Typically, the implementation class first tries to modify the value of state in order to acquire the synchronization state. For example, a thread that successfully changes state from 0 to 1 has acquired the synchronization state. The change is made with CAS, so it is atomic and thread-safe.

Otherwise, if the CAS fails, the current thread is added to the AQS queue and blocked.

AQS provides three methods for reading and modifying state. The source code is as follows:

/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a {@code volatile} read.
 * @return current state value
 */
protected final int getState() {
    return state;
}

/**
 * Sets the value of synchronization state.
 * This operation has memory semantics of a {@code volatile} write.
 * @param newState the new state value
 */
protected final void setState(int newState) {
    state = newState;
}

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 */
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
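The guarantee that matters here is that, among threads racing to change state from 0 to 1, exactly one CAS succeeds. The sketch below checks this; the class CasStateDemo, the subclass OneShot, and the methods claim and winnersAmong are hypothetical names introduced only to expose the protected compareAndSetState for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo: many threads race to CAS state from 0 to 1.
class CasStateDemo {

    /** Illustrative subclass that exposes the protected CAS helper. */
    static final class OneShot extends AbstractQueuedSynchronizer {
        boolean claim() {
            return compareAndSetState(0, 1);
        }
    }

    /** Returns how many of the racing threads saw their CAS succeed. */
    static int winnersAmong(int threads) {
        OneShot sync = new OneShot();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    return;
                }
                if (sync.claim()) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(winnersAmong(8)); // prints 1
    }
}
```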

2.2 Synchronizing queues

As mentioned above, AQS internally maintains a FIFO queue built as a doubly linked list. When a thread fails to acquire the synchronization state, AQS builds a Node for it, appends the node to the tail of the queue (thread-safely, using CAS), and blocks the thread with LockSupport.park(). When the synchronization state is released, AQS checks whether the head node is null. If it is not, threads may be waiting for the synchronization state, and AQS wakes the successor of the head node so that it can compete for the synchronization state again.
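The blocking and waking primitives mentioned here are LockSupport.park() and LockSupport.unpark(Thread). The sketch below shows them in isolation; ParkDemo and parkAndWake are hypothetical names, and the AtomicBoolean flag is an assumption of this example that stands in for "the synchronization state became available". Because park() is allowed to return spuriously, the waiting thread re-checks its condition in a loop, just as AQS retries the acquisition after every wake-up.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the park/unpark pair that AQS uses to block and wake threads.
class ParkDemo {

    /** Returns the waiter's thread state while parked and after it was woken. */
    static String parkAndWake() {
        AtomicBoolean released = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so the condition is re-checked in a loop.
            while (!released.get()) {
                LockSupport.park();
            }
        });
        waiter.start();

        // Wait until the waiter is actually parked.
        Thread.State whileParked;
        do {
            Thread.yield();
            whileParked = waiter.getState();
        } while (whileParked != Thread.State.WAITING);

        released.set(true);         // make the condition true first...
        LockSupport.unpark(waiter); // ...then wake the parked thread
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return whileParked + " -> " + waiter.getState();
    }

    public static void main(String[] args) {
        System.out.println(parkAndWake()); // prints WAITING -> TERMINATED
    }
}
```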

2.3 Obtaining the Exclusive Synchronization status

Exclusive means that only one thread can hold the synchronization state at a time.

AQS first calls the implementation class's tryAcquire method to acquire the synchronization state. If that fails, AQS wraps the current thread in a Node and appends it to the tail of the synchronization queue.

The exclusive synchronization state is obtained through AQS acquire method. The source code is as follows:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

This method first calls tryAcquire. If that fails, it adds the current thread to the queue via addWaiter. The acquireQueued method then loops: it retries the acquisition when the node is first in line, and otherwise blocks the current thread with LockSupport.park() until it is woken up.
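The queueing can be observed from outside through the public inspection method getQueueLength(), without reading the internals. The following is a minimal sketch under stated assumptions: QueueLengthDemo, its Sync subclass, and queuedWhileHeld are names invented for this example, and the Sync implementation is a deliberately simple one in which state 1 means "held".

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo: threads that fail tryAcquire show up in the AQS queue.
class QueueLengthDemo {

    /** Illustrative exclusive synchronizer: state 0 = free, 1 = held. */
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    /** Holds the state, starts the given number of contenders, and reports the queue length. */
    static int queuedWhileHeld(int waiters) {
        Sync sync = new Sync();
        sync.acquire(1); // succeeds immediately: tryAcquire returns true
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                sync.acquire(1); // tryAcquire fails, so the thread is queued and parked
                sync.release(1);
            });
            threads[i].start();
        }
        int observed;
        while ((observed = sync.getQueueLength()) < waiters) {
            Thread.yield(); // wait until every contender has been enqueued
        }
        sync.release(1); // lets the queued threads through one by one
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileHeld(3)); // prints 3
    }
}
```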

Let's see how it builds a node and appends it to the tail of the queue, starting with addWaiter:

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // mode is Node.EXCLUSIVE here, i.e. exclusive mode
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try to append the node to the tail with a single CAS. If that fails, fall back to enq, which retries in a loop
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    // Append the node to the tail of the queue, retrying in a loop until the CAS succeeds
    /*
     * Note: the first time a node is enqueued, tail is still null when
     * addWaiter runs, so its fast path is skipped and control comes straight
     * to this method. On the first iteration tail is still null, so the first
     * branch creates an empty Node as the head. On the next iteration tail is
     * no longer null, so the else branch appends the real node to the queue.
     * In other words, creating the queue also adds an empty head node.
     */
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
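The same enqueue loop can be tried outside AQS. The sketch below is a simplified stand-in, not the JDK code: it swaps AQS's internal compareAndSetHead and compareAndSetTail for AtomicReference.compareAndSet, and the names CasQueueSketch, label, describe, size, demo, and concurrentSize are all hypothetical. It keeps the two properties discussed above: the empty head node is created lazily on the first enqueue, and prev is set before the tail CAS so that a backward walk from tail always sees every enqueued node.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the AQS enqueue loop using AtomicReference.
class CasQueueSketch {

    static final class Node {
        final String label;
        volatile Node prev;
        volatile Node next;
        Node(String label) { this.label = label; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends the node and returns its predecessor, creating the dummy head if needed. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {                        // queue not initialized yet
                if (head.compareAndSet(null, new Node("dummy"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                      // link backwards before publishing
                if (tail.compareAndSet(t, node)) {  // only one thread wins per round
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /** Counts nodes by walking backwards from tail through prev links. */
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null; p = p.prev) {
            n++;
        }
        return n;
    }

    /** Renders the queue from head to tail; call only when no enqueue is in progress. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node p = head.get(); p != null; p = p.next) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(p.label);
        }
        return sb.toString();
    }

    static String demo() {
        CasQueueSketch queue = new CasQueueSketch();
        queue.enq(new Node("T2"));
        queue.enq(new Node("T3"));
        return queue.describe();
    }

    /** Enqueues one node per thread concurrently and returns the resulting node count. */
    static int concurrentSize(int threads) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            String label = "T" + i;
            workers[i] = new Thread(() -> queue.enq(new Node(label)));
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());            // prints dummy -> T2 -> T3
        System.out.println(concurrentSize(8)); // prints 9: eight nodes plus the dummy head
    }
}
```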

Take a look at the acquireQueued method:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Spin: keep trying to acquire the synchronization state
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // Reaching this point means the synchronization state was acquired
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Decide whether to park; if so, LockSupport.park() blocks the current thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The shouldParkAfterFailedAcquire method decides whether the current thread needs to be blocked. It inspects the waitStatus of the current node's predecessor and, based on that value, determines whether the thread should park.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // Node.SIGNAL == -1
        /*
         * The predecessor is already set to wake this node when it
         * releases, so it is safe to park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * The predecessor is in the initial state (waitStatus = 0) or PROPAGATE.
         * Set its waitStatus to -1 (SIGNAL) so that it wakes this node on
         * release. The caller retries the acquisition once more before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    }
    return false;
}

When shouldParkAfterFailedAcquire determines that the current node should park, parkAndCheckInterrupt is called to block the thread:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
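Two details of parkAndCheckInterrupt are worth seeing in isolation: park() returns without throwing when the thread is interrupted, and Thread.interrupted() both reports and clears the interrupt flag. That is why acquire records the interruption in a local variable and calls selfInterrupt() afterwards to restore the flag. The sketch below demonstrates the flag handling; InterruptedParkDemo and run are hypothetical names, and the loop around park() is an addition of this example to guard against spurious returns.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of how an interrupt is observed, cleared, and restored around park().
class InterruptedParkDemo {

    static String run() {
        StringBuilder log = new StringBuilder();
        Thread waiter = new Thread(() -> {
            // park() returns (without an exception) once the thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            boolean first = Thread.interrupted();  // reports the interrupt and clears the flag
            boolean second = Thread.interrupted(); // the flag is already cleared
            Thread.currentThread().interrupt();    // restore the flag, as selfInterrupt() does
            boolean restored = Thread.currentThread().isInterrupted();
            log.append("first=").append(first)
               .append(" second=").append(second)
               .append(" restored=").append(restored);
        });
        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints first=true second=false restored=true
    }
}
```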

2.4 Releasing the Exclusive Synchronization State

In AQS, the exclusive synchronization state is released through the release() method. The source code for this method is as follows:

public final boolean release(int arg) {
    // Call the implementation class's tryRelease method to update the synchronization state.
    if (tryRelease(arg)) {
        Node h = head;
        /*
         * If head is null, no thread has ever queued for the synchronization state.
         * If head is not null and its waitStatus is non-zero, a successor may be
         * waiting: unparkSuccessor wakes the node after head with
         * LockSupport.unpark so that it can compete for the synchronization
         * state again.
         */
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

The unparkSuccessor method wakes up the node after the head node so that it can compete for the synchronization state again:

private void unparkSuccessor(Node node) {
    /*
     * If waitStatus is negative, for example -1 (SIGNAL),
     * reset it to 0.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * The thread to unpark is normally held in the next node. If that node
     * is null or cancelled, walk backwards from tail to find the
     * non-cancelled node closest to this one.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

2.5 Exclusive synchronization state acquisition and release: a walk-through

To illustrate the process in the source code, let’s first assume that tryAcquire is implemented as follows:

protected boolean tryAcquire(int acquires) {
    return compareAndSetState(0, acquires);
}

The acquires argument is always 1. If the CAS changes state from 0 to 1, the synchronization state has been acquired. If the CAS fails, the thread must join the synchronization queue.

Suppose the tryRelease implementation looks like this:

protected boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (c == 0) {
        setState(c);
        return true;
    }
    return false;
}

The releases argument is always 1. If state becomes 0 after subtracting it, the release succeeds and other threads can compete for the synchronization state.
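The state transitions of these two assumed methods can be traced on a single thread before any queueing is involved. In this sketch, the class name AssumedSync and the method trace are hypothetical; the bodies of tryAcquire and tryRelease are the assumed implementations from the text, given the protected modifier so that they override the AQS hooks.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class wrapping the assumed tryAcquire/tryRelease from the text.
class AssumedSync extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int acquires) {
        return compareAndSetState(0, acquires);
    }

    @Override
    protected boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (c == 0) {
            setState(c);
            return true;
        }
        return false;
    }

    /** Calls the hooks directly (no queueing) and records each result with the resulting state. */
    static String trace() {
        AssumedSync sync = new AssumedSync();
        StringBuilder sb = new StringBuilder();
        sb.append("acquire=").append(sync.tryAcquire(1))
          .append(" state=").append(sync.getState());
        sb.append("; acquire=").append(sync.tryAcquire(1)) // state is already 1, so the CAS fails
          .append(" state=").append(sync.getState());
        sb.append("; release=").append(sync.tryRelease(1))
          .append(" state=").append(sync.getState());
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints acquire=true state=1; acquire=false state=1; release=true state=0
        System.out.println(trace());
    }
}
```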

Assume that three threads, T1, T2, and T3, try to acquire the synchronization state concurrently, each calling acquire to modify state at the same time.

Assume that T1's modification succeeds and that those of T2 and T3 fail.

T1 changes state to 1 and returns immediately. head and tail are both still null, so the synchronization queue is empty.

T2 fails to acquire the synchronization state and is added to the synchronization queue. Because the queue is being created, an empty head node is added first, followed by T2's node, which becomes the tail. Before parking, T2 sets the head node's waitStatus to SIGNAL.

T3 also fails to acquire the synchronization state and is appended after T2's node, becoming the new tail. Before parking, T3 sets the waitStatus of T2's node to SIGNAL.

T1 finishes and releases the resource. It first restores state to 0 and then unparks the node after head (T2's node), so that T2 can compete for the synchronization state again.

When T2 wakes up and acquires the synchronization state (that is, its tryAcquire call sets state to 1), it makes its own node the new head and sets next of the old head to null to help GC.

T2 finishes, releases the synchronization state by setting state to 0, and wakes up T3 so that it can compete again.

If T3 acquires the synchronization state, it makes its own node the head and sets next of the previous head (T2's node) to null.

When T3 finishes, it releases the synchronization state and sets state to 0. The head node (T3's node) has a waitStatus of 0, which means there is no successor to unpark, so release returns true directly.

The last node (T3's) is not removed, because it can serve as the head node for the next round of contention.
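The walk-through above can be reproduced as a runnable trace. This is a sketch under the same assumptions as the text: the Sync class uses the assumed tryAcquire and tryRelease, the calling thread plays the role of T1, and the names HandoffTrace, worker, awaitQueueLength, and run are invented for this example. To make the enqueue order deterministic, T3 is started only after T2 is visible in the queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical trace of the T1 / T2 / T3 scenario described in the text.
class HandoffTrace {

    /** The assumed tryAcquire/tryRelease from the text. */
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            return compareAndSetState(0, acquires);
        }

        @Override
        protected boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (c == 0) {
                setState(c);
                return true;
            }
            return false;
        }
    }

    private static Thread worker(String name, Sync sync, List<String> order) {
        return new Thread(() -> {
            sync.acquire(1);  // queued and parked while another thread holds the state
            order.add(name);  // recorded while holding the synchronization state
            sync.release(1);  // wakes the next queued thread, if any
        }, name);
    }

    private static void awaitQueueLength(Sync sync, int expected) {
        while (sync.getQueueLength() < expected) {
            Thread.yield();
        }
    }

    /** Returns the order in which the queued threads obtained the synchronization state. */
    static List<String> run() {
        Sync sync = new Sync();
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        sync.acquire(1); // the calling thread acts as T1

        Thread t2 = worker("T2", sync, order);
        Thread t3 = worker("T3", sync, order);
        t2.start();
        awaitQueueLength(sync, 1); // T2 is enqueued behind the empty head node
        t3.start();
        awaitQueueLength(sync, 2); // T3 is enqueued behind T2

        sync.release(1); // T1 releases: state goes to 0 and T2 is unparked
        try {
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [T2, T3]
    }
}
```

The hand-off order follows the queue order because only the node directly behind head is allowed to call tryAcquire.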

2.6 Acquiring the Synchronization State with a Timeout

The tryAcquireNanos method implements this functionality. The process is much the same as the one described above, with an added time check: on each iteration of the acquisition loop, it checks whether the specified timeout has elapsed and, if so, returns false to indicate that the acquisition failed.

The source code of tryAcquireNanos is as follows:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();

    // Try to acquire the synchronization state directly; if that fails, fall back to the timed acquisition
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

As you can see, the timeout itself is implemented by the doAcquireNanos method. Most of its logic matches the process described above; the differences are explained in the comments.

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;

    // Compute the absolute deadline at which the wait times out
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // Recompute the remaining time; if the timeout has elapsed, return false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // If time remains, park the current thread only when more than 1000 ns are left.
            // Otherwise skip parking and spin into the next attempt, because the remaining time is so short that parking could take longer than the wait itself
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) // spinForTimeoutThreshold = 1000
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
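From the caller's side, the timed acquisition behaves as follows. This is a minimal sketch, assuming the same simple tryAcquire and tryRelease used earlier in the text: TimedAcquireDemo, its Sync subclass, and run are hypothetical names, and the 50 ms timeout is an arbitrary value chosen for the demonstration. tryAcquireNanos itself is a public AQS method.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo of tryAcquireNanos timing out and then succeeding.
class TimedAcquireDemo {

    /** The assumed tryAcquire/tryRelease from the text. */
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            return compareAndSetState(0, acquires);
        }

        @Override
        protected boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (c == 0) {
                setState(c);
                return true;
            }
            return false;
        }
    }

    static String run() {
        Sync sync = new Sync();
        long timeout = TimeUnit.MILLISECONDS.toNanos(50); // arbitrary demo value
        boolean[] whileHeld = new boolean[1];

        sync.acquire(1); // the calling thread holds the synchronization state
        Thread contender = new Thread(() -> {
            try {
                // Queued, parked with a deadline, then cancelled when the deadline passes.
                whileHeld[0] = sync.tryAcquireNanos(1, timeout);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        contender.start();
        try {
            contender.join();
            sync.release(1);
            // The state is free now, so the initial tryAcquire succeeds without queueing.
            boolean afterRelease = sync.tryAcquireNanos(1, timeout);
            return "whileHeld=" + whileHeld[0] + " afterRelease=" + afterRelease;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints whileHeld=false afterRelease=true
    }
}
```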

3 Reference

  • The Art of Java Concurrent Programming, by Teng Fei, Wei Peng, and Cheng Xiaoming