Introduction

In the previous article, we analyzed the implementation of AQS exclusive mode through ReentrantLock and the Condition implementation. This article explores the implementation of AQS shared mode from the perspective of Semaphore. The difference between the two modes is that in shared mode multiple threads are allowed to hold the lock resource at the same time, whereas in exclusive mode only one thread may hold it at a time.

1. A quick look at Semaphore with a hands-on example

Semaphore is a concurrency utility class in the java.util.concurrent (JUC) package that controls the number of threads accessing a critical (shared) resource at any one time, ensuring that the shared resource is used in an orderly way. Like ReentrantLock, it is implemented internally by directly or indirectly calling methods of the AQS framework. Semaphore introduces the concept of a "permit":

When a Semaphore is initialized, a permit count must be passed in. It represents the maximum number of threads that may access the critical resource at the same time, and the permits together are known as the permit set. A thread that wants to access the critical resource must first call acquire() to obtain a permit. If all permits have already been handed out at that moment, the thread blocks and cannot obtain a permit until a thread holding one releases it. When a thread has finished with the critical resource, it must call release() to return the permit it acquired.

The concept of a "permit" in Semaphore is similar to the "synchronization state" of the mutex discussed earlier, which is also commonly referred to as the "lock resource". Here is a quick look at the main methods provided by the Semaphore class:

// The calling thread attempts to acquire one permit from the permit set; blocks if none is available
public void acquire()

// Releases one previously acquired permit back to the semaphore
public void release()

// Constructor: permits → the number of permits (non-fair by default)
Semaphore(int permits)

// Constructor: permits → the number of permits, fair → whether to use a fair lock
Semaphore(int permits, boolean fair)

// Acquires a permit from the semaphore; this method does not respond to interrupts
void acquireUninterruptibly()

// Returns the number of permits currently available in this semaphore
int availablePermits()

// Acquires and returns all permits that are immediately available
int drainPermits()

// Returns a Collection of the threads waiting to acquire a permit
protected Collection<Thread> getQueuedThreads()

// Returns an estimate of the number of threads waiting to acquire a permit
int getQueueLength()

// Queries whether any threads are waiting to acquire a permit from this semaphore
boolean hasQueuedThreads()

// Returns the fairness type of this semaphore: true for a fair lock, false for a non-fair lock
boolean isFair()

// Tries to acquire one permit; returns false immediately without blocking if none is available
boolean tryAcquire()

// Tries to acquire one permit within the given waiting time
boolean tryAcquire(long timeout, TimeUnit unit)
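Before the full example, here is a minimal sketch (my own illustration, not part of the project described below; the class name is hypothetical) of the two non-blocking variants listed above: tryAcquire() returns false immediately when no permit is free, and tryAcquire(timeout, unit) gives up after the given waiting time.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TryAcquireDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();                       // take the only permit

        // Non-blocking attempt: returns false right away because no permit is left
        System.out.println("tryAcquire(): " + semaphore.tryAcquire());

        // Timed attempt: waits up to 500 ms for a permit before giving up
        System.out.println("tryAcquire(500ms): "
                + semaphore.tryAcquire(500, TimeUnit.MILLISECONDS));

        semaphore.release();                       // return the permit
        System.out.println("after release, tryAcquire(): " + semaphore.tryAcquire());
    }
}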

Those are the main methods provided by Semaphore. Let's demonstrate them with a simple example. The requirement is as follows:

Suppose a project needs to process a large amount of Excel data and reconcile it against the database every night, which takes a long time. Since file reading is an IO-intensive task, we can use multiple threads to speed up processing. However, because other services in this project also need the database, this job may use at most three database connections so that overall performance stays acceptable. If the reconciliation threads grabbed too many connections at once, other business threads would block while waiting for a connection (database connections, like threads, are scarce resources), client requests would pile up, and the whole system could become unresponsive. So we need to limit the number of threads accessing the database to three at a time, and Semaphore can be used for exactly this kind of flow control.

import java.util.concurrent.*;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // Custom thread pool (more on this in a future article)
        // Environment: quad-core, four-thread CPU, task blocking factor 0.9
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                4 * 2, 40, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1024 * 10),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        // Set the maximum number of threads allowed through the semaphore at one time to 3
        final Semaphore semaphore = new Semaphore(3);
        // Simulate 100 reconciliation requests
        for (int index = 0; index < 100; index++) {
            final int serial = index;
            threadPool.execute(() -> {
                try {
                    // Use acquire() to obtain a permit
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() +
                            " obtained a permit successfully! Request number: " + serial);
                    // Simulate database IO
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Release the permit after access to the critical resource ends
                    semaphore.release();
                }
            });
        }
        // Shut down the thread pool
        threadPool.shutdown();
    }
}

In the code above, the Semaphore object is created with three permits, which means three threads may access the critical resource at the same time. Before a thread can access the critical resource, it must successfully obtain a permit with acquire(). If the semaphore's permit set has already been fully handed out, newly arriving threads enter the waiting state. A thread that obtained a permit must call release() to return it once its work is done. If we run the example, we can see that roughly every thousand milliseconds three more threads get access, as follows:

/*
 * First second:
 *   pool-1-thread-1 obtained a permit successfully! Request number: 0
 *   pool-1-thread-2 obtained a permit successfully! Request number: 1
 *   pool-1-thread-3 obtained a permit successfully! Request number: 2
 * Second second (after the program has run for 1000 ms):
 *   pool-1-thread-4 obtained a permit successfully! Request number: 3
 *   pool-1-thread-5 obtained a permit successfully! Request number: 4
 *   pool-1-thread-6 obtained a permit successfully! Request number: 5
 * Third second (after 2000 ms):
 *   pool-1-thread-7 obtained a permit successfully! Request number: 6
 *   pool-1-thread-8 obtained a permit successfully! Request number: 7
 *   pool-1-thread-2 obtained a permit successfully! Request number: 8
 * Fourth second (after 3000 ms): ........
 */

As this small demo shows, Semaphore is fairly easy to use. But recall what we mentioned earlier:

The concept of a "permit" in Semaphore is similar to the "synchronization state" of the mutex we analyzed in the previous article.

Can we then use Semaphore to implement an exclusive lock? The answer is yes: we simply give the permit set a count of one when creating the semaphore object, as follows:

final Semaphore semaphore = new Semaphore(1);
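For illustration, here is a minimal sketch (the class and field names are my own) of a single-permit semaphore used as an exclusive lock around a shared counter:

import java.util.concurrent.Semaphore;

public class SemaphoreAsMutexDemo {
    private static final Semaphore mutex = new Semaphore(1); // one permit = exclusive lock
    private static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    mutex.acquire();              // at most one thread enters at a time
                    try {
                        counter++;                // critical section
                    } finally {
                        mutex.release();          // always return the permit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("counter = " + counter); // always 20000
    }
}

Note that, unlike ReentrantLock, a one-permit Semaphore is not reentrant, a point the comparison table in section 3 returns to.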

2. The implementation of AQS shared mode in Semaphore

The class structure of Semaphore is roughly the same as that of ReentrantLock, which we analyzed in the previous article: it contains an internal Sync class that extends AbstractQueuedSynchronizer, plus two subclasses of Sync. In other words, Semaphore's internal implementation, like ReentrantLock's, is built on the AQS component. As mentioned in the previous article, AQS is not designed to be used directly as a standalone class; it is a base component of the concurrency package that provides infrastructure for the other concurrency utilities, such as maintaining the synchronization queue and reading/modifying the synchronization state. The logic of acquiring and releasing the lock is left to subclasses, which preserves the flexibility of the framework as much as possible. So, just like ReentrantLock, Semaphore has to implement tryAcquireShared(int arg) to acquire the lock and tryReleaseShared(int arg) to release it. The overall AQS class diagram looks like this:


tryAcquireShared(arg) is implemented separately by the two subclasses FairSync and NonfairSync, since fair and non-fair acquisition differ slightly, while tryReleaseShared(arg) is implemented in Sync: because the release logic is the same for both, it is best placed in their common parent class. Below we analyze AQS shared mode from the perspective of the Semaphore source code, starting with lock acquisition in the non-fair implementation.

2.1 NonfairSync: Semaphore's non-fair lock in AQS shared mode

Like ReentrantLock, we can choose between a fair and a non-fair lock when creating the Semaphore object:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

If we do not explicitly declare a fairness type when creating the Semaphore, it defaults to the non-fair type. NonfairSync is built as follows:

static final class NonfairSync extends Sync {
    // Constructor: passes permits to the parent class, which stores it in state
    NonfairSync(int permits) {
        super(permits);
    }
    // The acquire logic delegates to nonfairTryAcquireShared() defined in the parent class Sync
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

Semaphore's NonfairSync constructor calls the Sync constructor, and the permits value passed to the Semaphore object is handed on to the AQS synchronizer's state, as follows:

// Parent class Sync → constructor
Sync(int permits) {
    setState(permits); // Calls the set method inside AQS
}

// AQS: AbstractQueuedSynchronizer
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer {
    // Synchronization state
    private volatile int state;

    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    // Performs a CAS on the state variable
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}

According to the analysis above, the permits value passed when the Semaphore object is created actually initializes state inside AQS. After initialization, state represents the number of permits currently available in the semaphore.
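A quick way to observe this mapping (a small illustrative sketch, not from the source analysis; the class name is my own): availablePermits() simply reads the remaining state, so it shrinks with each acquire and grows back with each release.

import java.util.concurrent.Semaphore;

public class PermitsAsStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        System.out.println(semaphore.availablePermits()); // 3  (state initialized to permits)
        semaphore.acquire(2);                             // state: 3 -> 1
        System.out.println(semaphore.availablePermits()); // 1
        semaphore.release();                              // state: 1 -> 2
        System.out.println(semaphore.availablePermits()); // 2
    }
}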

2.1.1 How NonfairSync acquires a permit/lock

When we use Semaphore to acquire the lock, the Semaphore.acquire() method is called. The calling thread tries to obtain the lock/permit by attempting a CAS decrement of permits/state. Let's follow the acquire() call:

// Semaphore class → acquire() method
public void acquire() throws InterruptedException {
    // Sync extends AQS, so this directly calls AQS's acquireSharedInterruptibly() method
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer class → acquireSharedInterruptibly() method
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Check whether the thread has an interrupt signal (flag)
    if (Thread.interrupted())
        throw new InterruptedException();
    // If tryAcquireShared(arg) returns a value not less than 0, the synchronization state was acquired
    if (tryAcquireShared(arg) < 0)
        // Acquisition failed: join the synchronization queue
        doAcquireSharedInterruptibly(arg);
}

Semaphore's acquire() method ultimately works by calling AQS's acquireSharedInterruptibly() through the internal Sync object, and acquireSharedInterruptibly() can respond to thread interruption while acquiring the synchronization state. If no interruption occurs, it first calls tryAcquireShared(arg) to try to obtain a permit; on success the method returns and the business code runs. On failure it calls doAcquireSharedInterruptibly(arg) to add the current thread to the synchronization queue and block it. tryAcquireShared(arg) is an AQS template method; for the non-fair lock it is implemented by NonfairSync:

// Semaphore class → NonfairSync → tryAcquireShared() method
protected int tryAcquireShared(int acquires) {
    // Calls the implementation in Sync
    return nonfairTryAcquireShared(acquires);
}

// Sync class → nonfairTryAcquireShared() method
abstract static class Sync extends AbstractQueuedSynchronizer {
    final int nonfairTryAcquireShared(int acquires) {
        // Start spinning (infinite loop)
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Return if no permits remain, or if the CAS update succeeds
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

nonfairTryAcquireShared(acquires) reads the state value and subtracts acquires from it to get remaining. If remaining is not less than 0, the semaphore still has permits available, so the current thread attempts a CAS update of state; if the CAS succeeds, the synchronization state has been acquired and remaining is returned. If remaining is less than 0, the semaphore's permits have all been taken by other threads, there is nothing left to acquire, and the negative remaining value is returned. Back in AQS's acquireSharedInterruptibly(): when the returned remaining is less than 0, the condition if (tryAcquireShared(arg) < 0) holds, so doAcquireSharedInterruptibly(arg) is executed to add the current thread to the synchronization queue and block it until another thread releases the synchronization state. The enqueueing method looks like this:

// AbstractQueuedSynchronizer class → doAcquireSharedInterruptibly() method
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Create a Node in Node.SHARED mode and add it to the synchronization queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // Start spinning
        for (;;) {
            final Node p = node.predecessor();
            // Check whether the predecessor node is the head
            if (p == head) {
                // Try to acquire the synchronization state
                int r = tryAcquireShared(arg);
                // If r is not less than 0, the synchronization state was acquired successfully
                if (r >= 0) {
                    // Set the current node as the head and, if appropriate, wake successor threads
                    setHeadAndPropagate(node, r);
                    p.next = null; // empty to facilitate GC
                    failed = false;
                    return;
                }
            }
            // Adjust the state of this node in the synchronization queue and decide whether to park;
            // if an interrupt signal is detected, throw an exception and end execution
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // Cancel this node's acquisition request
            cancelAcquire(node);
    }
}

// AbstractQueuedSynchronizer class → setHeadAndPropagate() method
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Get the original head node of the synchronization queue
    setHead(node); // Set the passed-in node as the new head
    /* The "long judgment":
     * - propagate > 0: the semaphore still has permits available, or
     * - h == null / (h = head) == null: defensive null checks on the old and new head, or
     * - waitStatus < 0: the old head or the new head is not in a cancelled state,
     *   so the successor's thread should be woken up.
     * The condition is written this way to avoid unnecessary wakeups: it is possible
     * that after waking the successor's thread, no thread has yet released a
     * permit/lock, so it would simply block again. */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // s may be null if the passed-in node is the only node in the queue
        if (s == null || s.isShared())
            doReleaseShared(); // Wake up the successor node
    }
}

doAcquireSharedInterruptibly(arg) does three things in total:

  • First, create a Node in Node.SHARED mode and add it to the synchronization queue with addWaiter().
  • Second, after enqueueing, start spinning and check whether the predecessor node is the head. If it is, try to acquire the synchronization state; once acquired, set the current node as the new head.
  • Third, if the predecessor is not the head, or it is the head but acquisition failed, call shouldParkAfterFailedAcquire(p, node) to check whether the predecessor is in the SIGNAL state. Because the first pass sets the predecessor to SIGNAL and only the second pass detects it, the for loop must run at least twice before shouldParkAfterFailedAcquire(p, node) returns true; then parkAndCheckInterrupt() suspends the current thread and, once woken, returns its interrupt status.

That covers the work of doAcquireSharedInterruptibly(arg). Now let's look at shouldParkAfterFailedAcquire() and parkAndCheckInterrupt():

// AbstractQueuedSynchronizer class → shouldParkAfterFailedAcquire() method
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the wait status of the predecessor node
    int ws = pred.waitStatus;
    // Return true if it is already in the "waiting to be woken up" (SIGNAL) state
    if (ws == Node.SIGNAL)
        return true;
    // If the predecessor's wait status is greater than 0, it is in the cancelled state:
    // walk back through the predecessors until a non-cancelled node is found
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // If the predecessor's wait status is not greater than 0 and is not SIGNAL,
        // set it to SIGNAL, indicating that this node's thread is waiting to be woken up.
        // (A node just transferred from a Condition queue would be in the CONDITION state,
        // but Semaphore does not use Condition, so such nodes never appear in its
        // synchronization queue and that case never arises here.)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// AbstractQueuedSynchronizer class → parkAndCheckInterrupt() method
private final boolean parkAndCheckInterrupt() {
    // Suspend the current thread
    LockSupport.park(this);
    // Get the thread's interrupt status via Thread.interrupted():
    // this does not interrupt the thread, it only reports (and clears) the flag
    return Thread.interrupted();
}

// LockSupport class → park() method
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    // Set the blocker monitor for the current thread
    setBlocker(t, blocker);
    // Call the native method to block the current thread at the JVM level
    UNSAFE.park(false, 0L);
    // Clear the blocker after unblocking
    setBlocker(t, null);
}

shouldParkAfterFailedAcquire() checks whether a node's predecessor is in the SIGNAL state and returns true if so. If the predecessor's waitStatus is greater than 0 (only the CANCELLED state, = 1, is greater than 0), the predecessor is no longer in use and should be removed from the synchronization queue; a do/while loop walks back through the predecessors until a non-cancelled node is found. If the predecessor is in the SIGNAL state, parkAndCheckInterrupt() is called to suspend the current thread. At this point the whole Semaphore.acquire() permit-acquisition process is complete, as shown below:



As shown in the figure above, the AQS synchronizer holds a state variable. The permits value passed when the Semaphore object is initialized is indirectly assigned to the state synchronization flag in AQS, and permits/state represents the maximum number of threads that may access the critical/shared resource at the same time. When a thread calls Semaphore.acquire() to obtain a permit, it first checks whether state is greater than 0. If it is, permits are still available, so state is decremented by one, the acquisition succeeds, and the thread returns to its work. Once state reaches zero, the semaphore has no permits available, and subsequent threads are wrapped as Node nodes, added to the synchronization queue, and spin/block there until some thread releases a permit (incrementing state).
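To make this flow concrete, here is a deliberately simplified toy model (my own sketch, not the real AQS code): an AtomicInteger plays the role of state, a queue of parked threads stands in for the synchronization queue, and release() wakes the waiters. It omits node states, cancellation, fairness and the PROPAGATE handling discussed later.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Toy illustration of the shared-mode flow described above (not the real AQS)
class ToySharedSync {
    private final AtomicInteger state;                                    // plays the role of the AQS state
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();  // stands in for the sync queue

    ToySharedSync(int permits) {
        this.state = new AtomicInteger(permits);
    }

    void acquire() {
        if (tryAcquire()) return;                 // fast path, like tryAcquireShared()
        Thread current = Thread.currentThread();
        waiters.add(current);                     // enqueue, like addWaiter(Node.SHARED)
        while (!tryAcquire()) {                   // spin + park, like doAcquireShared()
            LockSupport.park(this);
        }
        waiters.remove(current);                  // leave the queue, like becoming the new head
    }

    private boolean tryAcquire() {
        for (;;) {                                // CAS-decrement loop, like nonfairTryAcquireShared()
            int available = state.get();
            if (available <= 0) return false;
            if (state.compareAndSet(available, available - 1)) return true;
        }
    }

    void release() {
        state.incrementAndGet();                  // like tryReleaseShared()
        for (Thread t : waiters) {                // wake the waiters, like doReleaseShared()
            LockSupport.unpark(t);
        }
    }
}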

At this point, the analysis of non-fair acquisition in AQS shared mode is complete. Semaphore also provides a non-interruptible acquisition method, as follows:

// Semaphore class → acquireUninterruptibly() method
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

// AbstractQueuedSynchronizer class → acquireShared() method
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// AbstractQueuedSynchronizer class → doAcquireShared() method
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // The interruptible version throws an exception here to stop the thread;
                // the non-interruptible version only records the interrupt flag
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

A look at the source above makes the difference between the interrupt-responsive and non-interrupt-responsive methods clear:

The interrupt-responsive method checks the thread's interrupt status before each attempt and throws an exception to stop the thread. The non-interrupt-responsive method never throws an exception to force an interrupt while waiting; it only records the interrupt flag and re-asserts it (via selfInterrupt()) after the lock has been acquired.
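The behavioral difference can be demonstrated with a small sketch (illustrative only; the class and thread names are mine): a thread blocked in acquire() exits with an InterruptedException when interrupted, while a thread blocked in acquireUninterruptibly() keeps waiting and only has its interrupt flag re-asserted once it finally obtains a permit.

import java.util.concurrent.Semaphore;

public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(0);   // no permits: both threads will block

        Thread interruptible = new Thread(() -> {
            try {
                semaphore.acquire();              // responds to interruption
            } catch (InterruptedException e) {
                System.out.println("acquire() was interrupted and gave up waiting");
            }
        });

        Thread uninterruptible = new Thread(() -> {
            semaphore.acquireUninterruptibly();   // ignores interruption while waiting
            System.out.println("acquireUninterruptibly() returned only after a permit was released,"
                    + " interrupt flag = " + Thread.currentThread().isInterrupted());
        });

        interruptible.start();
        uninterruptible.start();
        Thread.sleep(500);

        interruptible.interrupt();                // wakes the first thread with an exception
        uninterruptible.interrupt();              // merely sets the flag on the second thread
        Thread.sleep(500);

        semaphore.release();                      // finally lets the second thread through
        interruptible.join();
        uninterruptible.join();
    }
}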

2.1.2 How NonfairSync releases a permit/lock

A permit is released by calling Semaphore.release(), which increments permits/state by one. Let's look at Semaphore's release() method:

// Semaphore class → release() method
public void release() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer class → releaseShared(arg) method
public final boolean releaseShared(int arg) {
    // Calls the tryReleaseShared() method implemented in Semaphore's Sync subclass
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Semaphore's release() method, like the acquisition methods before it, works by indirectly calling AQS's releaseShared(arg). Since AQS's tryReleaseShared(arg) is a template method, the actual logic is implemented by Semaphore's Sync subclass, as follows:

// Semaphore class → Sync subclass → tryReleaseShared() method
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // Get the current synchronization state value in AQS
        int current = getState();
        // Add releases to the current state value
        int next = current + releases;
        // Cannot happen unless a negative releases value is passed in (overflow check)
        if (next < current)
            throw new Error("Maximum permit count exceeded");
        // CAS-update state to the incremented next value
        if (compareAndSetState(current, next))
            return true;
    }
}

The logic for releasing a lock/permit is relatively simple: add one to state in AQS to release the acquired synchronization state. It is worth noting, however, that in the exclusive-mode implementation analyzed in the previous article, the release logic needed no explicit thread-safety guarantee, because in exclusive mode only one thread can be releasing at any moment. In shared mode, multiple threads may release permits/locks at the same time, so CAS + spin is used to keep the update thread-safe.

If the CAS update in tryReleaseShared(releases) succeeds, then if (tryReleaseShared(arg)) in releaseShared(arg) holds and doReleaseShared() is executed to wake the successor node's thread.

// AbstractQueuedSynchronizer class → doReleaseShared() method
private void doReleaseShared() {
    /* Spinning is required here to guard against other threads entering the queue
     * during the release; if the CAS on the head node's status fails, retest and
     * continue the loop. */
    for (;;) {
        // Get the head of the queue
        Node h = head;
        // If the head node is not null and there are other nodes in the queue
        if (h != null && h != tail) {
            // Get the head node's wait status
            int ws = h.waitStatus;
            // If the status is SIGNAL (a successor is waiting to be woken up)
            if (ws == Node.SIGNAL) {
                // Try to CAS the status back to 0;
                // on failure, continue with the next loop iteration;
                // on success, wake up the head node's successor
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);  // Wake up the successor node's thread
            }
            // If the status is 0, try to change it to PROPAGATE;
            // on failure, continue with the next loop iteration
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // If the head of the queue changed, keep looping; otherwise stop spinning
        if (h == head)
            break;
    }
}

// AbstractQueuedSynchronizer class → unparkSuccessor() method
// Parameter: the node whose successor needs to be woken up
private void unparkSuccessor(Node node) {
    // Get the node's wait status
    int ws = node.waitStatus;
    if (ws < 0)
        // Reset the status to 0
        compareAndSetWaitStatus(node, ws, 0);
    // Get the successor node
    Node s = node.next;
    // If the successor is null or its thread has been cancelled
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk the queue from the tail to find a node to wake up
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the successor node's thread
        LockSupport.unpark(s.thread);
}

doReleaseShared() works directly on the head node, waking the thread in the head's successor via unparkSuccessor(). Because the method body is a for(;;) infinite loop, the exit condition is: exit only if the head of the queue has not changed. If it has changed, another thread must have acquired the lock while the current thread was releasing the lock/permit, so the loop continues. In the second branch of the loop, when h.waitStatus == 0, the head node's waitStatus is set to Node.PROPAGATE to ensure that the wakeup keeps propagating. In AQS shared mode, multiple threads may operate on the synchronization state at the same time. For example, suppose thread T1 is executing release() → doReleaseShared() and has just woken its successor, which is about to replace the head node (ready to replace it but not yet done). At that very moment, another thread T2 is executing acquire() → doAcquireShared() → setHeadAndPropagate(). Suppose T2 obtained the last available permit; when it reaches setHeadAndPropagate() (which contains the "long judgment"), the propagate value passed in is 0:

// AbstractQueuedSynchronizer class → setHeadAndPropagate() method — the "long judgment"
// (shown earlier in full; this judgment ensures that waiting threads can still obtain
// the lock in a timely manner in various special situations)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    // s may be null if the passed-in node is the only node in the queue
    if (s == null || s.isShared())
        doReleaseShared(); // Wake up the successor node
}

According to the logic of the long judgment, propagate = 0 means there are currently no permits available, so the first condition propagate > 0 is not satisfied and, in theory, T2 would not need to wake any other thread after obtaining the lock. However, T1 has already finished with the critical resource and is in the middle of releasing the permit it holds. In that situation, if the head's successor tries to acquire the lock/permit right now, it has a good chance of getting the permit T1 is releasing. That is why, during release, the head node's waitStatus is set to Node.PROPAGATE (value -3), which satisfies the third condition h.waitStatus < 0 in the long judgment; as a result T2 also wakes the thread waiting in the head's successor. The benefit is that the head's successor is taken care of while ensuring that the wakeup keeps propagating.

Why does a thread that has just acquired the lock/permit need to wake the successor node's thread at all? Because this is a shared lock, not an exclusive one: if one thread has just acquired a shared lock/permit, there may well still be shared permits left for the threads waiting behind it in the queue, so they need to be woken up.

At this point the permit-release logic is finished; compared with acquisition it is relatively simple: update the state value, then call doReleaseShared() to wake the successor node's thread. Note, however, that two kinds of threads call doReleaseShared():

  • One is a thread releasing a shared lock/permit: whenever release() is called, it wakes the subsequent threads as part of returning the permit.
  • The other is a thread that has just acquired a shared lock/permit: under certain circumstances — when any condition of the "long judgment" is met — it also wakes the subsequent threads.

2.2 FairSync: Semaphore's fair lock in AQS shared mode

The fair-lock implementation in AQS shared mode is roughly the same as the non-fair one; only the acquisition logic differs slightly.

2.2.1 How FairSync acquires a permit/lock

A fair lock means that a thread that requests the lock earlier acquires it before a thread that requests it later: the order of acquisition matches the order of the requests in time.

The fair-lock permit-acquisition path is: Semaphore.acquire() → AQS.acquireSharedInterruptibly() → the AQS template method tryAcquireShared() → FairSync.tryAcquireShared():

// Semaphore class → constructor
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

// Semaphore class → acquire() method
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer class → acquireSharedInterruptibly() method
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Calls the template method defined by AQS to acquire the shared lock
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// AbstractQueuedSynchronizer class → tryAcquireShared() template method
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// Semaphore class → FairSync
static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }

    // Semaphore class → FairSync → tryAcquireShared() method
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // Difference: check whether there are already nodes waiting in the queue
            // before trying to acquire the lock
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

As the code shows, the only difference between the fair and non-fair implementations is this: in fair mode, before trying to acquire the lock, hasQueuedPredecessors() is called to check whether there are already nodes waiting in the synchronization queue. If there are, it returns -1 immediately; back in acquireSharedInterruptibly(), if (tryAcquireShared(arg) < 0) holds, so doAcquireSharedInterruptibly(arg) is called to wrap the current thread as a Node.SHARED shared node and add it to the synchronization queue to wait. If the queue has no waiting nodes, the thread tries to acquire the lock/permit directly.
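A small sketch of the practical effect (illustrative only; the class name is mine and the exact interleaving may vary by machine): with a fair semaphore, permits tend to be granted in arrival order, whereas the non-fair version lets a newly arriving thread barge in ahead of queued waiters.

import java.util.concurrent.Semaphore;

public class FairnessDemo {
    public static void main(String[] args) {
        // true → FairSync; false (or the one-argument constructor) → NonfairSync
        Semaphore fair = new Semaphore(1, true);

        Runnable task = () -> {
            for (int i = 0; i < 3; i++) {
                try {
                    fair.acquire();
                    try {
                        System.out.println(Thread.currentThread().getName() + " got a permit");
                        Thread.sleep(100);        // hold the permit briefly
                    } finally {
                        fair.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        for (int i = 1; i <= 3; i++) {
            new Thread(task, "T" + i).start();
        }
        // With fairness enabled the output tends to rotate T1, T2, T3, T1, T2, T3, ...
        // because each release hands the permit to the longest-waiting thread.
    }
}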

2.2.2 How FairSync releases a permit/lock

The release logic of the fair lock is identical to that of the non-fair lock: both are subclasses of Sync, and releasing simply increments state and wakes the successor node's thread. The release implementation lives in the Sync class, so it will not be repeated here.

3. The differences between ReentrantLock and Semaphore

| Comparison item | ReentrantLock | Semaphore |
| --- | --- | --- |
| Implementation mode | Exclusive mode | Shared mode |
| Lock acquisition method | tryAcquire() | tryAcquireShared() |
| Lock release method | tryRelease() | tryReleaseShared() |
| Reentrancy | Supported | Not supported |
| Responds to thread interruption | Supported | Supported |
| Condition support | Supported | Not supported |
| Number of queues | One sync queue + multiple wait queues | A single sync queue |
| Node type | Node.EXCLUSIVE | Node.SHARED |

4. Other implementations of AQS shared mode

Semaphore is not the only class built on AQS shared mode: CountDownLatch and the read lock of ReentrantReadWriteLock in JUC are also based on it (a quick read-lock sketch follows below). After that, let's take a look at how CountDownLatch is used.
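Here is a minimal sketch (illustrative only; the class name is mine) of ReentrantReadWriteLock: the read lock is acquired in shared mode, so several readers may hold it at once, while the write lock is exclusive.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
    private static final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private static int value = 0;

    // Multiple threads may execute this at the same time: the read lock is shared
    static int read() {
        rwLock.readLock().lock();
        try {
            return value;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // Only one thread at a time may execute this: the write lock is exclusive
    static void write(int newValue) {
        rwLock.writeLock().lock();
        try {
            value = newValue;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        write(42);
        System.out.println(read()); // 42
    }
}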

4.1 CountDownLatch in practice

When CountDownLatch is initialized, as with Semaphore, we need to pass in a number, count, as the initial value of the counter:

CountDownLatch countDownLatch = new CountDownLatch(3);

This value is also indirectly assigned to the state synchronization flag inside AQS. Normally we call two of its methods, await() and countDown():

  • await(): the calling thread is wrapped as a shared node and added to the synchronization queue, where it blocks until state == 0, at which point all threads in the queue are woken up.
  • countDown(): the calling thread decrements state by one.

There are two uses of CountDownLatch:

  • Initialize count = 1: multiple threads call await() and block, and a single thread calls countDown() to wake all of the blocked threads at once.
  • Initialize count = x: multiple threads each call countDown() to decrement the count, a single thread calls await() and blocks, and that thread resumes once the count reaches 0.

Both usages have many application scenarios in real projects. The "many wait for one" usage can, to some extent, be used to simulate concurrency when testing an interface for thread-safety and deadlock problems, for example:

final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 1; i <= 3; i++) {
    new Thread(() -> {
        try {
            System.out.println("Thread: " + Thread.currentThread().getName()
                    + "... blocked, waiting!");
            countDownLatch.await();
            // Here you can call the method or interface that needs concurrent testing
            System.out.println("Thread: " + Thread.currentThread().getName()
                    + "... starts executing!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "T" + i).start();
}
Thread.sleep(1000);
countDownLatch.countDown();

/* When the program starts:
 *   Thread: T2... blocked, waiting!
 *   Thread: T1... blocked, waiting!
 *   Thread: T3... blocked, waiting!
 * One second later (the three threads run almost simultaneously):
 *   Thread: T2... starts executing!
 *   Thread: T1... starts executing!
 *   Thread: T3... starts executing!
 */

In the example above, a CountDownLatch object is created with count = 1, and three threads T1, T2, T3 are created in a loop and block on await(). One second later the main thread calls countDown(), waking the three threads in the synchronization queue so they continue executing.

In real development, concurrent tasks often have dependencies between stages: a detail page may need to aggregate data by calling several interfaces in parallel and then merge the results, or data may need to be verified only after several operations have all completed, and so on. In these scenarios we can use the "one waits for many" usage:

final CountDownLatch countDownLatch = new CountDownLatch(3);
Map<String, Object> data = new HashMap<>();
for (int i = 1; i <= 3; i++) {
    final int page = i;
    new Thread(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName() +
                "... reads segmented data: lines " + (page - 1) * 200 + "-" + page * 200);
        // Add the data to the result set: data.put(...);
        countDownLatch.countDown();
    }, "T" + i).start();
}
countDownLatch.await();
System.out.println("Thread: " + Thread.currentThread().getName()
        + "... processes the data set: data");

/* Thread: T1... reads segmented data: lines 0-200
 * Thread: T2... reads segmented data: lines 200-400
 * Thread: T3... reads segmented data: lines 400-600
 * Thread: main... processes the data set: data
 */

In this example, the for loop starts three threads T1, T2, T3 that read data in parallel, improving processing efficiency. After reading, each thread adds its data to the shared result set and calls countDown(); the main thread waits for all three reads to finish before processing the aggregated data set.

4.2 Implementation principle of CountDownLatch

As mentioned earlier, CountDownLatch is also implemented based on the AQS shared mode. Like Semaphore, CountDownLatch indirectly assigns the incoming count to the state synchronization status identifier within AQS.

private final Sync sync;

// CountDownLatch class → constructor
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // Initialize the internal Sync object
    this.sync = new Sync(count);
}

// CountDownLatch class → Sync inner class
private static final class Sync extends AbstractQueuedSynchronizer {
    // Sync constructor: assigns state inside AQS
    Sync(int count) {
        setState(count);
    }
    // A call to await() eventually ends up here
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
}

// CountDownLatch class → await() method
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

CountDownLatch has an internal Sync class. When we create a CountDownLatch object, its constructor actually initializes this internal Sync object, just as we described earlier:

CountDownLatch countDownLatch = new CountDownLatch(count);

The count passed at initialization is eventually assigned, via setState(count), to the state synchronization flag inside AQS. When a thread calls await(), AQS's acquireSharedInterruptibly() method is invoked:

// AbstractQueuedSynchronizer class → acquireSharedInterruptibly() method
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Ultimately calls the tryAcquireShared() method of the Sync class inside CountDownLatch
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// CountDownLatch class → Sync inner class → tryAcquireShared() method
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

Because AQS's tryAcquireShared(arg) is only a template method, a thread executing acquireSharedInterruptibly() ultimately calls the tryAcquireShared() of CountDownLatch's internal Sync class, which returns 1 if count/state is 0 and -1 otherwise. Back in if (tryAcquireShared(arg) < 0): when count is not 0, doAcquireSharedInterruptibly(arg) is executed, wrapping the current thread as a Node.SHARED shared node, adding it to the synchronization queue, and making it wait. The principle is shown below:



That is how CountDownLatch.await() is implemented — fairly simple overall. Next, let's analyze the implementation of CountDownLatch.countDown().

// CountDownLatch class → countDown() method
public void countDown() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer class → releaseShared() method
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// AbstractQueuedSynchronizer class → template method: tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

// CountDownLatch class → Sync inner class → tryReleaseShared() method
// A call to countDown() eventually ends up here
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

Calling CountDownLatch.countDown() invokes AQS's releaseShared(), whose tryReleaseShared() template method is ultimately implemented by the tryReleaseShared() of CountDownLatch's Sync inner class. That method first checks whether state/count is already 0 and returns false if so; otherwise it decrements count/state by one with CAS and returns whether the new value equals 0. If it returns true, if (tryReleaseShared(arg)) in releaseShared() holds and doReleaseShared() runs to wake the blocked threads in the synchronization queue; if the decremented value is not yet 0, the method returns false and the current thread simply returns.

4.3 Differences between CountDownLatch and CyclicBarrier

There is another utility class in the JUC package that serves a similar purpose to CountDownLatch: CyclicBarrier. Unlike CountDownLatch, CyclicBarrier is built on AQS exclusive mode — internally it implements thread blocking and waking with ReentrantLock and Condition. The comparison is as follows:

| Comparison item | CountDownLatch | CyclicBarrier |
| --- | --- | --- |
| Implementation mode | Shared mode | Exclusive mode |
| Counting method | Counts down | Counts down |
| Reusable | No | Yes (can be reused after the count reaches 0) |
| Resettable | No | Yes |
| Design focus | One (or more) waits for many | Many wait for each other |
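For comparison, here is a minimal CyclicBarrier sketch (illustrative only; the class name is mine): each of the three threads finishes its own part and then waits at the barrier; once all three have arrived, the barrier action runs and every thread continues — and because the barrier resets itself, the same object can be reused for the next round.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        // Barrier for 3 parties; the barrier action runs once all 3 have arrived
        CyclicBarrier barrier = new CyclicBarrier(3,
                () -> System.out.println("All three parties arrived, merging results..."));

        for (int i = 1; i <= 3; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " finished its part");
                    barrier.await();   // wait for the other parties
                    System.out.println(Thread.currentThread().getName() + " continues");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "T" + i).start();
        }
    }
}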

5. Summary

Having analyzed the principles of Semaphore and CountDownLatch, we can see that the permit count/counter passed at initialization is ultimately handed, indirectly, to the AQS synchronization state flag state. When a thread tries to acquire the shared lock, state is decremented by one; when state reaches 0, no shared lock is available, and subsequent requesting threads are wrapped as shared nodes and added to the synchronization queue until a thread holding the shared lock releases it (incrementing state). Unlike exclusive mode, in shared mode it is not only the releasing thread that wakes the successor node: a thread that successfully acquires the shared lock will, under certain conditions, also wake the successor. As for fair and non-fair locks in shared mode, they work the same way as in the exclusive mode discussed previously: the fair lock checks whether a Node already exists in the queue before acquiring the lock, ensuring that threads obtain the shared lock in first-come, first-served order; the non-fair lock is acquired through competition — regardless of whether a Node already exists in the queue, the requesting thread runs the acquisition logic first, and if it succeeds it obtains the shared lock and continues executing.