- Article reference: Xiao Liu teacher speak source code
1, the introduction of
- Semaphore, which holds a series of permitswith each invocation
acquire()
Will consume one license per callrelease()
Will return a license. - Semaphore is commonly used to limit the number of accesses to a shared resource at the same time, also known as flow limiting.
- Semaphore Semaphore, get pass flow chart:
2. Introductory cases
Case 1: Pool.java
/**
* date: 2021/5/10
* @author csp
*/
public class Pool {
/** * The maximum number of threads that can access the resource at the same time */
private static final int MAX_AVAILABLE = 100;
/** * The semaphore represents the object pass available */
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/** * Share resources, you can imagine that the items array stores all Connection objects */
protected Object[] items = new Object[MAX_AVAILABLE];
/** * The shared resource usage corresponds to the items array. For example, if the * items[0] object is occupied by an external thread, then used[0] == true, otherwise used[0] == false */
protected boolean[] used = new boolean[MAX_AVAILABLE];
/** * get a free object * If there are no free objects in the current pool, wait.. Until there are free objects */
public Object getItem(a) throws InterruptedException {
// Every time acquire() will consume a license.
available.acquire();
return getNextAvailableItem();
}
/** * return object to pool */
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
Used [I] = true */ used[I] = true */
private synchronized Object getNextAvailableItem(a) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if(! used[i]) { used[i] =true;
returnitems[i]; }}return null;
}
/** * returns the object to the pool, returns true on success * return failed: * 1. If the object reference does not exist in the pool, false */ is returned
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false; }}return false; }}Copy the code
Case 2: Semaphoretest02.java
/**
* date: 2020/5/10
* @author csp
*/
public class SemaphoreTest02 {
public static void main(String[] args) throws InterruptedException {
Permitting = 2
// Fair mode: fair is true
final Semaphore semaphore = new Semaphore(2.true);
Thread tA = new Thread(() ->{
try {
// Every time acquire() will consume a license.
semaphore.acquire();
System.out.println("Thread A succeeded in obtaining A pass");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}finally {
// Each release() call returns a permittingsemaphore.release(); }}); tA.start();// Make sure thread A executes
TimeUnit.MILLISECONDS.sleep(200);
Thread tB = new Thread(() ->{
try {
// Acquire (2) will consume 2 permitts
semaphore.acquire(2);
System.out.println("Thread B succeeded in obtaining a pass");
} catch (InterruptedException e) {
}finally {
// Release (2) will return 2 permitts
semaphore.release(2); }}); tB.start();// Make sure thread B executes
TimeUnit.MILLISECONDS.sleep(200);
Thread tC = new Thread(() ->{
try {
// Every time acquire() will consume a license.
semaphore.acquire();
System.out.println("Thread C succeeded in obtaining a pass");
} catch (InterruptedException e) {
}finally {
// Each release() call returns a permittingsemaphore.release(); }}); tC.start(); }}Copy the code
Execution Result:
Thread A succeeded in obtaining the pass thread B succeeded in obtaining the pass thread C succeeded in obtaining the passCopy the code
\
3. Source code analysis
The inner class Sync
-
Through several implementations of Sync, we get the following information:
- Permissions are passed in when the method is constructed;
- Permissions are stored in the state variable state;
- When attempting to obtain a license, the value of state is reduced by 1;
- When the value of state is 0, the license cannot be obtained.
- When a license is released, the value of state increases by 1;
- The number of licenses can change dynamically;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// constructor, pass in the number of permissions, put in state
Sync(int permits) {
setState(permits);
}
// Number of licenses obtained
final int getPermits(a) {
return getState();
}
// Unfair mode tries to get permission
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// Let's see how many permissions are left
int available = getState();
// Subtracting the permissions that need to be obtained this time leaves several permissions
int remaining = available - acquires;
// If the remaining license is less than 0, return it directly
// If the remaining permissions are not less than 0, try atomic updating the value of state and return the remaining permissions on success
if (remaining < 0 ||
compareAndSetState(available, remaining))
returnremaining; }}// Release permission
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// Let's see how many permissions are left
int current = getState();
// Add permission for this release
int next = current + releases;
// Detect overflow
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// If the atom successfully updated the value of state, the release permission was successful, and true is returned
if (compareAndSetState(current, next))
return true; }}// Reduce permissions
final void reducePermits(int reductions) {
for (;;) {
// Let's see how many permissions are left
int current = getState();
// Subtract the permissions to be reduced
int next = current - reductions;
// Detect overflow
if (next > current) // underflow
throw new Error("Permit count underflow");
// Atom updates state, returns true on success
if (compareAndSetState(current, next))
return; }}// Destroy the license
final int drainPermits(a) {
for (;;) {
// Let's see how many permissions are left
int current = getState();
// If 0, return directly
// If not, update the state atom to 0
if (current == 0 || compareAndSetState(current, 0))
returncurrent; }}}Copy the code
\
The inner class NonfairSync
In unfair mode, call nonfairTryAcquireShared() directly from the parent class to try to get permission.
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// constructor, which calls the parent constructor
NonfairSync(int permits) {
super(permits);
}
// Try to get permission by calling the parent's nonfairTryAcquireShared() method
protected int tryAcquireShared(int acquires) {
returnnonfairTryAcquireShared(acquires); }}Copy the code
\
The inner class FairSync
In fair mode, check whether there is a queue in front first. If there is a queue, the license fails to be obtained and the queue is entered; otherwise, atomic update of the state value is attempted.
** Note: ** For ease of reading, this inner class pastes some methods from AQS, with a note in the header comment!
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
/** * This method is located in AQS: * Attempts to obtain a pass, returns a value >= 0 on success; * Return < 0 value */ on fetch failure
protected int tryAcquireShared(int acquires) {
for (;;) {
// Check whether there is a queue waiting in the current AQS block queue. If there is a queue waiting in the current AQS block queue, return -1, indicating that the current aquire thread needs to enter the queue waiting..
if (hasQueuedPredecessors())
return -1;
// What are some of the situations when I get to this point?
// 1. AQS blocks queue when aquire is called
// 2. The current node is a headNext node in the blocking queue
// Get state, which means pass
int available = getState();
// Remaining Remaining indicates the number of semaphore remaining after the current thread obtains the pass
int remaining = available - acquires;
// Remaining < 0 is valid, which indicates that the thread fails to obtain the pass..
// Remaning >= 0, CAS update state is successful, indicating that the thread obtained the pass successfully, CAS failed, then spin.
if (remaining < 0 ||
compareAndSetState(available, remaining))
returnremaining; }}/** * This method is located in AQS: */
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
If (acquire) the thread that called acquire is already in the interrupt state.
if (Thread.interrupted())
throw new InterruptedException();
// The threads executing tasks at the business level have broken latch. Any other thread that calls latch.await will not block here
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/** * This method is located in AQS: */
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Wrap the thread calling semaphore. aquire as a node and add it to the AQS blocking queue.
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// Get the precursor node of the current thread node
final Node p = node.predecessor();
// If the condition is true, the node corresponding to the current thread is head.next
if (p == head) {
// head. Next has the right to acquire the shared lock.
int r = tryAcquireShared(arg);
// From Semaphore's perspective: r represents the number of passes left
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return; }}/ / shouldParkAfterFailedAcquire will find a good father for the current thread, eventually to father node set the status to signal (1), returns true
// parkAndCheckInterrupt suspends the thread of the current node...
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw newInterruptedException(); }}finally {
if(failed) cancelAcquire(node); }}/** * This method is in AQS: * Set the current node to head and propagate backwards! (Wake up in turn!) * /
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// Set the current node to the new head node.
setHead(node);
Propagate == 1 when propagate == 1
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// Get the successor node of the current node..
Node s = node.next;
// when s == null The current node is tail. This will be true later. DoReleaseShared () handles this case..
// If (s! = null, the mode of s node must be shared mode. latch.await() -> addWaiter(Node.SHARED)
if (s == null || s.isShared())
// In almost all cases the doReleasseShared() method is executed.doReleaseShared(); }}//AQS. ReleaseShared This method is located in AQS:
public final boolean releaseShared(int arg) {
If the thread succeeds in releasing the resource, the thread that failed to acquire the resource will wake up.
if (tryReleaseShared(arg)) {
// Wake up the thread that failed to acquire the resource...
doReleaseShared();
return true;
}
return false;
}
/** ** Wake up the thread that failed to acquire the resource ** CountDownLatch version * Which paths call doReleaseShared? * 1. Latch.countdown () -> aqs.state == 0 -> doReleaseShared() wakes up the thread corresponding to head.next in the current blocking queue. * 2. The awakened thread - > doAcquireSharedInterruptibly parkAndCheckInterrupt () wake up - > setHeadAndPropagate () - > doReleaseShared () * * Semaphore version * which paths call the doReleaseShared method? * * /
//AQS. DoReleaseShared This method is in AQS:
private void doReleaseShared(a) {
for (;;) {
// Get the current AQS header
Node h = head;
// h! = null; the blocking queue is not empty..
// not true: h == null
// After the latch is created, a thread calls the latch.countdown () operation and triggers the logic to awaken the blocking node before any thread calls the await() method..
// h! If = tail is true, there are other nodes in the blocking queue besides the head node.
// h == tail -> Head and tail refer to the same node object. When does that happen?
// 1. In the normal wake up case, the shared lock is acquired in sequence by the current thread (this thread is the tail node).
// 2. The first thread to call await() and the thread to call countDown() and trigger to wake up the blocking node are concurrent..
// Because the await() thread is the first one to call latch.await() and there is nothing in the queue, it needs to replenish and create a Head node and then spin again to join the queue
// Until the await() thread is enqueued, assume that the current queue has only the empty element head that was just created.
CountDown (); countDown(); countDown(); countDown(); countDown();
/ / note: calls to await () thread Because the team completely finished, once again return to the upper method doAcquireSharedInterruptibly will enter into the spin,
// Get the precursor of the current element, determine that it is head.next, so the thread will set itself as head again, and then return from await()...
if(h ! =null&& h ! = tail) {// If the current head must have a successor node!
int ws = h.waitStatus;
// The current head state is signal, indicating that the subsequent node has not been awakened...
if (ws == Node.SIGNAL) {
// Change the state of the head node to 0 before waking up the successor node
// Why is CAS used here? Back to say...
// When the doReleaseShared method has multiple threads to wake up the head.next logic,
// CAS may fail...
/ / case:
If (h == head) returns false, t3 will continue spinning. Participate in the logic of waking up the next head.next..
// the CAS WaitStatus(h, node.signal, 0) is displayed successfully. T4 is also in if (ws == node. SIGNAL) before t3 is changed.
// The CAS WaitStatus(h, node.signal, 0) will fail because t3 is changed...
if(! compareAndSetWaitStatus(h, Node.SIGNAL,0))
continue; // loop to recheck cases
// Wake up the successor node
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// Conditional:
The thread in the setHeadAndPropagate method has not been executed to set the current thread as head.
// At this point, the current thread directly jumps out... The end..
// Do not worry if the wake up logic breaks here? ,
// There is no need to worry, because the awakened thread will execute the doReleaseShared method sooner or later.
// 2. H == null latch is created without any thread calling await() method,
// a thread called latch.countdown () and triggered the logic to wake up the blocking node..
// 3.h == tail -> Head and tail point to the same node object
// The condition does not exist:
// The awakened node is very active and sets itself up as the new head. If it wakes up its node (precursor), the h == head condition will not be true..
// The head precursor does not break out of the doReleaseShared method and continues to wake up the successor of the new head.
if (h == head) // loop if head changed
break; }}}Copy the code
\
A constructor
Semaphore creation requires the number of permissions passed in. Semaphore is also unfair by default, but you can declare it fair by calling the second constructor.
// constructor, which is created by passing in the number of permissions, using unfair mode by default
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// construct method, need to pass in the number of permissions, and whether fair mode
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Copy the code
\
Acquire () method
To obtain a license, the default mode is interruptible. If the attempt to obtain a license fails, it will enter the queue of AQS.
public void acquire(a) throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// Obtain a license, non-interrupt mode, if the attempt to obtain a license fails, will be queued in the AQS queue.
public void acquireUninterruptibly(a) {
sync.acquireShared(1);
}
Copy the code
Acquire (int permits) method
Obtain more than one license at a time, can interrupt mode.
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// Obtain multiple permissions at once, in non-interrupt mode.
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
Copy the code
TryAcquire () method
Try to obtain a license, use Sync’s unfair mode to try to obtain a license method, regardless of whether the license was obtained or not, only try once, will not be queued.
public boolean tryAcquire(a) {
return sync.nonfairTryAcquireShared(1) > =0;
}
If no permission has been obtained during this time, return false; otherwise return true;
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
Copy the code
Release () method
Release a license, which increases the value of state by one and wakes up the next thread waiting to acquire the license.
public void release(a) {
sync.releaseShared(1);
}
Copy the code
Release (int permits) method
The value of state increases the number of permitts when multiple permitts are released at one time.
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
Copy the code
\
4, summary
- Semaphore, also known as a Semaphore, is commonly used to control access to shared resources at the same time.
- Semaphore’s internal implementation is based on AQS shared locks;
- Semaphore initialization needs to specify the number of permissions. The number of permissions is stored in state.
- When obtaining a license, the state value is reduced by 1;
- When a license is released, the state value increases by 1;