This is the ninth day of my participation in the First Challenge 2022

1. AQS

In business code, I use CountDownLatch to signal task completion between threads when code blocks run in parallel. Recently I wanted to see how CountDownLatch implements waiting and wake-up, so I started reading the source code and looking things up, and a new door opened: it is built on the AbstractQueuedSynchronizer framework (hereinafter “AQS”).

So let’s first understand what AQS does. Its functionality can be summarized in two parts. Acquiring a resource: if the acquisition fails, the thread joins the wait queue and goes to sleep. Releasing a resource: AQS checks whether the condition for waking a thread in the wait queue is met and, if so, wakes the thread so that it continues executing.

ReentrantLock, CountDownLatch, and ReentrantReadWriteLock are all built on this class. It provides several methods for subclasses to override to implement their own behavior:

    tryAcquire(int arg)        // Try to acquire the exclusive resource
    tryRelease(int arg)        // Try to release the exclusive resource
    isHeldExclusively()        // Check whether the current thread holds the resource exclusively
    tryAcquireShared(int arg)  // Try to acquire the shared resource
    tryReleaseShared(int arg)  // Try to release the shared resource

These five methods fall into two groups: the exclusive-mode methods (the first three) and the shared-mode methods (the last two). A subclass typically needs to implement only one group. (ReentrantReadWriteLock does need both exclusive and shared modes, and it exposes them through two separate lock classes.) For that reason the authors did not declare these methods abstract; the default implementations simply throw UnsupportedOperationException, which is friendlier to AQS subclasses.
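
To make the exclusive group concrete, here is a minimal sketch of a non-reentrant mutex that overrides only the three exclusive methods. The class name SimpleMutex and the convention "state 0 = free, 1 = held" are my own assumptions for illustration; they are not taken from the JDK lock classes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal mutex (not reentrant): state 0 = free, 1 = held.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if this thread flips state from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // tells AQS it may wake a queued thread
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1
                    && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    public void lock()       { sync.acquire(1); }          // queues and parks on failure
    public boolean tryLock() { return sync.tryAcquire(1); } // never blocks
    public void unlock()     { sync.release(1); }
    public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
}
```

The shared methods are left untouched here, so calling acquireShared on this Sync would end in UnsupportedOperationException. All queuing and parking comes from acquire and release in AQS itself.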

The AQS concepts of exclusive and shared need some explanation. They describe the AQS resource (that is, the state field): once the condition for acquiring the resource is met, either only one waiting thread in the queue is woken (exclusive: the resource belongs to one thread) or all waiting threads can be woken (shared).
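
The shared case can be sketched with a one-shot gate: several threads wait, and a single release lets all of them through. The class OneShotGate, its demo method, and the convention "state 0 = closed, 1 = open" are assumptions made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate in shared mode: state 0 = closed, 1 = open.
class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1; // negative: fail, queue and park
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // let AQS wake the waiting threads
        }
    }

    private final Sync sync = new Sync();

    public void await() { sync.acquireShared(1); }
    public void open()  { sync.releaseShared(1); }

    // Starts `waiters` threads that wait on the gate, opens it once,
    // and returns how many threads got through.
    static int demo(int waiters) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                gate.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        gate.open(); // one release is enough for every waiter
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }
}
```

Calling OneShotGate.demo(3) returns 3: every waiter passes after the single open() call. With an exclusive synchronizer, each release would hand the resource to at most one waiter.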

CountDownLatch and ReentrantLock each define the meaning of the AQS state field differently. AQS implements the acquire and release logic internally, so a subclass mainly supplies the scenario-specific logic for trying to acquire and trying to release the resource.

What follows is an analysis of the internal logic of AQS. Subclasses do not need to care about it. These methods mainly do the following: acquire the resource; on failure, join the queue and block the thread; on release, check whether the wake-up condition is met, then remove the corresponding thread from the wait queue and wake it. So when reading the CountDownLatch and ReentrantLock code, there is no need to study the implementation of these methods closely at first. Understand what each method does and the logic of the subclasses, then study the internals of AQS at your own pace.

1.1 Obtaining Shared Resources

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

If tryAcquireShared(arg) returns a negative value, the acquisition has failed and doAcquireShared(arg) is executed:

 /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//1
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//2
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//3
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Because the resource could not be acquired, the thread is queued and blocked. Note that the AQS wait queue is a doubly linked list, and each Node holds its predecessor and successor nodes, the current thread, and a wait status value. Point 1 adds a shared-mode node to the list. I won’t reproduce that method here, but it uses CAS plus spinning, without locks, to make sure the insert is correct under multithreading.
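
The CAS-plus-spin idea behind the insert can be sketched as follows. This is a simplified illustration, not the JDK's addWaiter: the class CasQueue, its Node type, and the demo method are hypothetical, and the real AQS node also carries a thread and a wait status.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch of a lock-free tail insert using CAS and retry.
class CasQueue {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final Node head = new Node(); // empty head node, as in AQS
    private final AtomicReference<Node> tail = new AtomicReference<>(head);

    void enqueue(Node node) {
        for (;;) {                          // spin until the CAS succeeds
            Node t = tail.get();
            node.prev = t;                  // set the backward link first
            if (tail.compareAndSet(t, node)) {
                t.next = node;              // then publish the forward link
                return;
            }
            // Another thread moved the tail in the meantime; retry.
        }
    }

    // Counts nodes by walking backwards from the tail; prev links are
    // always set before a node becomes reachable through the tail.
    int size() {
        int n = 0;
        for (Node t = tail.get(); t != head; t = t.prev) {
            n++;
        }
        return n;
    }

    // Enqueues from several threads at once and returns the final size.
    static int demo(int threads, int perThread) {
        CasQueue queue = new CasQueue();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enqueue(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }
}
```

No insert is lost even though no lock is taken: a thread whose CAS fails simply reads the new tail and tries again.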

Point 2 checks whether the current node is the first waiting node in the queue (head is an empty head node, so the node after head is the first waiting thread). If it is, the thread immediately tries to acquire the resource again; on success, its node replaces the head, which effectively takes the thread out of the queue, and the thread continues executing. (Someone may ask why it tries again here when it already tried once before. I think the aim is to avoid needless thread state changes.) If the resource still cannot be acquired, execution reaches point 3, where shouldParkAfterFailedAcquire decides whether the thread should go to sleep:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {//2
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

You can see that the thread is blocked with LockSupport.park.
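
For readers who have not used LockSupport directly, here is a small sketch of the park and unpark pair outside of AQS. The class ParkDemo and the two flags are made up for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo: one thread parks until another thread unparks it.
class ParkDemo {
    static boolean demo() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark, so the
            // condition is re-checked in a loop, as AQS does in for (;;).
            while (!ready.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        ready.set(true);
        // Wakes the waiter if it is parked; otherwise its next park()
        // returns immediately because the permit is already available.
        LockSupport.unpark(waiter);

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }
}
```

The loop around park() mirrors the for (;;) loop in doAcquireShared: a woken thread always re-checks whether it can really proceed before moving on.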

1.2 Releasing Shared Resources

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {//1
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }

Once the release succeeds, the loop at point 1 keeps waking successors for as long as the head keeps changing, and stops when the queue is empty or no further thread can be woken.

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node. But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

unparkSuccessor finds the node to release in the AQS queue and unparks its thread.

2. CountDownLatch implementation

At the heart of CountDownLatch is the Sync inner class, which extends AQS:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {//1
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {//2
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

You can see that tryAcquireShared and tryReleaseShared are implemented at points 1 and 2. With the AQS source analysis above, they should be easy to understand.

    public CountDownLatch(int count) { // CountDownLatch constructor
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

When await() is called, AQS evaluates if (tryAcquireShared(arg) < 0) to attempt the acquisition. The attempt is implemented by Sync, which checks whether the current state equals 0, that is, whether countDown has been called as many times as the initial count. If it has, the thread does not block. countDown decrements state by 1 with CAS, also inside Sync.
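
Seen from the caller's side, the same mechanism looks like this. It is only a usage sketch: the class LatchDemo and its runWorkers method are hypothetical, and the counter increment stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical usage sketch: the caller waits until every worker is done.
class LatchDemo {
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers); // state = workers
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stand-in for the real task
                done.countDown();           // state goes down by 1 via CAS
            }).start();
        }

        try {
            done.await(); // returns once state has reached 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }
}
```

Because each worker increments the counter before calling countDown(), the value read after await() returns always equals the number of workers.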

3. ReentrantLock

ReentrantLock offers two modes, fair and nonfair. They differ only in the order in which threads compete for the lock, so their lock() methods are implemented differently; in fair mode, threads obtain the lock in the FIFO order of the queue.
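
The mode is chosen at construction time. A short sketch (the class FairnessDemo is made up for illustration; the constructors and isFair() are the public ReentrantLock API):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of selecting the fairness policy.
class FairnessDemo {
    static boolean[] demo() {
        ReentrantLock nonfair = new ReentrantLock();  // default: nonfair
        ReentrantLock fair = new ReentrantLock(true); // true selects the fair policy
        return new boolean[] { nonfair.isFair(), fair.isFair() };
    }
}
```

The nonfair mode is the default, which is why the nonfair lock() is usually the first one people read.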

The nonfair version is shown here:

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

Here state 0 means that no thread currently holds the lock, and a value greater than 0 means that a thread holds it. Because the lock is reentrant, each time the owning thread locks again, state is incremented by 1, as shown at point 1 in the code below:

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires; //1
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
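
The counting at point 1 can be observed from outside through getHoldCount(), which reports how many times the current thread holds the lock. The class HoldCountDemo is a made-up name for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: the hold count follows nested lock/unlock calls.
class HoldCountDemo {
    static int[] demo() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];

        lock.lock();
        try {
            counts[0] = lock.getHoldCount();     // after the first lock()
            lock.lock();                         // same thread locks again
            try {
                counts[1] = lock.getHoldCount(); // after the nested lock()
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        counts[2] = lock.getHoldCount();         // after both unlock() calls
        return counts;
    }
}
```

The three recorded values are 1, 2, and 0: the count rises with every nested lock() by the owning thread, and the lock is only free again once every lock() has been matched by an unlock().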

ReentrantLock uses the exclusive mode of AQS. AQS has a separate set of methods for this mode; the overall logic is the same as in shared mode, with some differences in the details:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

In exclusive mode the wake-up happens only once per release, and only for the first node of the AQS queue: when unlock is called, the thread at the front of the queue is woken with LockSupport.unpark().