• Reference: Teacher Xiao Liu's source code lectures

1. Introduction

  • Semaphore maintains a set of permits. Each call to acquire() consumes one permit, and each call to release() returns one.
  • Semaphore is commonly used to limit the number of threads that access a shared resource at the same time, which is also known as flow limiting.
  • Flow chart of a thread acquiring a permit from the Semaphore: [figure]

2. Introductory cases

Case 1: Pool.java

import java.util.concurrent.Semaphore;

/**
 * date: 2021/5/10
 * @author csp
 */
public class Pool {
    /** Maximum number of threads that can access the resource at the same time. */
    private static final int MAX_AVAILABLE = 100;

    /** Semaphore whose permits represent the objects currently available (fair mode). */
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /** Shared resources; imagine that the items array stores all Connection objects. */
    protected Object[] items = new Object[MAX_AVAILABLE];

    /**
     * Usage flags that correspond to the items array. For example, if items[0]
     * is held by an external thread then used[0] == true, otherwise used[0] == false.
     */
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    /**
     * Gets a free object.
     * If the pool has no free object, waits until one becomes available.
     */
    public Object getItem() throws InterruptedException {
        // Each acquire() call consumes one permit.
        available.acquire();
        return getNextAvailableItem();
    }

    /** Returns an object to the pool. */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    /** Finds a free object, marks it as used (used[i] = true) and returns it. */
    private synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }

    /**
     * Marks the object as unused; returns true on success.
     * Returns false if:
     * 1. the object reference does not exist in the pool;
     * 2. the object exists in the pool but is not currently in use.
     */
    private synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}
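The limiting effect of the pool can be checked with a small experiment. The following is a minimal sketch, not part of the original article: the class name BoundedAccessDemo and the helper peakConcurrency are hypothetical, and the 20 ms sleep only simulates work. It records the highest number of threads that were inside the guarded region at the same time.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedAccessDemo {
    /** Runs `threads` workers behind a semaphore with `permits` permits and returns the peak concurrency seen. */
    static int peakConcurrency(int permits, int threads) throws InterruptedException {
        Semaphore gate = new Semaphore(permits, true);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    gate.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was obtained, so there is nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                    Thread.sleep(20);                      // simulate work on the shared resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    gate.release();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The printed value never exceeds the number of permits (3 here).
        System.out.println("peak concurrency = " + peakConcurrency(3, 12));
    }
}
```

Note that release() is only called after acquire() has succeeded; releasing in a finally block that also covers a failed acquire() would add a permit that was never taken.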

Case 2: SemaphoreTest02.java

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * date: 2020/5/10
 * @author csp
 */
public class SemaphoreTest02 {
    public static void main(String[] args) throws InterruptedException {
        // Number of permits = 2
        // Fair mode: fair is true
        final Semaphore semaphore = new Semaphore(2, true);

        Thread tA = new Thread(() -> {
            try {
                // Each acquire() call consumes one permit.
                semaphore.acquire();
                System.out.println("Thread A succeeded in obtaining a pass");
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                // ignored in this demo
            } finally {
                // Each release() call returns one permit.
                semaphore.release();
            }
        });
        tA.start();
        // Make sure thread A has started executing
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tB = new Thread(() -> {
            try {
                // acquire(2) consumes 2 permits
                semaphore.acquire(2);
                System.out.println("Thread B succeeded in obtaining a pass");
            } catch (InterruptedException e) {
                // ignored in this demo
            } finally {
                // release(2) returns 2 permits
                semaphore.release(2);
            }
        });
        tB.start();
        // Make sure thread B has started executing
        TimeUnit.MILLISECONDS.sleep(200);

        Thread tC = new Thread(() -> {
            try {
                // Each acquire() call consumes one permit.
                semaphore.acquire();
                System.out.println("Thread C succeeded in obtaining a pass");
            } catch (InterruptedException e) {
                // ignored in this demo
            } finally {
                // Each release() call returns one permit.
                semaphore.release();
            }
        });
        tC.start();
    }
}

Execution Result:

Thread A succeeded in obtaining a pass
Thread B succeeded in obtaining a pass
Thread C succeeded in obtaining a pass

Thread B needs 2 permits and can only proceed after thread A releases its permit about 10 seconds later. Thread C needs just 1 permit, but because the semaphore is fair it queues behind B.
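The ordering in Case 2 can be reproduced without long sleeps. The sketch below is an illustration under assumptions, not the author's code: the class FairOrderDemo and its helpers are hypothetical, the main thread plays the role of thread A, and Semaphore.getQueueLength() is polled to make sure B is queued before C starts.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

class FairOrderDemo {
    /** Returns the order in which B (needs 2 permits) and C (needs 1 permit) get through. */
    static List<String> run() throws InterruptedException {
        Semaphore semaphore = new Semaphore(2, true); // fair mode
        List<String> order = new CopyOnWriteArrayList<>();

        semaphore.acquire(); // the main thread acts as thread A: 1 permit is left

        Thread tB = worker(semaphore, 2, "B", order);
        tB.start();
        awaitQueueLength(semaphore, 1); // B is queued: it needs 2 permits, only 1 is free

        Thread tC = worker(semaphore, 1, "C", order);
        tC.start();
        awaitQueueLength(semaphore, 2); // C queues behind B although 1 permit is free

        semaphore.release(); // "thread A" returns its permit
        tB.join();
        tC.join();
        return order;
    }

    private static Thread worker(Semaphore s, int permits, String name, List<String> order) {
        return new Thread(() -> {
            try {
                s.acquire(permits);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            order.add(name);     // record the moment the permits were obtained
            s.release(permits);
        });
    }

    private static void awaitQueueLength(Semaphore s, int n) throws InterruptedException {
        while (s.getQueueLength() < n) {
            Thread.sleep(1);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [B, C]
    }
}
```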


3. Source code analysis

The inner class Sync

  • From the methods implemented in Sync we learn the following:

    • The number of permits is passed in through the constructor;
    • The permits are stored in the AQS state variable state;
    • Acquiring permits reduces state by the number requested (1 for a plain acquire());
    • When state is 0, or smaller than the number requested, the permits cannot be obtained;
    • Releasing permits increases state by the number released (1 for a plain release());
    • The number of permits can change dynamically (see reducePermits() and drainPermits()).
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // Constructor: stores the number of permits in state
    Sync(int permits) {
        setState(permits);
    }

    // Returns the number of permits currently available
    final int getPermits() {
        return getState();
    }

    // Nonfair mode: try to acquire permits
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // See how many permits are left
            int available = getState();
            // Permits that would remain after this acquisition
            int remaining = available - acquires;
            // If remaining < 0, return it directly (the acquisition failed).
            // Otherwise try to update state atomically and return remaining on success;
            // if the CAS fails, loop and retry.
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // Release permits
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // See how many permits are left
            int current = getState();
            // Add the permits being released
            int next = current + releases;
            // Detect overflow
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // If state was updated atomically, the release succeeded and true is returned
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // Reduce the number of permits
    final void reducePermits(int reductions) {
        for (;;) {
            // See how many permits are left
            int current = getState();
            // Subtract the permits to be removed
            int next = current - reductions;
            // Detect underflow
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // Update state atomically and return on success
            if (compareAndSetState(current, next))
                return;
        }
    }

    // Drain (take) all remaining permits
    final int drainPermits() {
        for (;;) {
            // See how many permits are left
            int current = getState();
            // If it is 0, return directly.
            // Otherwise atomically set state to 0 and return the previous value.
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
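The statement that the number of permits can change dynamically can be observed through the public API. This is a minimal sketch: DynamicPermitsDemo and ResizableSemaphore are hypothetical names introduced here, and the subclass exists only because Semaphore.reducePermits(int) is protected.

```java
import java.util.concurrent.Semaphore;

class DynamicPermitsDemo {
    /** Hypothetical subclass that exposes the protected reducePermits(int) method. */
    static class ResizableSemaphore extends Semaphore {
        ResizableSemaphore(int permits) {
            super(permits);
        }

        void shrink(int reduction) {
            reducePermits(reduction);
        }
    }

    public static void main(String[] args) {
        ResizableSemaphore s = new ResizableSemaphore(5);
        System.out.println(s.availablePermits()); // 5

        s.shrink(2);                              // state: 5 -> 3, does not block
        System.out.println(s.availablePermits()); // 3

        s.release(4);                             // state: 3 -> 7, may exceed the initial count
        System.out.println(s.availablePermits()); // 7

        System.out.println(s.drainPermits());     // 7, all remaining permits are taken
        System.out.println(s.availablePermits()); // 0
    }
}
```

The initial permit count is therefore only a starting value, not an upper bound that Semaphore enforces.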


The inner class NonfairSync

In nonfair mode, tryAcquireShared() simply delegates to the parent class's nonfairTryAcquireShared() to try to obtain permits, without checking whether other threads are already queued.

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    // Constructor: calls the parent constructor
    NonfairSync(int permits) {
        super(permits);
    }

    // Try to acquire permits by calling the parent's nonfairTryAcquireShared() method
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
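Because the nonfair path never looks at the queue, a newly arriving thread can take a free permit ahead of a thread that is already waiting ("barging"). The sketch below illustrates this under assumptions: NonfairBargingDemo is a hypothetical class, and the main thread is the one that barges.

```java
import java.util.concurrent.Semaphore;

class NonfairBargingDemo {
    /** Returns true if the caller obtained a permit while another thread was already queued. */
    static boolean run() throws InterruptedException {
        Semaphore semaphore = new Semaphore(2); // nonfair by default
        semaphore.acquire();                    // 1 permit is left

        Thread tB = new Thread(() -> {
            // B needs 2 permits but only 1 is free, so it waits in the AQS queue.
            semaphore.acquireUninterruptibly(2);
            semaphore.release(2);
        });
        tB.start();
        while (semaphore.getQueueLength() < 1) {
            Thread.sleep(1);
        }

        // Nonfair mode skips the queue check, so the last free permit is taken at once.
        semaphore.acquire();
        boolean barged = semaphore.hasQueuedThreads() && semaphore.availablePermits() == 0;

        semaphore.release(2); // return both permits so that B can finish
        tB.join();
        return barged;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // true
    }
}
```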


The inner class FairSync

In fair mode, first check whether another thread is already queued ahead of the current one. If so, the acquisition fails and the current thread joins the queue; otherwise an atomic update of state is attempted.

Note: For ease of reading, some methods from AQS are pasted into this inner class, each marked in its header comment!

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    /**
     * Overrides the AQS template method tryAcquireShared.
     * Attempts to obtain permits: returns a value >= 0 on success
     * and a value < 0 on failure.
     */
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // If another thread is already waiting in the AQS queue ahead of the current thread,
            // return -1: the thread calling acquire must enter the queue and wait.
            if (hasQueuedPredecessors())
                return -1;
            // Execution reaches this point in two situations:
            // 1. The AQS queue was empty when acquire was called.
            // 2. The current thread's node is head.next in the queue.

            // state holds the number of available permits
            int available = getState();
            // remaining: permits left after the current thread takes its share
            int remaining = available - acquires;

            // remaining < 0: not enough permits, the acquisition fails.
            // remaining >= 0: if the CAS succeeds, the permits were obtained; if the CAS fails, spin and retry.
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    /** This method is located in AQS. */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // If the thread calling acquire is already interrupted, throw InterruptedException immediately.
        if (Thread.interrupted())
            throw new InterruptedException();

        // A negative result means the permits could not be obtained, so the thread enqueues and blocks.
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /** * This method is located in AQS: */
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // Wrap the thread that called semaphore.acquire() in a node and add it to the AQS queue.
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // Get the predecessor of the current thread's node
                final Node p = node.predecessor();
                // If true, the current thread's node is head.next
                if (p == head) {
                    // head.next has the right to try to acquire the shared lock.
                    int r = tryAcquireShared(arg);

                    // From Semaphore's perspective, r is the number of permits left
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // shouldParkAfterFailedAcquire finds a valid predecessor for the current node,
                // sets that predecessor's status to SIGNAL (-1), and eventually returns true.
                // parkAndCheckInterrupt suspends the current node's thread and returns true
                // if the thread was interrupted while parked.
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * This method is located in AQS:
     * sets the current node as head and propagates the wake-up to its successors in turn.
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // Set the current node to the new head node.
        setHead(node);
        // For Semaphore, propagate is the number of permits remaining;
        // propagate > 0 means a successor may also be able to acquire.
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            // Get the successor of the current node.
            Node s = node.next;
            // s == null: the current node is the tail; the condition holds and doReleaseShared() handles this case.
            // s != null: s must be a shared-mode node, which is always the case here
            // because acquire() enqueues via addWaiter(Node.SHARED).
            if (s == null || s.isShared())
                // In almost all cases doReleaseShared() is executed.
                doReleaseShared();
        }
    }

    // AQS.releaseShared: this method is located in AQS.
    public final boolean releaseShared(int arg) {
        // If tryReleaseShared succeeds, the permits were returned and waiting threads can be woken up.
        if (tryReleaseShared(arg)) {
            // Wake up the thread that failed to acquire the resource...
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * Wakes up threads that failed to acquire the resource.
     *
     * CountDownLatch version: which paths call doReleaseShared?
     * 1. latch.countDown() -> AQS.state == 0 -> doReleaseShared() wakes up the thread of head.next in the queue.
     * 2. The awakened thread -> returns from parkAndCheckInterrupt() in doAcquireSharedInterruptibly
     *    -> setHeadAndPropagate() -> doReleaseShared()
     *
     * Semaphore version: which paths call doReleaseShared?
     * 1. semaphore.release() -> releaseShared() -> tryReleaseShared() succeeds -> doReleaseShared()
     * 2. The awakened thread -> setHeadAndPropagate() -> doReleaseShared()
     */
    // AQS.doReleaseShared: this method is located in AQS.
    private void doReleaseShared() {
        for (;;) {
            // Get the current AQS head node
            Node h = head;
            // h != null holds: the queue has been initialized.
            // h == null: for example, after a latch is created, some thread calls latch.countDown()
            // and triggers the wake-up logic before any thread has called await().

            // h != tail holds: the queue contains nodes other than head.
            // h == tail: head and tail refer to the same node. When does that happen?
            // 1. Normal wake-up: threads acquire the shared lock one after another until the
            //    current thread (the tail node) has done so.
            // 2. The first await() call and the countDown() call that triggers the wake-up run concurrently:
            //    the await() thread is the first to call latch.await() and the queue is empty, so it must
            //    first create a placeholder head node and then spin again to enqueue itself.
            //    Until the await() thread is enqueued, the queue contains only the newly created empty head.
            //    If the countDown() thread reaches this point at that moment, it sees h == tail,
            //    skips the wake-up branch and exits the loop.
            //    Note: once the await() thread has finished enqueuing, it returns to
            //    doAcquireSharedInterruptibly and spins: it gets its predecessor, finds that it is
            //    head.next, sets itself as the new head, and returns from await().
            if (h != null && h != tail) {
                // Reaching here means the current head has a successor node.

                int ws = h.waitStatus;
                // head's status is SIGNAL: the successor has not been awakened yet.
                if (ws == Node.SIGNAL) {
                    // Change head's status to 0 before waking up the successor.
                    // Why is CAS used here?
                    // When several threads run doReleaseShared() and try to wake up head.next
                    // at the same time, the CAS may fail.
                    // Example:
                    // Thread t3 finds if (h == head) false, so it keeps spinning and takes part in
                    // waking up the next head.next; t3 then executes
                    // compareAndSetWaitStatus(h, Node.SIGNAL, 0) successfully. Thread t4 had already
                    // entered if (ws == Node.SIGNAL) before t3's change, so t4's
                    // compareAndSetWaitStatus(h, Node.SIGNAL, 0) fails because t3 already changed the status.
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Wake up the successor node
                    unparkSuccessor(h);
                }

                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }

            // h == head holds in these cases:
            // 1. Normal case: the awakened successor has not yet set itself as head in setHeadAndPropagate.
            //    The current thread then breaks out of the loop and is done.
            //    Does the wake-up chain break here? No, because the awakened thread will itself
            //    run doReleaseShared() sooner or later.
            // 2. h == null: the latch was created and, before any thread called await(),
            //    some thread called latch.countDown() and triggered the wake-up logic.
            // 3. h == tail: head and tail point to the same node.

            // h == head does not hold:
            // The awakened node was very fast and has already set itself as the new head, so for the
            // thread that woke it (its predecessor) h == head is false.
            // That thread does not leave doReleaseShared(); it loops again and wakes up the
            // successor of the new head.
            if (h == head)                   // loop if head changed
                break;
        }
    }
}
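To see how little a subclass has to supply for the AQS machinery above to work, here is a minimal sketch of a nonfair counting semaphore built directly on AbstractQueuedSynchronizer. MiniSemaphore and its methods are hypothetical names for illustration only; this is not the JDK implementation, just the same two template methods (tryAcquireShared and tryReleaseShared) wired to acquireSharedInterruptibly and releaseShared.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MiniSemaphore {
    /** Shared-mode synchronizer: state holds the number of free permits. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result tells AQS to enqueue and park the caller.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                // Returning true makes AQS wake up queued threads.
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    MiniSemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniSemaphore s = new MiniSemaphore(1);
        s.acquire();                       // 0 permits left
        Thread waiter = new Thread(() -> {
            try {
                s.acquire();               // blocks until the main thread releases
                s.release();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        s.release();
        waiter.join();
        System.out.println(s.availablePermits()); // 1
    }
}
```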


Constructors

Creating a Semaphore requires the number of permits. Semaphore is nonfair by default, but it can be made fair through the second constructor.

// Constructor: takes the number of permits and uses nonfair mode by default
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// Constructor: takes the number of permits and whether to use fair mode
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
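The effect of the two constructors can be inspected with isFair() and availablePermits(). A small sketch follows; ConstructorDemo and describe are hypothetical names. It also shows a detail from the Semaphore Javadoc: the initial count may be negative, in which case releases must happen before any acquire can succeed.

```java
import java.util.concurrent.Semaphore;

class ConstructorDemo {
    /** Formats the fairness setting and the current permit count of a semaphore. */
    static String describe(Semaphore s) {
        return "fair=" + s.isFair() + ", permits=" + s.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(describe(new Semaphore(3)));       // fair=false, permits=3
        System.out.println(describe(new Semaphore(3, true))); // fair=true, permits=3

        // A negative initial count: nothing can be acquired until enough releases occur.
        Semaphore debt = new Semaphore(-1);
        System.out.println(debt.tryAcquire()); // false
        debt.release(2);                       // state: -1 -> 1
        System.out.println(describe(debt));    // fair=false, permits=1
    }
}
```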


acquire() method

Acquires one permit; the default mode is interruptible. If the attempt fails, the thread enters the AQS queue and waits.

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// Acquires one permit in uninterruptible mode; if the attempt fails, the thread waits in the AQS queue.
public void acquireUninterruptibly() {
     sync.acquireShared(1);
}
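The difference between the two modes shows up when a waiting thread is interrupted. The following sketch is an illustration with hypothetical names (InterruptModesDemo and its helpers): a thread blocked in acquire() leaves the queue with InterruptedException, while a thread blocked in acquireUninterruptibly() keeps waiting and only has its interrupt status set when it finally returns.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptModesDemo {
    /** Returns true if a thread blocked in acquire() is cancelled by an interrupt. */
    static boolean interruptibleAcquireIsCancelled() throws InterruptedException {
        Semaphore s = new Semaphore(0);
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            try {
                s.acquire();
            } catch (InterruptedException e) {
                cancelled.set(true); // left the queue without a permit
            }
        });
        t.start();
        awaitQueued(s);
        t.interrupt();
        t.join();
        return cancelled.get() && s.availablePermits() == 0;
    }

    /** Returns true if acquireUninterruptibly() keeps waiting after an interrupt and only remembers it. */
    static boolean uninterruptibleAcquireKeepsWaiting() throws InterruptedException {
        Semaphore s = new Semaphore(0);
        AtomicBoolean flagSetOnReturn = new AtomicBoolean(false);
        Thread t = new Thread(() -> {
            s.acquireUninterruptibly();
            flagSetOnReturn.set(Thread.currentThread().isInterrupted());
        });
        t.start();
        awaitQueued(s);
        t.interrupt();
        t.join(200);                        // the interrupt alone does not free the thread
        boolean stillWaiting = t.isAlive();
        s.release();                        // only a permit lets it through
        t.join();
        return stillWaiting && flagSetOnReturn.get();
    }

    private static void awaitQueued(Semaphore s) throws InterruptedException {
        while (!s.hasQueuedThreads()) {
            Thread.sleep(1);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptibleAcquireIsCancelled());    // true
        System.out.println(uninterruptibleAcquireKeepsWaiting()); // true
    }
}
```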

acquire(int permits) method

Acquires several permits at once, in interruptible mode.

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

// Acquires several permits at once, in uninterruptible mode.
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

tryAcquire() method

Tries to obtain a permit using Sync's nonfair acquisition method, even on a fair semaphore. It tries only once and never enters the queue, whether or not the permit was obtained.

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// Tries to obtain a permit, waiting up to the given timeout.
// Returns false if no permit was obtained within that time; otherwise returns true.
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
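The two variants behave differently on a fair semaphore: the untimed tryAcquire() barges past queued threads, while the timed variant goes through tryAcquireShared() and therefore respects the queue. The sketch below illustrates this; TryAcquireDemo is a hypothetical class and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireDemo {
    /** Returns {untimed result, timed result}, observed while another thread is queued. */
    static boolean[] run() throws InterruptedException {
        Semaphore s = new Semaphore(2, true); // fair mode
        s.acquire();                          // 1 permit is left

        Thread waiter = new Thread(() -> {
            s.acquireUninterruptibly(2);      // queues: needs 2 permits, only 1 is free
            s.release(2);
        });
        waiter.start();
        while (s.getQueueLength() < 1) {
            Thread.sleep(1);
        }

        // The timed variant honors fairness: a thread is queued ahead, so it times out.
        boolean timed = s.tryAcquire(50, TimeUnit.MILLISECONDS);
        // The untimed variant ignores the queue and takes the free permit.
        boolean untimed = s.tryAcquire();

        s.release(2); // return both permits held by this thread so the waiter can finish
        waiter.join();
        return new boolean[] {untimed, timed};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("tryAcquire() = " + r[0]);               // true
        System.out.println("tryAcquire(50, MILLISECONDS) = " + r[1]); // false
    }
}
```

According to the Semaphore Javadoc, tryAcquire(0, TimeUnit.SECONDS) is the way to make a non-blocking attempt that still honors the fairness setting.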

release() method

Releases one permit: state is increased by 1 and the next thread waiting for a permit is woken up.

public void release() {
    sync.releaseShared(1);
}

release(int permits) method

Releases several permits at once: state is increased by the number of permits passed in.

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
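A short sketch of the multi-permit methods, with hypothetical names (MultiPermitDemo and its helpers): permits are taken and returned in one step, a request that cannot be fully satisfied takes nothing, and a negative argument is rejected with IllegalArgumentException.

```java
import java.util.concurrent.Semaphore;

class MultiPermitDemo {
    /** Returns the available permits after acquire(3), after a failed tryAcquire(3), and after release(3). */
    static int[] run() throws InterruptedException {
        Semaphore s = new Semaphore(5);
        s.acquire(3);
        int afterAcquire = s.availablePermits();    // 2

        boolean got = s.tryAcquire(3);              // false: only 2 are free, nothing is taken
        int afterFailedTry = got ? -1 : s.availablePermits(); // 2

        s.release(3);
        int afterRelease = s.availablePermits();    // 5
        return new int[] {afterAcquire, afterFailedTry, afterRelease};
    }

    /** Returns true if release(-1) is rejected. */
    static boolean rejectsNegative() {
        try {
            new Semaphore(1).release(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 2 2 5
        System.out.println(rejectsNegative());              // true
    }
}
```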


4. Summary

  • Semaphore is commonly used to control how many threads can access a shared resource at the same time, that is, for flow limiting;
  • Semaphore's internal implementation is based on the shared mode of AQS;
  • A Semaphore must be initialized with a number of permits, which is stored in state;
  • Acquiring a permit decreases state by 1 (by n for acquire(n));
  • Releasing a permit increases state by 1 (by n for release(n));