Today we are going to study to learn AbstractQueuedSynchronizer class, the related principle of Java. Util. Concurrent package provided by many class relies on this kind of queue type synchronizer, such as the commonly used ReentranLock, Semaphore and CountDownLatch.

For ease of understanding, let’s use a piece of code that uses ReentranLock as an example to explain the use of AQS in each method of ReentranLock.

ReentranLock sample

We all know that ReentranLock behaves like Synchronized, which is a reentrant lock, but the implementation of Synchronized is completely different, and we’ll explain how Synchronized works later. In addition, Synchronized blocking cannot be broken, whereas ReentrantLock provides breakable blocking. The following code is ReentranLock’s functions, and we will explain the implementation principles behind these functions in that order.

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();
Copy the code

Fair locks and unfair locks

ReentranLock is divided into fair locks and unfair locks. The difference between the two is whether the opportunity to obtain the lock is related to the queue order. As we all know, if a lock is held by another thread, the other thread applying for the lock will be suspended and queued. Theoretically, the thread that calls the lock function first and suspends the wait should be at the front of the wait queue, and the thread that calls the lock function later should be at the back. If, at this point, the lock is released, the waiting thread needs to be notified to try to acquire the lock again. Fair locking allows the first thread to acquire the lock. An unfair lock would wake up all threads and make them try to acquire the lock again, so it might result in a later line acquiring the lock first, which would be unfair.

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
Copy the code

We will find NonfairSync FairSync and inherited the Sync classes, and the parent class of Sync AbstractQueuedSynchronizer (subsequent AQS). But the AQS constructor is empty and does nothing.

After the source code analysis, if not specified, refers to fair lock.

The lock operation

ReentranLock’s lock function is shown below, which calls sync’s lock function directly. That is, FairSync’s lock function is called.

    //ReentranLock
    public void lock() {
        sync.lock();
    }
    //FairSync
    final void lockAcquire (1) {// acquire(1); }Copy the code

Next, we will officially start the AQS related source analysis. The function of acquire function is to acquire the quantity that can only be acquired by a thread in the same time period, which is the abstract lock concept. Let’s analyze the code first, and you’ll see what it means.

Public final void acquire(int arg) {// tryAcquire tries to acquire"Lock", will not enter the subsequent processif(! tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), Arg)) //addWaiter creates a node for the current thread and adds it to the wait queue //acquireQueued Attempts to acquire the lock after the thread has already joined the wait queue. SelfInterrupt (); }Copy the code

TryAcquire, addWaiter, and acquireQueued are all important functions, so let’s take a look at each of these in turn to understand what they do.

Private volatile int state; Protected Final Boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread(); Int c = getState(); int c = getState();if(c == 0) {// If the value is 0, the current exclusive variable has not been occupied by the thread. // If the current blocking queue has no first thread waiting, the implementation of UnfairSync here is inconsistentif(! Hasqueuedtoraise () && compareAndSetState(0, acquires)) {// Succeed CAS, then the current thread has acquired ownership of the variable, that is, successfully acquired the locksetExclusiveOwnerThread(current);
            // setExclusiveOwnerThread Sets this thread as the exclusive variable owner threadreturn true; }}else if(current == getExclusiveOwnerThread()) {// If the thread has acquired ownership of the exclusive variable, then the state value is incremented by one to indicate multiple locks according to the reentrant // principle Int nexTC = c + acquires; int nextc = c + acquires;if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true; } // If none of the preceding conditions is met, the lock fails to be obtainedreturn false;
}
Copy the code

As you can see from the above code, tryAcquire is trying to get the thread-exclusive variable state. The value of state indicates its state: if it is 0, no thread currently owns the variable; Otherwise, a thread has monopolized the variable, which means that another thread has acquired the lock. The current thread acquires the lock itself, and if so, increments state.

There are a few things to note here. The first is the compareAndSetState function, which uses the CAS operation to set the value of state and the volatile modifier to ensure that changing the value of state does not cause multithreading problems. Then there is the question of the difference between a fair lock and an unfair lock. The nonfairTryAcquire function of UnfairSync does not call Hasqueued24 on the same location to determine whether there are threads currently queuing to acquire the lock.

If tryAcquire returns true, the lock was acquired successfully; If false is returned, the lock is not acquired and needs to be queued. Let’s take a look at addWaiter.

Blocking queues waiting for locks

Adding the node storing the current thread information to the relevant function of waiting queue involves the relevant algorithm of lock-free queue. Since nodes are only added to the end of queue in AQS, the lock-free algorithm used is relatively simple. The real lock-free queue algorithm will be explained when analyzing ConcurrentSkippedListMap.

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // The more complex and time-consuming algorithm will be used only when necessary, i.e. optimistic attitude Node pred = tail; // column end pointerif(pred ! = null) { node.prev = pred; // Step 1: The forward pointer to this object points to tailif(compareAndSetTail(pred, node)){// Step 2 :cas points the last pointer to the node pred.next = node; // Step 3: If the result is achieved, make the next pointer to the last node of the old column point to that node.returnnode; }} //cas fails, or enq enq(node) is called when pred == NULL;return node;
}
private Node enq(final Node node) {
    for(;;) {// CAS standard for lock-free algorithmsforNode t = tail;if(t == null) {// Initializeif(compareAndSetHead(new Node())) // Tail = head; }else{// Same as in addWaiter, but with the outer infinite loop, keep trying, spin lock node.prev = t;if (compareAndSetTail(t, node)) {
                t.next = node;
                returnt; }}}}Copy the code

