This article has participated in the “Please check | you have a free opportunity to apply for nuggets peripheral gifts” activity.

preface

If you are lucky enough to apply for a place in the nuggets welfare activity, you will have a chance to get a new badge provided by the Nuggets official by participating in the comments. Details of the lucky draw are at the end of the article.

Introduction of Semaphore

Semaphore is a concurrency utility class in the JUC package that controls the number of concurrent threads accessing critical resources (shared resources) to ensure that threads accessing critical resources are properly using common resources. Semaphore, like ReetrantLock, is implemented by directly or indirectly invoking the AQS framework.

Semaphore internally maintains a virtual set of permissions, the number of which can be specified by the constructor argument. Before accessing a particular resource, you must obtain permissions using the acquire method, and if the number of permissions is zero, the thread blocks until there are any available permissions. After accessing the resource, release the license using Release.

The realization of the Semaphore

Methods provided in the Semaphore class:

Public void Acquire () public void Release () // Semaphore constructor: Permits → number of permits Semaphore(int permits) // permits Permits → number of permits→ fair→ permits→ Fair Void acquireUninterruptibly() void acquireUninterruptibly() void acquireUninterruptibly() void acquireUninterruptibly() void availablePermits() int availablePermits() int availablePermits() int availablePermits() int Protected Collection<Thread> getQueuedThreads(); Boolean hasQueuedThreads() // Returns the fair type of the current semaphore, Boolean isFair() // Get a license from the current semaphore, Boolean tryAcquire() // Obtain a license in the current semaphore within the given time, Return false Boolean tryAcquire(long timeout, TimeUnit unit)Copy the code

Semaphore’s implementation is based on AQS shared locks, with fair and unfair modes.

Sync implementation

abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) {setState(permits); Sync(int permits) {setState(permits); Final int getPermits() {return getState(); } Final int nonfairTryAcquireShared(int acquires) {for (;) { int available = getState(); Int remaining = available - acquires; / / license number greater than 0, the CAS for permission if (remaining < 0 | | compareAndSetState (available, remaining)) return the remaining; }} // Release license protected Final Boolean tryReleaseShared(int Releases) {for (;) { int current = getState(); Int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); If (compareAndSetState(current, next)) return true; }} final void reducePermits(int reductions) {for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); // if (compareAndSetState(current, next)) return; }} final int drainPermits() {for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; }}}Copy the code

Fair implementation of FairSync

static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); Protected int tryAcquireShared(int acquires) {for (;) If (HasqueuedToraise ()) return-1; Int available = getState(); int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }}}Copy the code

The differences between the implementation of fair lock and unfair lock are as follows: To acquire a lock in fair lock mode, the Hasqueued24 () method is called to determine whether a node exists on the synchronous queue. If there is direct return 1 return to acquireSharedInterruptibly () method if (tryAcquireShared (arg) < 0), Call doAcquireSharedInterruptibly (arg) method to the current thread encapsulated into the Node. The SHARED queue waiting for sharing nodes to join synchronization. If no node exists in the queue, try to obtain the lock or permission directly.

Unfair Sync implementation

static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } // Use Sync's nonfairTryAcquireShared() implementation directly, Protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); }}Copy the code

Semaphore’s NonfairSync constructor is based on calling its parent Sync constructor. Permitspassed when creating Semaphore will be passed to the state of the AQS synchronizer as follows:

