Semaphore
Antecedents feed
Before learning this chapter, you need to understand ReentrantLock source code parsing. There are many methods introduced in ReentrantLock source code parsing are the foundation of this chapter. Next, we enter the main topic of this chapter, Semaphore. ReentrantLock source code parsing
Conceptually, Semaphore maintains a set of licenses that restrict thread access to a resource. When we have a resource that allows concurrent access by threads, but we want to limit access, we can use the Semaphore to restrict thread access. When a thread wants to access resources, it first calls acquire method of semaphore to obtain access license. When the thread finishes accessing resources, it calls Release of semaphore to return license. Using semaphore, we can limit the traffic of services. In particular, e-commerce giants like Taobao and Tmall, which usually have a large number of visits, should evaluate the number of visits their services can bear and limit the traffic on Singles’ Day, so as to avoid service downtime due to excessive visits. However, inside the Semaphore object actually does not maintain a set of license, but to preserve one number as license number, if the thread to get the license, will be according to the thread to request the license number of the number of deducting the internal maintenance, if enough to deduct the thread to obtain license is successful, otherwise blocked thread must fall into, Until the number of permits inside the semaphore is sufficient.
Let’s look at the following code. Assuming that OrderService is a remote service, we estimate that the concurrency of the service is 5000, one access to the remote service requires a license, and the execution of methodA() requires only one request for the remote service. So a call to Semaphore.acquire () acquires a license by default. Requests for methodB() need to send two permits to the remote server concurrently, so acquire(int permits) parameter will be passed 2 to ensure that no matter methodA() or methodB() is executed, the concurrency of remote server will not exceed 5000.
When our business no longer has access to the remote service and needs to return the license, methodA() originally requested only one license, here we call release() on the number of licenses inside the semaphore +1. MethodB () permits release(int permitting) If we have 4999 threads executing methodA() at the same time and one thread is executing methodB(), we know that the number of licenses is not enough. The number of licenses for semaphore maintenance is 5000, but the thread needs 5001 licenses to execute methodB() at the same time. So the thread that wants to execute methodB() blocks until the number of licenses inside the semaphore has been deducted enough to get the required number of licenses and access the remote service.
public class OrderService {
private Semaphore semaphore = new Semaphore(5000);
public void methodA(a) {
try {
semaphore.acquire();
//methodA body
} catch(InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); }}public void methodB(a) {
try {
semaphore.acquire(2);
//methodB body
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(2); }}}Copy the code
If it is a semaphore with license 1, it can be treated as a mutex. In this case, the semaphore has only two states: 0 or 1. We set 1 to indicate that the lock is not occupied and 0 to indicate that the lock is occupied. To treat the semaphore as a mutex in this way, we can use one thread to acquire the lock and another thread to release it, as shown below at <1> and <2> respectively. To some extent it can avoid the deadlock, and the traditional Java. Util. Concurrent. The locks, Lock implementation will be a very different, the traditional Lock implementations, such as: already want to solve the Lock of the thread must be the original thread Lock, otherwise it will throw an exception. [Obtaining resources]
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1);
new Thread(() -> {
try {
semaphore.acquire();/ / < 1 >
System.out.println(Thread.currentThread().getName() + "Get exclusive lock");
} catch(InterruptedException e) { e.printStackTrace(); }},Thread 1 "").start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
semaphore.release();/ / < 2 >
System.out.println(Thread.currentThread().getName() + "Release exclusive lock");
}, Thread 2 "").start();
try {
Thread.sleep(100);
} catch(InterruptedException e) { e.printStackTrace(); }}Copy the code
When the semaphore’s number of licenses is zero, if there are still threads requesting licenses, the semaphore will put the thread into a queue and suspend the thread until a license is returned, and the semaphore will try to wake up the thread in the queue that has waited the longest for the license. Thus, semaphores are divided into FairSync and NonfairSync modes. In fair mode, if a thread wants to obtain the license of the semaphore, it will first judge whether there are threads in the waiting queue maintained by the semaphore. If there are threads, it will obediently join the queue. If there are no threads, it will try to request the license. The non-fair mode requests the license directly, regardless of whether there are threads in the queue waiting for the license of the semaphore. [Obtaining resources]
The following code confirms what I said earlier: the semaphore itself does not maintain a collection of license objects. When we pass the number of licenses to the semaphore’s constructor, Eventually, the static internal class Sync will call the setState(permits) method of its parent AQS to assign a permit to the AQS internal field state, which determines how many permits the semaphore has and whether the thread requesting the permit can succeed.
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
/ /...
Sync(int permits) {
setState(permits);
}
/ /...
}
static final class NonfairSync extends Sync {/ / not fair
NonfairSync(int permits) {
super(permits);
}
/ /...
}
static final class FairSync extends Sync {/ / fair
/ /...
FairSync(int permits) {
super(permits);
}
/ /...
}
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/ /...}Copy the code
Judging from the code excerpted above, the use of unfair semaphores is officially recommended because creating a semaphore based on the number of licenses uses unfair semaphores by default, which have higher throughput than fair semaphores. So I introduce unfair semaphores first and then fair semaphores.
Acquire () and Acquire (int permits) methods, we can see that whether we request one license or multiple licenses, we essentially call Sync.
AcquireSharedInterruptibly (int arg) method. If you watch the Sync code of a static inner class can be found: the Sync doesn’t implement acquireSharedInterruptibly (int arg) method, but its parent class AQS implements this method.
public class Semaphore implements java.io.Serializable {
/ /...
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
/ /...
}
public void acquire(a) throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/ /...
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/ /...
}
Copy the code
So we back to the realization of AQS acquireSharedInterruptibly (int arg) method, the realization of this method is not difficult, to judge whether the current thread has interrupted first tag, there is one exception InterruptedException is thrown directly interrupt, Then call tryAcquireShared(int ARg) to try to obtain the license, AQS itself does not implement tryAcquireShared(int ARg) method, but by subclass to achieve, then have subclass to decide to directly try to obtain the license, Is there a thread waiting for a license in the semaphore wait queue? Yes
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/ /...
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/ /...
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/ /...
}
Copy the code
So let’s look at the non-fair lock implementation tryAcquireShared(int arg) method, NonfairTryAcquireShared (int arg) ¶ In the ryAcquireShared(int arg) method of the Sync class, we will call the nonfairTryAcquireShared(int Arg) method of the Sync class. Acquires acquires remaining. If remaining is greater than 0, remaining indicates that the semaphore is allowed to allocate the number of licenses requested by the calling thread. The CAS deduction at <2> is performed. If the deduction succeeds, the remaining number of licenses is returned. If remaining is greater than or equal to 0, the deduction succeeds; if remaining is less than 0, the request fails, indicating that the semaphore does not have enough licenses to call the thread.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/ /...
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||/ / < 1 >
compareAndSetState(available, remaining))/ / < 2 >
returnremaining; }}}/ /...
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
returnnonfairTryAcquireShared(acquires); }}/ /...}Copy the code
If the tryAcquireShared(ARG) attempt to obtain the license fails at <1>, the current thread is suspended by calling the method at <2>. [Obtaining resources]
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)/ / < 1 >
doAcquireSharedInterruptibly(arg);/ / < 2 >}Copy the code
So let’s see if the call tryAcquireShared (arg) request license after the failure of doAcquireSharedInterruptibly (int arg) and perform logic. If you’ve seen the ReentrantLock source code in the previous chapter, you’ll be familiar with this method. It first calls the addWaiter(Node mode) method at <1> to wrap the thread currently requesting the license into a Node. If the nextWaiter of a Node points to the static constant node.shared, it means that the Node is a SHARED Node. In other words, threads of the Node can share resources with other threads of the SHARED Node.
After the thread is queued as a node, it checks whether the node’s precursor node is the head node. If it is the head node, it enters the branch at <2> where tryAcquireShared(arG) is called again to request the license. If tryAcquireShared(ARG) returns a result greater than or equal to 0, the request was successful, otherwise the request failed. If the request fails, then the process we must have clear, will first perform shouldParkAfterFailedAcquire (p, node) to determine whether a precursor node p wait state for SIGNAL (1), if the SIGNAL is directly returns true, The call to parkAndCheckInterrupt() blocks the current thread. If the wait state of the precursor node P is 0, it is CAS changed to SIGNAL and then blocks the current thread in the next loop.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/ /...
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);/ / < 1 >
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {/ / < 2 >
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return; }}if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw newInterruptedException(); }}catch (Throwable t) {
cancelAcquire(node);
throwt; }}/ /...
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if(oldTail ! = null) { node.setPrevRelaxed(oldTail);if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
returnnode; }}else{ initializeSyncQueue(); }}}/ /...
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
/ /...
static final int PROPAGATE = - 3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared(a) {
return nextWaiter == SHARED;
}
/ /...
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread()); }}/ /...
}
Copy the code
The above flow is the current thread failed to request the license and blocked, so what if the thread entered the wait queue and then obtained the license? That is: After executing the following code at <1> to determine the node corresponding to the thread to join the queue, judge the node’s precursor node is the head node at <2>, enter the branch at <2> and execute the tryAcquireShared(ARG) method at <3> to obtain the license successfully. So what’s going to happen in the sethead propagate (Node propagate, int propagate) method?
A reference to the original head node head is preserved, and the head node is replaced with the current node. If the original return r (propagate) is greater than 0, it means that the semaphore still has the remaining license after the current thread finishes the license request, so the score at <5> must be valid, because propagate is greater than 0, it will judge whether the next node of the current node, next, is in the shared mode. If so, the doReleaseShared() method is called to wake up the next node of the current node. But if propagate is equal to 0 as passed in, there are a few more conditions that you can try to notify the successors of the current node, As long as conditions (h = = null | | h.w. aitStatus < 0 | | (h = head) = = null | | h.w. aitStatus < 0) was established, and the current node is still sharing the next node, you can wake up licensing subsequent nodes. So how to understand the conditions (h = = null | | h.w. aitStatus < 0 | | (h = head) = = null | | h.w. aitStatus < 0)?
First of all, we can ignore two of the four conditions, h == NULL and (h = head) == NULL are impossible to be established. H is the original head node, and the head node cannot be null as long as there is a node joining the team. Secondly, head cannot be null as well, because the head node is already the current node. As far as the author is concerned, these two judgments are standard writing methods to prevent null pointer exception, but to prevent null pointer does not mean null pointer exception. So we can enter the branch at <5> by focusing on either h.waitStatus < 0 or head.waitStatus < 0. What about the conditions h.waitStatus < 0 and head.waitStatus < 0?
Let’s recall the shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) this method, this method takes a precursor Node and the current Node, change the wait state of precursor Node to SIGNAL (1), Node, the next node representing the precursor node pred, is waiting to wake up. So we can see that if head.waitStatus < 0 means that the next node of the current node is waiting to wake up, and if the mode of the next node is shared, the doReleaseShared() method will be called to wake up the next node to try to apply for a license. Even if the incoming signal remaining license number the propagate to 0, because may save from awakening to apply for permits, already have other threads to return permit, such doing can improve overall throughput, even after the next thread to be awakened without can apply for a license number, want to do is to block the thread. Note that if there are n nodes in the queue, the operation of waking up the subsequent nodes does not necessarily spread from the beginning node to the tail node, even though the first n-1 node has a SIGNAL waitStatus. Since it is the tail node, it has no next node to wake up, so its waitStatus is 0. Want to know the current node can wake up the next node condition, first is the precursor node as the head node, the second thread of the current node apply to the license, ability is qualified to try wake up the next node, if the node is awakened, although the precursor is a head node node, but there was no redundant license can apply for, can’t replace the head node, will fall back into jam, There is no attempt to wake up the next node.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/ /...
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);/ / < 1 >
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {/ / < 2 >
int r = tryAcquireShared(arg);/ / < 3 >
if (r >= 0) {/ / < 4 >
setHeadAndPropagate(node, r);
p.next = null; // help GC
return; }}if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw newInterruptedException(); }}catch (Throwable t) {
cancelAcquire(node);
throwt; }}/ /...
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {/ / < 5 >
Node s = node.next;
if(s == null || s.isShared()) doReleaseShared(); }}/ /...
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/ /...
}
Copy the code
It is time to explain that when h.waitStatus < 0, the fact that h.waitStatus < 0 is not necessary can improve throughput.
With a release() or release(int permits), a thread returns a Semaphore with a permit. With a release() or release(int permits), a thread returns a Semaphore with a permit. The doReleaseShared() method is then called to wake up the blocked thread in the semaphore waiting queue to apply for a license. If the head node’s waiting state is SIGNAL, then the successor nodes of the head node are blocked. If the CAS method is used to modify the head node’s waiting state successfully, Call the unparksucceeded (h) to wake up the blocked successor nodes and apply for the license. If a blocked thread wakes up and can apply for a license, it will first replace the head node with the current node and decide whether to call doReleaseShared() to wake up the next successor node based on the condition. If apply for permits failure is performed twice shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) after suspends the current thread.
How does the h.waitStatus < 0 judgment improve throughput? For example, if you have a semaphore license of 2 that has been allocated to thread 1 and thread 2, the number of semaphore licenses is 0. If thread 3 and thread 4 want to request a license, they have to queue first. The corresponding nodes of thread 3 and thread 4 are N3 and N4, and the order of the nodes in the queue is: header->N3->N4. After assuming N3 team, thread 1 will return the license, the N3 Node, it is the precursor of Node head and then apply to the license, so N3 won’t call shouldParkAfterFailedAcquire (Node Mr Pred, Node Node) a shift away from the head nodes wait states. Thread 1, after returning the license, calls doReleaseShared(), assuming that the queue was empty when N3 was enqueued and that the header was initialized by calling initializeSyncQueue(). Therefore, the waiting state of the header is 0, and the CAS mode is used to change the waiting state of the header to PROPAGATE (-3) at <3>. So when thread 3 performs the setHeadAndPropagate(Node Node propagate, int propagate), the head Node points to N3, assuming that thread 4 has joined the queue at this time, but has not changed the waiting state of the front Node N3 to SIGNAL, So (= = null (h = head) | | h.w. aitStatus < 0) is false, but the former head of the node of less than zero wait state, here will enter < 1 > branch, the judge N4 interchange is sharing nodes, called doReleaseShared 4 () wake up the thread.
If Thread 4 is executing and another Thread calls locksupport. unpark(Thread Thread) to wake up Thread 4, Thread 4 will not block the first time it executes locksupport. park(Object Blocker). If parkAndCheckInterrupt() fails, lockSupport.park (Object Blocker) will be blocked again by calling parkAndCheckInterrupt(). This represents one more opportunity for thread 4 to apply for a license. Perhaps the first time thread 4 executed locksupport. park(Object Blocker) without blocking, thread 2 returned the license, and thread 4 applied for the license directly in the next round of the loop.
If thread 4 is blocked and thread 2 returns the license before calling doReleaseShared(), thread 3 goes to the branch at <1> and calls doReleaseShared(), thread 4 is awakened to apply for the license, which is equivalent to two threads competing to wake up thread 4. It can be seen that if the waiting state of the head node is 0, the throughput can be improved by changing its waiting state to PROPAGATE and adding the judgment of the waiting state of the original head node at <1>.
Execution, of course, h.com pareAndSetWaitStatus (0, Node PROPAGATE) existence of failure, such as original for determining head nodes wait states is 0, the execution of code before < 3 > place, modify the precursor head Node of the subsequent Node waiting for SIGNAL, At this time, CAS fails to modify the waiting state of the first node to PROPAGATE, so it will execute a cycle again. At this time, it will enter <2> to wake up the successor node, so the successor node has another chance to apply for the license.
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/ /...
static final class Node {
/ /...
static final int SIGNAL = - 1;
static final int PROPAGATE = - 3;
/ /...
}
/ /...
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||/ / < 1 >
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if(s == null || s.isShared()) doReleaseShared(); }}/ /...
private void doReleaseShared(a) {
for (;;) {
Node h = head;
if(h ! = null && h ! = tail) {int ws = h.waitStatus;
if (ws == Node.SIGNAL) {/ / < 2 >
if(! h.compareAndSetWaitStatus(Node.SIGNAL,0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))/ / < 3 >
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break; }}/ /...
}
Copy the code
In the end, I wish you all success as soon as possible, get satisfactory offer, fast promotion and salary increase, and walk on the peak of life.
If you can, please give me a three support me?????? [Obtain information]