[Copyright notice] Please do not reprint without the blogger's consent! (Please respect the original.) blog.csdn.net/javazejian/… From zejian's blog

Related articles:

In-depth understanding of Java type information (Class objects) and reflection mechanisms

In-depth understanding of Java enumeration types (Enums)

In-depth understanding of Java Annotation types (@annotation)

An in-depth understanding of Java classloaders

In-depth understanding of Java concurrency implementation principles of Synchronized

Java Concurrent Programming – Lock-free CAS, the Unsafe class, and the concurrent atomic package

In-depth understanding of the Java Memory model (JMM) and the volatile keyword

In-depth analysis of the concurrent AQS-based reentrant lock (ReentrantLock) and its Condition implementation principle

Implementation of Shared Lock Based on Concurrent AQS (Based on Semaphore)

The previous article analyzed the implementation principle of the exclusive-lock mode through ReentrantLock, which is built on the AQS synchronization framework. This article analyzes the shared-lock mode through Semaphore. Unlike the exclusive-lock mode, the shared-lock mode allows multiple threads to obtain the synchronization state at the same time. We will first cover the basic usage of Semaphore and then analyze how shared locks are implemented from Semaphore's internals, which, as we will see, are also based on the AQS synchronizer. If you want to learn how the exclusive-lock mode is implemented on top of AQS, see the blogger's previous post: In-depth analysis of the concurrent AQS-based ReentrantLock and its Condition implementation principle. The following is the main content of this article.

  • Semaphore – Semaphore
    • Use of Semaphore shared locks
    • Semaphore implements mutex
  • Implementation of shared locks in Semaphore
    • Summary of Semaphore implementation internals
    • A shared lock in an unfair lock
    • Shared locks in fair locks
    • Summary

Semaphore – Semaphore

Use of Semaphore shared locks

Semaphore, also known as a semaphore, is used to coordinate threads in a multi-threaded environment so that they use a common resource correctly. A Semaphore maintains a set of permits; when we initialize a Semaphore we pass in the size of this permit set, which represents the number of threads that may access the shared resource at the same time. A thread acquires a permit through the acquire() method and may then operate on the shared resource. Note that if all permits have already been handed out, the thread waits until another thread releases a permit. A thread releases a permit through the release() method. Here is a simple example to illustrate this.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {

    public static void main(String[] args) {
        // Thread pool
        ExecutorService exec = Executors.newCachedThreadPool();
        // Allow at most 5 threads to hold a permit at the same time
        final Semaphore semp = new Semaphore(5);
        // Simulate 20 client accesses
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // Use acquire() to obtain a permit
                        semp.acquire();
                        System.out.println("Accessing: " + NO);
                        // Sleep for 1 second
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    } finally {
                        // Release the permit
                        semp.release();
                    }
                }
            };
            exec.execute(run);
        }
        // Shut down the thread pool
        exec.shutdown();
    }
}

In the code above, five permits are initialized when the Semaphore is created, which means that five threads are allowed to access the shared resource at the same time. Each thread acquires a permit with acquire() and then sleeps for 1 second. If all five permits have been handed out, newly arriving threads enter the waiting state. When a thread finishes its work, it releases its permit through the release() method. When we run the code, we can see that roughly every second five threads print their output at almost the same time, as shown below

Semaphore implements mutex

Passing 1 when initializing the semaphore means at most one permit is available, so the semaphore can be used as a mutual-exclusion lock. This is often referred to as a binary semaphore, because it has only two states: one permit available, or zero permits available. When used this way, the binary semaphore has the property (unlike many Lock implementations) that the "lock" can be released by a thread other than the one that acquired it, since a semaphore has no notion of ownership. Here is a quick example of implementing a mutex with Semaphore.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Created by zejian on 2017/7/30.
 * Blog: http://blog.csdn.net/javazejian [original address, please respect the original]
 */
public class SemaphoreMutex {
    // Initialized to 1: a mutex
    private final static Semaphore mutex = new Semaphore(1);

    public static void main(String[] args) {
        ExecutorService pools = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        mutex.acquire();
                        System.out.println(String.format("[Thread-%s] Task ID -- %s", Thread.currentThread().getId(), index));
                        TimeUnit.SECONDS.sleep(1);
                        mutex.release();
                        System.out.println("-----------release");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            pools.execute(run);
        }
        pools.shutdown();
    }
}

Here we create a Semaphore with a single permit, Semaphore(1), acting as a mutex, and run 10 threads concurrently, using the Semaphore to control their concurrent execution.

In addition to the acquire() and release() methods for obtaining and returning permits, the following methods are also provided:


// Constructors

// Creates a Semaphore with the given number of permits and the non-fair fairness setting.
Semaphore(int permits)

// Creates a Semaphore with the given number of permits and the given fairness setting; true means a fair lock.
Semaphore(int permits, boolean fair)

// Acquires a permit from this semaphore, uninterruptibly.
void acquireUninterruptibly()

// Returns the current number of permits available in this semaphore.
int availablePermits()

// Acquires and returns all permits that are immediately available.
int drainPermits()

// Returns a collection containing threads that may be waiting to acquire.
protected Collection<Thread> getQueuedThreads()

// Returns an estimate of the number of threads waiting to acquire.
int getQueueLength()

// Queries whether any threads are waiting to acquire.
boolean hasQueuedThreads()

// Returns true if this semaphore's fairness setting is true.
boolean isFair()

// Acquires a permit from this semaphore, only if one is available at the time of invocation.
boolean tryAcquire()

// Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current thread has not been interrupted.
boolean tryAcquire(long timeout, TimeUnit unit)
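For instance, tryAcquire(long, TimeUnit) is handy when a thread would rather give up than block indefinitely. Below is a minimal sketch of that pattern; the permit count, the 500 ms timeout and the task body are made up for illustration.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TryAcquireDemo {
    // Assumed pool size for this sketch
    private static final Semaphore permits = new Semaphore(3);

    public static void main(String[] args) throws InterruptedException {
        // Wait at most 500 ms for a permit instead of blocking indefinitely
        if (permits.tryAcquire(500, TimeUnit.MILLISECONDS)) {
            try {
                System.out.println("Got a permit, doing work...");
            } finally {
                permits.release(); // always give the permit back
            }
        } else {
            System.out.println("No permit within 500 ms, giving up");
        }
    }
}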

Implementation of shared locks in Semaphore

Summary of Semaphore implementation internals

Before delving into Semaphore's inner workings, take a look at its class diagram.

As the class diagram shows, Semaphore's class structure is almost identical to that of the ReentrantLock analyzed in the previous article. Semaphore also has an internal class Sync that inherits from AQS, as well as FairSync and NonfairSync that inherit from Sync, which is enough to show that Semaphore's internals are also built on the AQS concurrency component. As we mentioned in the previous article, AQS is a base component that is only responsible for the core concurrent operations, such as adding nodes to and maintaining the synchronization queue and controlling the synchronization state, while the concrete lock and unlock operations are left to subclasses. So the subclasses inside Semaphore have to implement the acquisition and release of the shared lock themselves, through the two methods tryAcquireShared(int arg), for acquiring the lock, and tryReleaseShared(int arg), for releasing it, as can be seen from Semaphore's internal structure.

The lock-acquisition method tryAcquireShared(int arg) is implemented differently by Semaphore's inner classes FairSync and NonfairSync, while the lock-release method tryReleaseShared(int arg) is implemented in their common parent class Sync, because releasing works the same way for both. It is important to understand that when we call Semaphore's methods, they are carried out internally by indirectly calling its inner classes or AQS. Let us now analyze the shared-lock implementation from Semaphore's source code, starting with the non-fair lock.
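To keep these relationships in mind during the walkthrough, here is a compilable, shape-only sketch of the structure described above; the method bodies are placeholders (the real implementations are analyzed step by step below), and the class name is invented for illustration.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Shape-only sketch of Semaphore's internals (bodies are walked through below)
class SemaphoreShapeSketch {
    private final Sync sync;

    SemaphoreShapeSketch(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }   // the permit count is stored in AQS's state

        final int nonfairTryAcquireShared(int acquires) {
            throw new UnsupportedOperationException("see the non-fair walkthrough below");
        }
        protected final boolean tryReleaseShared(int releases) {
            throw new UnsupportedOperationException("see the release walkthrough below");
        }
    }

    static final class NonfairSync extends Sync {
        NonfairSync(int permits) { super(permits); }
        protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
    }

    static final class FairSync extends Sync {
        FairSync(int permits) { super(permits); }
        protected int tryAcquireShared(int acquires) {
            throw new UnsupportedOperationException("see the fair-lock walkthrough below");
        }
    }
}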

A shared lock in an unfair lock

Semaphore’s constructor is as follows

// permits specifies the number of threads that may access the shared resource at the same time
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Obviously, when we create a Semaphore through the one-argument constructor, we get a non-fair lock:

static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
        super(permits);
    }

    // Calls nonfairTryAcquireShared of the parent class Sync
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

Obviously, the permits value passed to the parent class will eventually end up in the state variable of AQS, i.e. the synchronization-state variable, shown below.

// The state variable in AQS that controls the synchronization state
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer {

    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    // Performs a CAS on the state variable
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}
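The handoff itself happens in Semaphore's Sync constructor, which simply stores the permit count into the AQS state. A minimal sketch of that step, matching the JDK behavior:

// Inside Semaphore (sketch): the permit count is handed straight to AQS's state
abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits); // state now holds the number of available permits
    }
}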

So the value Semaphore is initialized with is also the initial value of state. When we call Semaphore's acquire(), the process goes like this: when a thread makes a request, if the number of permits represented by state is sufficient, the thread acquires the synchronization state, i.e. gains access to the shared resource, and updates the value of state (usually decrementing it by 1). If, however, the number of permits represented by state is 0, the requesting thread cannot obtain the synchronization state and is added to the synchronization queue and blocked, until some other thread releases the synchronization state (usually incrementing state by 1) and it gets a chance to access the shared resource. Calling Semaphore's acquire() method ends up invoking AQS's acquireSharedInterruptibly(), as follows.

// Semaphore's acquire()
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/**
 * Note that the Sync class inherits from AQS.
 * AQS's acquireSharedInterruptibly() method
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Check whether the requesting thread has been interrupted
    if (Thread.interrupted())
        throw new InterruptedException();
    // If tryAcquireShared(arg) is not less than 0, the thread obtained the synchronization state
    if (tryAcquireShared(arg) < 0)
        // Acquisition failed: join the synchronization queue
        doAcquireSharedInterruptibly(arg);
}

As the method name implies, it is interruptible, which means Semaphore's acquire() method is also responsive to interruption. Inside acquireSharedInterruptibly(), the thread's interrupt status is checked first; if it has not been interrupted, tryAcquireShared(arg) is called to try to obtain the synchronization state. If that succeeds, the method simply returns; if it fails, doAcquireSharedInterruptibly(arg) is called to join the synchronization queue and wait. Here tryAcquireShared(arg) is a template method for which AQS provides no concrete implementation; it is implemented by the subclass, i.e. inside Semaphore. In Semaphore's non-fair lock the method is implemented as follows.

// tryAcquireShared() of Semaphore's non-fair lock NonfairSync
protected int tryAcquireShared(int acquires) {
    // Calls the implementation in Sync
    return nonfairTryAcquireShared(acquires);
}

// The Sync class
abstract static class Sync extends AbstractQueuedSynchronizer {

    final int nonfairTryAcquireShared(int acquires) {
        // Spin in an infinite loop
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Return if the remaining count is below 0, or if the CAS succeeded
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

nonfairTryAcquireShared(int acquires) reads the value of state and subtracts acquires from it to get remaining. If remaining is not less than 0 and the CAS succeeds, the thread obtains the synchronization state, may access the shared resource, and state has been updated. If remaining is less than 0, the thread fails to obtain the synchronization state and will be added to the synchronization queue (through doAcquireSharedInterruptibly(arg)). Note that Semaphore's acquire() may be called concurrently, so the body of nonfairTryAcquireShared() uses a lock-free (CAS) loop to make sure the state value is modified safely. If the attempt to obtain the synchronization state fails, doAcquireSharedInterruptibly(int arg) is executed.

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Create a node in shared mode (Node.SHARED) and add it to the synchronization queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // Enter the spin loop
        for (;;) {
            final Node p = node.predecessor();
            // Check whether the predecessor is the head node
            if (p == head) {
                // Try to obtain the synchronization state
                int r = tryAcquireShared(arg);
                // If r >= 0, the synchronization state was obtained successfully
                if (r >= 0) {
                    // Set the current thread's node as the head and propagate
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // Adjust the state of nodes in the synchronization queue and decide whether to park;
            // if interrupted, throw the exception directly and end this node's request
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // Cancel this node thread's request
            cancelAcquire(node);
    }
}

In this method, since the current thread did not obtain the synchronization state, a node is created in Node.SHARED mode and added to the synchronization queue with addWaiter(Node.SHARED). After joining the queue, the current thread enters a spin loop: if its predecessor is the head node, it tries to obtain the synchronization state; if the returned r is not less than 0, the synchronization state was obtained successfully, and the current node is set as the head and the wakeup is propagated. Propagation means that if the remaining permit count of the synchronization state is not 0, subsequent nodes are notified to keep trying to obtain the synchronization state; the thread that obtained the synchronization state then carries on with its original task. But if the predecessor is not the head, or it is the head but the attempt to obtain the synchronization state fails, then shouldParkAfterFailedAcquire(p, node) is called to check whether the predecessor's waitStatus is SIGNAL and to adjust the state of nodes in the synchronization queue; if it returns true, parkAndCheckInterrupt() is executed to suspend the current thread and, on wakeup, return whether the thread was interrupted.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the wait status of the predecessor node
    int ws = pred.waitStatus;
    // If the status is SIGNAL (successor waiting to be woken up), return true
    if (ws == Node.SIGNAL)
        return true;
    // If ws > 0, the predecessor has been cancelled;
    // walk back through the predecessors until a non-cancelled node is found
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // ws is <= 0 but not SIGNAL:
        // set it to SIGNAL, meaning the thread of this node's successor is waiting to be woken up
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // Suspend the current thread
    LockSupport.park(this);
    // After waking up, return the thread's interrupt status via Thread.interrupted()
    // (which also clears it); this may be true or false
    return Thread.interrupted();
}

At this point, the whole process of joining the synchronization queue is complete. To sum up: AQS has a state variable; when we create a Semaphore object and pass in a permit value, that value is ultimately assigned to state, and the value of state represents the number of threads allowed to operate on the shared data at the same time. Every time a thread request (such as a call to Semaphore's acquire() method) successfully obtains the synchronization state, the value of state is decremented by 1, until state reaches 0, meaning no permits are left, i.e. the maximum number of threads operating on the shared data has been reached, and further threads are blocked. At that point AQS wraps the thread in a shared-mode Node, adds it to the synchronization queue to wait, and starts the spin operation. Only after a thread that has access to the shared data finishes its task and releases the synchronization state can a node thread in the synchronization queue obtain the synchronization state, be woken up and continue. Note that the node in the synchronization queue that obtains the synchronization state is set as the head and its thread-related data is cleared (after all, that thread is already running, so there is no need to keep the information). This is how AQS implements shared locks. A simple model is as follows.
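To make the model concrete, here is a minimal toy shared lock built directly on AQS by implementing tryAcquireShared() and tryReleaseShared(); it is an illustrative sketch of the same state-counting idea, not the JDK's Semaphore source, and the class and method names are made up.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Toy shared lock: state holds the number of available permits (sketch only)
public class TinySharedLock {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result means "no permit"; AQS will queue and park the thread
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                // Returning true makes AQS call doReleaseShared() to wake up successors
                if (compareAndSetState(current, current + releases))
                    return true;
            }
        }
    }

    private final Sync sync;

    public TinySharedLock(int permits) { sync = new Sync(permits); }

    public void lockShared()   { sync.acquireShared(1); }   // blocks when state == 0
    public void unlockShared() { sync.releaseShared(1); }   // state + 1, wake successors
}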

The analysis so far covered the interruptible request; for comparison, the corresponding uninterruptible request is shown below (these methods also live in AQS and are called indirectly by the Sync subclass inside Semaphore).

// AQS's acquireShared()
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // No exception is thrown; just record the interrupt
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for the check below
    setHead(node); // Set the current node as the head
    /*
     * Try to wake up the next node in the queue if:
     * the caller explicitly indicated propagation (propagate > 0),
     * or the head's waitStatus indicates propagation (set by a previous operation),
     * and the next node is in shared mode or is null.
     *
     * The conservatism of both checks may cause unnecessary wakeups, but only
     * when there are threads racing to acquire/release the synchronization
     * state, so in most cases the needed signal arrives right away.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // Wake up the successor; since this is shared mode,
            // multiple threads may obtain the synchronization state at the same time
            doReleaseShared();
    }
}

Clearly, compared with the interruptible doAcquireSharedInterruptibly(int arg) shown earlier, the only differences are the missing interrupt check and exception throw and the other interrupt-handling details; doReleaseShared() will be analyzed later. When a thread finishes its task it releases the synchronization state, and the state value is incremented by 1. Let's start from Semaphore's release() method.

// Semaphore's release()
public void release() {
    sync.releaseShared(1);
}

// Calls AQS's releaseShared(int arg)
public final boolean releaseShared(int arg) {
    // Call tryReleaseShared of the subclass (Semaphore's Sync) to try to release the synchronization state
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

Semaphore indirectly calls AQS's releaseShared(int arg) method, which tries to release the synchronization state via tryReleaseShared(arg) and, if that succeeds, calls doReleaseShared() to wake up subsequent threads in the synchronization queue. tryReleaseShared(int releases) looks as follows.

// Implemented in Semaphore's internal class Sync
protected final boolean tryReleaseShared(int releases) {
       for (;;) {
              // Get the current state
             int current = getState();
             // Release state increase Releases releases
             int next = current + releases;
             if (next < current) // overflow
                 throw new Error("Maximum permit count exceeded");
              // Update the value of state with CAS
             if (compareAndSetState(current, next))
                 return true; }}Copy the code

The logic is simple: release the synchronization state and update the value of state. It is worth noting that, since multiple threads may release the synchronization state at the same time, the update must be done with lock-free operations, i.e. an infinite loop plus CAS, to guarantee thread safety. After the release succeeds, the successor is woken up through the doReleaseShared() method.

private void doReleaseShared() {
    /*
     * Ensure that the release action propagates (towards the tail of the
     * synchronization queue), even if other acquire/release operations are
     * in progress. If the head's successor needs to be woken up, perform the
     * wakeup; if not, set the head's wait status to PROPAGATE so that the
     * wakeup still propagates. The loop is also necessary in case new nodes
     * join the queue while this is going on, and, unlike other uses of
     * unparkSuccessor, a re-check is needed if setting the wait status fails.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            // Get the wait status of the head node
            int ws = h.waitStatus;
            // If the head node's status is SIGNAL, the thread of the
            // head's successor needs to be woken up with unpark
            if (ws == Node.SIGNAL) {
                // Change the head's status to 0; if this fails, continue the loop
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // Wake up the thread of head h's successor
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // If the head has changed, continue the loop; otherwise, exit
        if (h == head)                   // loop if head changed
            break;
    }
}

// Wake up the thread of the given node's successor
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Get the next node
    Node s = node.next;
    // If the successor is null or cancelled, search backwards from the tail
    // for the closest non-cancelled node
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up its thread
        LockSupport.unpark(s.thread);
}

Obviously, doReleaseShared() wakes up the thread of the head's successor by calling unparkSuccessor(h). As for the PROPAGATE status, consider the following scenario: thread A has just obtained the synchronization state and is about to replace the head, while thread B comes back and requests the resource again at the same moment; when setHeadAndPropagate(Node node, int propagate) is then called, propagate may be 0:

if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
        // Wake up the successor; since this is shared mode,
        // multiple threads may obtain the synchronization state at the same time
        doReleaseShared();
}

However, with propagate equal to 0, the propagate > 0 check alone is not enough. To make sure the wakeup of successor nodes keeps being propagated, i.e. that doReleaseShared() still gets called, the head's waitStatus is set to Node.PROPAGATE; the h.waitStatus < 0 check above then still triggers, so the thread entering setHeadAndPropagate() can still execute doReleaseShared() and wake up the subsequent node. Note that doReleaseShared() can be called both from the release operation and from the acquire operation, but in either case its purpose is to wake up successor nodes, because in shared mode multiple threads are allowed to obtain the synchronization state at the same time. This concludes the analysis of the release process, which is relatively simple: try to update the state value, and if the update succeeds, call doReleaseShared() to wake up the thread of the successor node.

Shared locks in fair locks

In fact, the shared mode of the fair lock differs from that of the non-fair lock only in how the synchronization state is obtained; everything else is basically the same. Look at the fair lock's implementation:

static final class FairSync extends Sync {
    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // Check whether there are already nodes waiting in the synchronization queue
            // before trying to obtain the synchronization state
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

The only difference between this tryAcquireShared(int acquires) and the non-fair lock's tryAcquireShared(int acquires) is that, before attempting to obtain the synchronization state, hasQueuedPredecessors() is called to check whether there are already nodes in the synchronization queue. If there are, -1 is returned and the thread is added to the synchronization queue to wait, which guarantees that the thread that requested first runs first; this is what is meant by a fair lock. The other operations are the same as in the non-fair lock analyzed earlier.
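For reference, hasQueuedPredecessors() in AQS roughly looks like the following (paraphrased from the JDK 8-era source, so treat it as a sketch): it returns true when the queue is non-empty and the node right behind the head does not belong to the current thread.

// AQS (sketch): is there a queued thread ahead of the current one?
public final boolean hasQueuedPredecessors() {
    Node t = tail; // read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}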

Summary

Having analyzed Semaphore's internal implementation, we now have a basic understanding of how a shared lock works: the number of threads accessing the shared resource is controlled by the state value in AQS. Whenever a thread successfully requests the synchronization state, the state value is decremented by 1; once the limit is exceeded, the extra threads are wrapped in shared-mode Node nodes, added to the synchronization queue and made to wait until some running thread releases the synchronization state; each thread that finishes its task and releases the synchronization state increments state by 1. This is the basic implementation model of a shared lock. The difference between the fair lock and the non-fair lock is that, before a thread requests the synchronization state, the fair lock checks whether there are already nodes in the synchronization queue; if there are, the requesting thread is wrapped as a Node and appended to the queue, which guarantees that threads obtain the synchronization state in first-come, first-served order. The non-fair lock acquires it through competition, regardless of whether there are nodes in the synchronization queue: only by winning the competition does a thread get to run.

OK, this ends the source-code walkthrough, based on reference material and my own understanding; if there is any mistake, please leave a comment, thank you.