Sync(int permits) {setState(permits); // parent -sync constructor Sync(int permits) {setState(permits); / / call AQS internal set method} / / AQS (AbstractQueuedSynchronizer) synchronizer public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {/ / synchronization state identity private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; Protected final Boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }}Copy the code

Semaphore object creation with permitsfinally initializes state within AQS. After initialization, state represents the number of licenses available for the current semaphore object.

An unfair lock NonfairSync is granted

// Semaphore class → acquire() public void acquire() throws InterruptedException { Inside the direct call AQS acquireSharedInterruptibly sync () method. The acquireSharedInterruptibly (1); } / / AbstractQueuedSynchronizer - acquireSharedInterruptibly () method of public final void acquireSharedInterruptibly (int arg) Throws InterruptedException {if (thread.interrupted ()) throw new InterruptedException(); // If tryAcquireShared(arg) is not less than 0, The thread for synchronous state if success (tryAcquireShared (arg) < 0) / / not successful to join synchronous queue blocked waiting doAcquireSharedInterruptibly (arg); }Copy the code

Semaphore access permission method to acquire () is ultimately through the Sync within the object call AQS acquireSharedInterruptibly () method, And acquireSharedInterruptibly () in the process of acquiring the synchronization status identification can be response to a thread interrupt operation, if the operation without interruption, the first call tryAcquireShared (arg) trying to get a license number, successful return to perform business, end of method. If get failure, then call doAcquireSharedInterruptibly (arg) will join the synchronous queue blocked waiting for the current thread. The tryAcquireShared(ARG) method is an AQS method with no specific implementation, and is implemented in the NonfairSync class as follows:

// Semaphore class → NofairSync inner class → tryAcquireShared() protected int tryAcquireShared(int acquires) return nonfairTryAcquireShared(acquires); } / / Syn - nonfairTryAcquireShared () method to abstract the static class Sync extends AbstractQueuedSynchronizer {final int NonfairTryAcquireShared (int Acquires) {// Start the spin loop for (;) { int available = getState(); int remaining = available - acquires; / / determine whether available in semaphore permits has < 0 or CAS implementation success if (remaining < 0 | | compareAndSetState (available, remaining)) return remaining; }}}Copy the code

If the value is not less than 0, it indicates that there are still available licenses in the semaphore. The current thread attempts to update the state value with CAS. If CAS succeeds, it indicates that the synchronization state is successfully obtained, and the remaining value is returned. Otherwise, if the remaining value is less than 0, it indicates that the remaining value of the semaphore has been acquired by other threads. Therefore, the remaining value of the semaphore is not available. The value of nonfairTryAcquireShared(Acquires) is returned. Back to AQS acquireSharedInterruptibly () method. When the return of the remaining value is less than 0, the if (tryAcquireShared (arg) < 0) conditions, to enter the if execution doAcquireSharedInterruptibly (arg) method to add the current thread synchronization queue blocked, waiting for another thread to release sync.

/ / AbstractQueuedSynchronizer class - doAcquireSharedInterruptibly () method of the private void doAcquireSharedInterruptibly (int arg) Throws InterruptedException {// Create a Node in node.shared SHARED mode and add it to a synchronization queue final Node Node = addWaiter(node.shared); boolean failed = true; Try {// Enable the spin operation for (;;) { final Node p = node.predecessor(); If (p == head) {state int r = tryAcquireShared(arg); If (r >= 0) {setHeadAndPropagate(node, r); p.next = null; // GC failed = false; return; }} // Adjust the status of the node node in the synchronization queue and determine whether it should be suspended // Determine whether there is an interrupt signal, If you need the if an exception is thrown directly over execution (shouldParkAfterFailedAcquire (p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); }} finally {if (failed) cancelAcquire(node); }} / / AbstractQueuedSynchronizer class - setHeadAndPropagate () method of the private void setHeadAndPropagate (Node Node, int propagate) { Node h = head; SetHead (node); Propagate = propagate= propagate= propagate= propagate= propagate= propagate= propagate= propagate= propagate= propagate= propagate= propagate= * not empty judgment standard writing, avoid the head as well as the new head node if the node is empty * semaphore object remaining available license number greater than 0 or * h originally head node or node is not an end state of the new nodes are awakened thread * * write two subsequent node if the reason to avoid unnecessary awakening, Since it is very likely that no thread has released permissions/locks after waking up the threads of the subsequent * nodes, Leading to once again in blocking * / if (the propagate > 0 | | h = = null | | h.w. aitStatus < 0 | | (h = head) = = null | | h.w. aitStatus < 0) {Node s = node.next; / / to avoid incoming node for synchronous queue only nodes, / / if only because the queue node to a node, the node "s displacement after certainly is empty if (s = = null | | s.i sShared ()) doReleaseShared (); // Wake up the successor node}}Copy the code

Release the license

The logic of a fair lock release license is the same as that of an unfair lock implementation, since both are subclasses of Sync, and the logic of a lock release is to wake up the thread of the subsequent node after a decrement of state. The specific implementation of lock release is left to the Sync class

// Semaphore class → release() public void release() {sync.releaseshared (1); } / / AbstractQueuedSynchronizer - releaseShared (arg) method public final Boolean releaseShared (int arg) {/ / If (tryReleaseShared(arg)) {doReleaseShared(); return true; } return false; }Copy the code

Semaphore.release() will be called, permitting /state will be released

The Semaphore method release() is done by indirectly calling AQS internal releaseShared(ARG), just like the previous method to obtain the license. Since AQS releaseShared(ARG) is a magic method, the final logical implementation is done by the Semaphore subclass Sync as follows:

// Semaphore class → Sync subclass → tryReleaseShared() protected Final Boolean tryReleaseShared(int Releases) {for (;) Int current = getState(); // Add the current state value int next = current + releases; If (next < current) throw new Error("Maximum permit count exceeded"); If (compareAndSetState(current, next)) return true; }}Copy the code

The logic of obtaining a license is much simpler, simply updating the state value and calling the doReleaseShared() method to wake up subsequent node threads. But there are two threads that call the doReleaseShared() method:

One is the thread that releases the shared lock/license number. The release() method must be called to wake up subsequent threads when it is called to release permission

The second is the thread that has just acquired the number of shared locks/permissions.

conclusion

The permissions/counters passed during initialization are eventually passed indirectly to the AQS synchronization status identifier state. When a thread attempts to acquire the shared lock, the state will be subtracted by one. When the state is 0, it means that no shared lock is available. Other threads with subsequent requests will be encapsulated as shared nodes and added to the synchronization queue until other threads holding the shared lock are released (state plus one). Different from the exclusive mode, in the shared mode, the thread that successfully obtains the shared lock will also wake up the subsequent node under certain conditions, in addition to the thread that wakes up the subsequent node when the lock is released. Java lock: AQS Details (1) Java lock: AQS details (2)

Draw that

1. Just participate in the comments (technology-related, life-related, raffle fun);

2. Lottery rules: If the comments of this article meet the requirements of gold-digging activity, the top two winners in the hot comment area will be given a new gold-digging badge respectively (if there are no hot comments, two lucky users will be drawn from the comment area);