By calling the addWaiter function, AQS queues the current thread, but does not block execution of the current thread. Let’s examine the acquireQueued function.

Wait for queue node operations

Because operations that enter a blocking state are inefficient, AQS tries to prevent threads that attempt to acquire exclusive variables from entering a blocking state. So, after the thread is queued, acquireQueued executes a for loop, each time determining whether the current node should get the variable (at the head of the queue). If should not get or try again to get failure, then call shouldParkAfterFailedAcquire determine whether should enter the blocking state. If the node before the current node is already blocked, it can be determined that the current node cannot acquire the lock. To prevent the CPU from executing the for loop endlessly and consuming CPU resources, parkAndCheckInterrupt is called to enter the blocked state.

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // Execute until lock is acquired, return.
            final Node p = node.predecessor(); 
            // Node is preceded by head, which means node is the next node to acquire the lock.
            if (p == head && tryAcquire(arg)) { // So try again to get the exclusivity variable
                setHead(node); // If it works, set yourself to head
                p.next = null; // help GC
                failed = false;
                return interrupted;
                // At this point, the selfInterrupt function is not blocked, so return false to indicate that there is no need to interrupt to call selfInterrupt
            }
            // Determine whether to enter the blocking state. If ` shouldParkAfterFailedAcquire `
            // Returns true, indicating that blocking is required
            // Call parkAndCheckInterrupt; Otherwise, you can try again to acquire the lock and continue the for loop
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Block with parkAndCheckInterrupt and return whether it is interrupted
                interrupted = true; }}finally {
        if(failed) cancelAcquire(node); }}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // The previous node is waiting for notification of the release of an exclusive variable, so the current node can block
        return true;
    if (ws > 0) { // The previous node is in the unfetchable exclusive variable state, so it can be skipped
        / / returns false
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // Set the state of the last node to signal, return false,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt(a) {
    LockSupport.park(this); // Pass in the AQS object itself
    return Thread.interrupted();
}

Copy the code

Blocking and interruption

From the above analysis, we know that AQS blocks the current process by calling the park method of LockSupport. By calling this function, the thread enters the blocking state, and the above lock operation is blocked, waiting to interrupt or before the exclusive variable is released.

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);// Set the blocking object, which is used to record which thread is blocked, for thread monitoring and analysis tools to locate
    UNSAFE.park(false.0L);// Make the current thread not scheduled by the thread, that is, the current thread no longer execute.
    setBlocker(t, null);
}
Copy the code

We’ll learn more about interrupts later, but we’ll continue the AQS thread and look at operations related to releasing exclusive variables.

Unlock operation

Similar to the Lock operation, unlock calls the relase method of AQS with the same argument of 1 as acquire.

public final boolean release(int arg) {
    if(tryRelease(arg)) {tryRelease(arg)) {tryRelease(arg);if(h ! = null && h.waitStatus ! = 0) unparkSuccessor(h); // Wake up the successor node of headreturn true;
    }
    return false;
}
Copy the code

As you can see from the above code, release is the first call to tryRelease to release the exclusive variable. If successful, see if there are blocking threads waiting for locks and, if so, call the unparksucceeded to wake them up.

Protected final Boolean tryRelease(int releases) {// Since only one thread can get an exclusive first variable, all operations do not need to consider multithreading int c = getState() -releases;if(Thread.currentThread() ! = getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free =false;
    ifIf (c == 0) {// If (c == 0) {// If (c == 0) {// If (c == 0) {// If (c == 0)true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
Copy the code

We can see that the logic in tryRelease also embodies the concept of a reentrant lock, and the lock is not released until the value of state is 0. So the value of the exclusivity variable state represents the presence or absence of a lock. When state is 0, the lock is not occupied. If state is not, the current lock is occupied.

private void unparkSuccessor(Node node) {...In general, the thread that needs to be woken up is the next node in the head, but if its lock acquisition is cancelled, or if the node is null
     // Just go ahead and find the first uncancelled successor node.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for(Node t = tail; t ! =null&& t ! = node; t = t.prev)if (t.waitStatus <= 0)
                s = t;
    }
    if(s ! =null)
        LockSupport.unpark(s.thread);
}
Copy the code

After calling the unpark method, the thread that blocked the lock operation returns to the run state and executes the infinite for loop in the acquireQueued to try again to acquire the lock.

Afterword.

That’s pretty much it for AQS and ReentrantLock. I have to say, I was shocked when I first saw the implementation of AQS. I used to think that Synchronized and ReentrantLock were implemented in the same way, relying on Java virtual machine functionality. Did not expect to have AQS such a big Boss behind the help ah. Learning the principles of this class makes our analysis of many of JUC’s classes much easier. In addition, the CAS operation and lock-free queue algorithms involved in AQS also provide a basis for us to learn other lock-free algorithms. The ocean of knowledge is infinite!