Questions
(1) What is Semaphore?
(2) What are the characteristics of Semaphore?
(3) In what scenarios is Semaphore typically used?
(4) Can the number of Semaphore permits be increased or decreased dynamically?
(5) How can Semaphore be used for rate limiting?
Introduction
A Semaphore maintains a set of permits. Each acquire() call consumes one permit, and each release() call returns one.
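As a small illustration of the permit counting described above, the following sketch records availablePermits() before and after one acquire() and one release(). The class name PermitCountDemo and the initial count of 3 are assumptions made for this example only.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK or of the article's code.
class PermitCountDemo {
    // Returns the available-permit count observed at each step.
    static int[] run() {
        Semaphore semaphore = new Semaphore(3);
        int initial = semaphore.availablePermits();
        try {
            semaphore.acquire();                 // consumes one permit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int afterAcquire = semaphore.availablePermits();
        semaphore.release();                     // returns the permit
        int afterRelease = semaphore.availablePermits();
        return new int[] {initial, afterAcquire, afterRelease};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [3, 2, 3]
    }
}
```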
Features
Semaphore is commonly used to limit the number of threads that can access a shared resource at the same time, which is also known as rate limiting.
Let’s learn how Semaphore is implemented in Java.
Class structure
Semaphore contains an inner class Sync, a synchronizer that extends AQS (AbstractQueuedSynchronizer), and Sync's two subclasses, FairSync and NonfairSync. This shows that Semaphore also distinguishes between fair and nonfair modes.
Source code analysis
Building on the earlier analyses of ReentrantLock and ReentrantReadWriteLock, this article is relatively simple, and some methods already covered there are skipped. If you are interested, see the earlier articles listed under Recommended reading at the end of this article.
The inner class Sync
// java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    // Constructor: takes the number of permits and stores it in state
    Sync(int permits) {
        setState(permits);
    }

    // Returns the number of permits currently available
    final int getPermits() {
        return getState();
    }

    // Nonfair attempt to acquire permits
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // Number of permits currently available
            int available = getState();
            // Number of permits that would remain after this acquisition
            int remaining = available - acquires;
            // If remaining < 0, return it directly (the acquisition failed).
            // Otherwise try to CAS state to remaining and return it on success;
            // if the CAS fails, loop and retry.
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // Release permits
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // Number of permits currently available
            int current = getState();
            // Add the permits being released
            int next = current + releases;
            // Detect overflow
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // If the CAS on state succeeds, the release succeeded; return true
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // Reduce the number of permits
    final void reducePermits(int reductions) {
        for (;;) {
            // Number of permits currently available
            int current = getState();
            // Subtract the permits to be removed
            int next = current - reductions;
            // Detect underflow
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // CAS state and return on success
            if (compareAndSetState(current, next))
                return;
        }
    }

    // Drain all available permits
    final int drainPermits() {
        for (;;) {
            // Number of permits currently available
            int current = getState();
            // If it is 0, return directly;
            // otherwise CAS state to 0 and return the previous value
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
From the methods implemented in Sync, we can conclude the following:
(1) The number of permits is passed in through the constructor;
(2) The permits are stored in the state variable;
(3) Acquiring a permit decrements state by 1;
(4) When state is 0, no permit can be acquired;
(5) Releasing a permit increments state by 1;
(6) The number of permits can be changed dynamically.
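To make these points concrete, here is a minimal sketch of a counting semaphore built directly on AQS using the same CAS loops. It is a simplified illustration, not the JDK implementation: the class name TinySemaphore and its method set are assumptions for this example, and fairness, multi-permit calls, and timeouts are left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified semaphore for illustration only.
class TinySemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }        // permits live in state

        int permits() { return getState(); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result means failure; otherwise CAS, retrying on contention.
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }

    private final Sync sync;

    TinySemaphore(int permits) { sync = new Sync(permits); }

    // Blocks in the AQS queue until a permit is available.
    void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    // Single attempt, never queues.
    boolean tryAcquire() { return sync.tryAcquireShared(1) >= 0; }

    void release() { sync.releaseShared(1); }

    int availablePermits() { return sync.permits(); }
}
```

With two initial permits, two tryAcquire() calls succeed, a third fails because state is 0, and a release() makes one permit available again.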
The inner class NonfairSync
// java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    // Constructor: delegates to the parent constructor
    NonfairSync(int permits) {
        super(permits);
    }

    // Try to acquire permits by calling the parent's nonfairTryAcquireShared()
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
In nonfair mode, tryAcquireShared() simply delegates to the parent class's nonfairTryAcquireShared() to try to acquire permits.
The inner class FairSync
// java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    // Constructor: delegates to the parent constructor
    FairSync(int permits) {
        super(permits);
    }

    // Try to acquire permits
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // Fair mode first checks whether another thread is queued ahead;
            // if so, fail immediately
            if (hasQueuedPredecessors())
                return -1;
            // Nobody is queued ahead, so try to update state
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
In fair mode, the method first checks whether any thread is queued ahead of the current one. If so, the acquisition fails and the thread enters the queue; otherwise it tries to atomically update state.
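The queueing behavior in fair mode can be observed with a small experiment. The sketch below is an illustration under stated assumptions: the class name FairOrderDemo is made up, three threads are queued one at a time on a fair Semaphore with no permits, and permits are then released one by one. getQueueLength() is documented as an estimate, so it is used here only as a simple way to wait until each thread has queued.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class FairOrderDemo {
    // Returns the order in which the queued threads obtained a permit.
    static List<Integer> run() {
        Semaphore semaphore = new Semaphore(0, true);   // fair mode, no permits yet
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                semaphore.acquireUninterruptibly();     // queues because state is 0
                order.add(id);
            }).start();
            // Wait until this thread is in the AQS queue before starting the next one.
            while (semaphore.getQueueLength() < i + 1) Thread.yield();
        }
        for (int k = 0; k < 3; k++) {
            semaphore.release();                        // wakes the thread at the head
            while (order.size() < k + 1) Thread.yield();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each permit is handed to the thread at the head of the queue, the recorded order should match the order in which the threads were queued.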
Constructors
// Constructor: takes the number of permits and uses nonfair mode by default
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// Constructor: takes the number of permits and whether to use fair mode
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Creating a Semaphore requires passing in the number of permits.
Semaphore is nonfair by default, but you can make it fair by using the second constructor.
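A short sketch of the two constructors follows. It relies on Semaphore.isFair(), which reports the mode chosen at construction; the class name FairnessDemo and the helper describe() are assumptions for this example.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class FairnessDemo {
    // Describes which mode a semaphore was created with.
    static String describe(Semaphore semaphore) {
        return semaphore.isFair() ? "fair" : "nonfair";
    }

    public static void main(String[] args) {
        System.out.println(describe(new Semaphore(10)));        // prints nonfair
        System.out.println(describe(new Semaphore(10, true)));  // prints fair
    }
}
```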
The methods below are easy to follow once the previous material is understood, so this section simply lists the operations Semaphore supports.
They are described for nonfair mode.
acquire() method
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
Acquires one permit. This default variant is interruptible. If the attempt to acquire a permit fails, the thread enters the AQS queue and waits.
acquireUninterruptibly() method
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
Acquires one permit without responding to interruption. If the attempt to acquire a permit fails, the thread enters the AQS queue and waits.
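The difference between the interruptible and uninterruptible variants can be shown on a single thread by setting the interrupt flag before the call. This is a minimal sketch; the class and method names are assumptions for this example.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class InterruptModesDemo {
    // Returns true if acquire() throws for a thread whose interrupt flag is already set.
    static boolean acquireThrowsWhenInterrupted() {
        Semaphore semaphore = new Semaphore(1);
        Thread.currentThread().interrupt();          // set the interrupt flag
        try {
            semaphore.acquire();
            semaphore.release();
            return false;
        } catch (InterruptedException e) {
            return true;                             // the flag is cleared when this is thrown
        }
    }

    // Returns true if acquireUninterruptibly() succeeds and leaves the flag set.
    static boolean uninterruptibleKeepsFlag() {
        Semaphore semaphore = new Semaphore(1);
        Thread.currentThread().interrupt();
        semaphore.acquireUninterruptibly();          // does not throw
        boolean acquired = semaphore.availablePermits() == 0;
        boolean flagStillSet = Thread.interrupted(); // read and clear the flag
        semaphore.release();
        return acquired && flagStillSet;
    }

    public static void main(String[] args) {
        System.out.println(acquireThrowsWhenInterrupted()); // prints true
        System.out.println(uninterruptibleKeepsFlag());     // prints true
    }
}
```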
tryAcquire() method
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
Tries to acquire one permit using Sync's nonfair acquisition method, regardless of whether the Semaphore was created in fair mode. It tries only once and returns immediately whether or not a permit was obtained; the thread is never queued.
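A minimal sketch of the non-blocking behavior, assuming a Semaphore with a single permit (the class name TryAcquireDemo is made up for this example):

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class TryAcquireDemo {
    // Returns the results of two consecutive tryAcquire() calls on one permit.
    static boolean[] run() {
        Semaphore semaphore = new Semaphore(1);
        boolean first = semaphore.tryAcquire();   // takes the only permit
        boolean second = semaphore.tryAcquire();  // fails at once, does not block
        semaphore.release();
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [true, false]
    }
}
```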
tryAcquire(long timeout, TimeUnit unit) method
public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
Tries to acquire one permit. If the first attempt fails, the thread waits for up to the given timeout. It returns false if no permit is obtained within that time, and true otherwise.
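The timed variant might be used as follows. This is a sketch only: the class name, the helper tryWithTimeout(), and the 100 ms timeout are assumptions for this example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration only.
class TimedTryAcquireDemo {
    // Waits up to the given number of milliseconds for one permit.
    static boolean tryWithTimeout(Semaphore semaphore, long millis) {
        try {
            return semaphore.tryAcquire(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the flag and report failure
            return false;
        }
    }

    public static void main(String[] args) {
        // No permits and nobody releases: gives up after the timeout.
        System.out.println(tryWithTimeout(new Semaphore(0), 100)); // prints false
        // A permit is available: succeeds without waiting.
        System.out.println(tryWithTimeout(new Semaphore(1), 100)); // prints true
    }
}
```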
release() method
public void release() {
    sync.releaseShared(1);
}
Releases one permit, which increments state by 1 and wakes up the next thread waiting to acquire a permit.
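The wake-up can be seen with one waiting thread. In this sketch (the class name and the 5 second join limit are assumptions), the main thread waits until the worker is queued, releases a permit, and then checks that the worker finished.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class ReleaseWakesWaiterDemo {
    // Returns true if a queued thread finishes after another thread calls release().
    static boolean run() {
        Semaphore semaphore = new Semaphore(0);
        Thread waiter = new Thread(() -> semaphore.acquireUninterruptibly());
        waiter.start();
        // Wait until the waiter has entered the AQS queue.
        while (!semaphore.hasQueuedThreads()) Thread.yield();
        semaphore.release();                       // state 0 -> 1, wakes the waiter
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the permit is released by a thread that never acquired one, which Semaphore allows.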
acquire(int permits) method
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
Acquires multiple permits at once, interruptibly.
acquireUninterruptibly(int permits) method
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
Acquires multiple permits at once without responding to interruption.
tryAcquire(int permits) method
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
Tries to acquire multiple permits at once, trying only once.
tryAcquire(int permits, long timeout, TimeUnit unit) method
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
Tries to acquire multiple permits at once, waiting up to the given timeout. It returns false if the permits are not obtained within that time, and true otherwise.
release(int permits) method
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
Releases multiple permits at once, increasing state by the value of permits.
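The multi-permit variants can be sketched together. The class name and the numbers (5 permits, batches of 3) are assumptions for this example.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class BulkPermitsDemo {
    // Returns "availableAfterAcquire,secondTryResult,availableAfterRelease".
    static String run() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquireUninterruptibly(3);              // state: 5 -> 2
        int afterAcquire = semaphore.availablePermits();
        boolean second = semaphore.tryAcquire(3);         // only 2 left, so this fails
        semaphore.release(3);                             // state: 2 -> 5
        int afterRelease = semaphore.availablePermits();
        return afterAcquire + "," + second + "," + afterRelease;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 2,false,5
    }
}
```

A failed tryAcquire(3) takes nothing: the request is all or nothing, so the 2 remaining permits stay available.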
availablePermits() method
public int availablePermits() {
    return sync.getPermits();
}
Returns the number of permits currently available.
drainPermits() method
public int drainPermits() {
    return sync.drainPermits();
}
Takes all permits that are currently available, sets state to 0, and returns the number taken. Permits that have already been acquired are not affected; only the remaining ones are drained.
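A short sketch of draining, assuming 5 initial permits of which 2 are already held (the class name DrainDemo is made up for this example):

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class DrainDemo {
    // Returns {drained, availableAfterDrain, availableAfterReturningHeldPermits}.
    static int[] run() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.acquireUninterruptibly(2);           // 2 held, 3 still available
        int drained = semaphore.drainPermits();        // takes the remaining 3
        int afterDrain = semaphore.availablePermits();
        semaphore.release(2);                          // held permits can still be returned
        int afterRelease = semaphore.availablePermits();
        return new int[] {drained, afterDrain, afterRelease};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [3, 0, 2]
    }
}
```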
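reducePermits(int reduction) method
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}
Reduces the number of available permits by the given amount. The method is protected, so it can only be called from a subclass, and unlike acquire() it never blocks.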
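Because reducePermits() is protected, using it requires a subclass. The following is a minimal sketch; the class name ShrinkableSemaphore and the method shrink() are assumptions for this example. It also shows that, since the call does not wait for permits, the available count can become negative.

```java
import java.util.concurrent.Semaphore;

// Hypothetical subclass that exposes the protected reducePermits() method.
class ShrinkableSemaphore extends Semaphore {
    private static final long serialVersionUID = 1L;

    ShrinkableSemaphore(int permits) {
        super(permits);
    }

    // Removes permits without blocking, even if fewer are currently available.
    void shrink(int reduction) {
        reducePermits(reduction);
    }

    public static void main(String[] args) {
        ShrinkableSemaphore semaphore = new ShrinkableSemaphore(2);
        semaphore.shrink(3);
        System.out.println(semaphore.availablePermits()); // prints -1
        System.out.println(semaphore.tryAcquire());       // prints false
    }
}
```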
Summary
(1) Semaphore is commonly used to limit the number of threads accessing a shared resource at the same time;
(2) Semaphore is implemented internally on top of the shared mode of AQS;
(3) The number of permits must be specified when a Semaphore is created, and it is stored in state;
(4) Acquiring a permit decrements state by 1;
(5) Releasing a permit increments state by 1;
(6) The number of permits can be reduced dynamically by n;
(7) Can the number of permits be increased dynamically by n?
Easter eggs
(1) How can n permits be added dynamically?
A: Call release(int permits). As the source code above shows, Semaphore does not check whether the releasing thread ever acquired a permit, which is different from a lock. So permits can be added dynamically simply by calling the release method.
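A minimal sketch of growing the permit count this way (the class name GrowPermitsDemo and the helper grow() are assumptions for this example):

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class GrowPermitsDemo {
    // Adds extra permits by releasing without a prior acquire; returns the new count.
    static int grow(Semaphore semaphore, int extra) {
        semaphore.release(extra);
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        System.out.println(grow(semaphore, 3)); // prints 5
    }
}
```

The flip side of this design is that an unbalanced release() in application code silently raises the limit, so acquire and release calls are usually paired with try/finally.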
(2) How can rate limiting be implemented?
A: Rate limiting means that when traffic surges, the upstream side caps the load so that the spike does not overwhelm downstream services. In a distributed system, rate limiting is usually done at the gateway layer, but it can also be applied to an individual feature. Take a flash sale (seckill) as an example: if only 10 items are on sale, the service itself can allow only 100 requests in at the same time and reject all the others, so that the service is not put under too much pressure.
With Semaphore, this feature can be rate limited directly. The code is as follows:
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreTest {
    public static final Semaphore SEMAPHORE = new Semaphore(100);
    public static final AtomicInteger failCount = new AtomicInteger(0);
    public static final AtomicInteger successCount = new AtomicInteger(0);

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> seckill()).start();
        }
    }

    public static boolean seckill() {
        // Reject the request immediately if no permit is available
        if (!SEMAPHORE.tryAcquire()) {
            System.out.println("no permits, count=" + failCount.incrementAndGet());
            return false;
        }
        try {
            // Process the business logic
            Thread.sleep(2000);
            System.out.println("seckill success, count=" + successCount.incrementAndGet());
        } catch (InterruptedException e) {
            // TODO: handle the exception
            e.printStackTrace();
        } finally {
            SEMAPHORE.release();
        }
        return true;
    }
}
Recommended reading
1. The Java Synchronization series: introduction
2. Parsing the Java magic class
3. The Java Synchronization series: JMM (Java Memory Model)
4. The Java Synchronization series: volatile
5. The Java Synchronization series: synchronized
6. The Java Synchronization series: writing your own Lock
7. The Java Synchronization series: AQS
8. The Java Synchronization series: ReentrantLock (1), fair and nonfair locks
9. The Java Synchronization series: ReentrantLock, condition locks
10. The Java Synchronization series: ReentrantLock vs. synchronized
11. The Java Synchronization series: ReentrantReadWriteLock source code analysis
Follow my public account "Tong Ge Reads Source Code" for more articles in the source code series.