Introduction

Semaphore is a synchronization aid used for flow control: it limits the number of threads that can access a resource at the same time.

synchronized and ReentrantLock allow only one thread to access a resource at a time, whereas Semaphore allows multiple threads to access a resource at the same time.

Semaphore has a constructor that takes an int n, meaning that a piece of code can be executed by at most n threads at a time. If more than n threads arrive, the extra threads wait until one thread finishes executing the block before the next one enters.

Two operations are defined on the semaphore:

  • Acquire: when a thread calls acquire, it either obtains a permit immediately (the permit count decreases by 1) or waits until another thread releases a permit or the wait times out. Semaphore maintains a wait queue that stores the suspended threads.
  • Release: increases the permit count by 1 and wakes up a waiting thread in the wait queue of the corresponding Semaphore instance.
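
The "times out" case of the acquire operation can be sketched with tryAcquire(long, TimeUnit) from java.util.concurrent.Semaphore. This is a minimal illustration only; the class name TimedAcquireDemo and the helper tryWithTimeout are hypothetical names chosen for this example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TimedAcquireDemo {

    // Hypothetical helper: waits up to `millis` for a permit and reports success.
    static boolean tryWithTimeout(Semaphore semaphore, long millis) {
        try {
            return semaphore.tryAcquire(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the attempt as failed
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        // The single permit is free, so this succeeds immediately
        System.out.println("first attempt:  " + tryWithTimeout(semaphore, 100));
        // No permit is left and nobody releases one, so this times out
        System.out.println("second attempt: " + tryWithTimeout(semaphore, 100));
        semaphore.release();
        // After the release a permit is available again
        System.out.println("third attempt:  " + tryWithTimeout(semaphore, 100));
    }
}
```

The second attempt fails only after waiting for the full timeout, because no other thread releases a permit in the meantime.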

Application scenarios

Semaphores are primarily used for two purposes:

  • Mutually exclusive use of multiple shared resources.
  • Control of the number of concurrent threads.

Example

Here is an example: 5 threads compete for 3 parking spaces. At most 3 threads can hold a parking space at the same time; the other threads can take a space only after one of those threads releases its permit.

	// Requires java.util.concurrent.Semaphore and java.util.concurrent.ThreadLocalRandom
	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(3);

		for (int i = 0; i < 5; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						semaphore.acquire();
						try {
							System.out.println(Thread.currentThread().getName() + " grabbed a parking space.");
							// Occupy the space for a random interval
							Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1000));
							System.out.println(Thread.currentThread().getName() + " returned the parking space.");
						} finally {
							// Release the permit only if it was actually acquired
							semaphore.release();
						}
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}, "Thread" + i).start();
		}
	}

Things to note

  • semaphore.acquire() and semaphore.release() should always be used in pairs; the application code itself is responsible for guaranteeing this.
  • The semaphore.release() call should be placed in a finally block, so that the permit acquired by the current thread is still returned if the application code throws an exception.
  • If permits is set to 1 in the Semaphore constructor, the Semaphore created is equivalent to a mutex. Unlike other mutex locks, this mutex allows one thread to release a lock held by another thread, because a thread can call semaphore.release() without ever having called semaphore.acquire().
  • By default, Semaphore uses an unfair scheduling policy.
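
The point about a single-permit Semaphore being releasable by a different thread can be checked with a short sketch. The class and method names (BinarySemaphoreDemo, releaseFromOtherThread) are made up for illustration; only the Semaphore calls come from the JDK.

```java
import java.util.concurrent.Semaphore;

public class BinarySemaphoreDemo {

    // Returns the number of available permits after a second thread releases
    // the permit that the calling thread acquired.
    static int releaseFromOtherThread() {
        Semaphore mutex = new Semaphore(1);
        mutex.acquireUninterruptibly();       // the calling thread takes the only permit

        // A different thread releases it; Semaphore does not track ownership
        Thread other = new Thread(mutex::release, "releaser");
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return mutex.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("available permits: " + releaseFromOtherThread());
    }
}
```

With ReentrantLock, an unlock attempt from a thread that does not hold the lock throws IllegalMonitorStateException; a Semaphore has no such ownership check, which is why the pairing of acquire and release has to be enforced by the application.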

How it works

abstract static class Sync extends AbstractQueuedSynchronizer { // omitted }

Semaphore delegates to an internal Sync class. Sync extends AbstractQueuedSynchronizer, so under the hood it is still built on AQS. Sync has two implementation classes, NonfairSync and FairSync, which determine whether a fair policy is used when acquiring a permit.

Initialization method

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}


public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync(int permits) {
    setState(permits);
}

As shown above, Semaphore uses an unfair policy by default. If a fair policy is required, a Semaphore object can be constructed using the constructor that takes two arguments.

The permits parameter is passed to AQS as its state value, which represents the number of permits currently available.
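
The effect of the two constructors can be observed through the public availablePermits() and isFair() methods. The sketch below is illustrative; SemaphoreInitDemo and describe are hypothetical names.

```java
import java.util.concurrent.Semaphore;

public class SemaphoreInitDemo {

    // Hypothetical helper: summarizes the state visible through the public API.
    static String describe(Semaphore semaphore) {
        return "permits=" + semaphore.availablePermits() + ", fair=" + semaphore.isFair();
    }

    public static void main(String[] args) {
        // One-argument constructor: unfair policy
        System.out.println(describe(new Semaphore(3)));        // permits=3, fair=false
        // Two-argument constructor: fairness chosen explicitly
        System.out.println(describe(new Semaphore(3, true)));  // permits=3, fair=true
    }
}
```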

void acquire() method

The current thread calls this method to obtain one permit from the semaphore.

If the current permit count is greater than 0, it is reduced by 1 and the method returns immediately. Otherwise, if the count is 0, the current thread is put into the AQS blocking queue. If another thread interrupts the current thread by calling its interrupt() method, the current thread throws InterruptedException and returns.

// Semaphore method
public void acquire() throws InterruptedException {
    // Passing 1 means that one permit is to be acquired
    sync.acquireSharedInterruptibly(1);
}

// AQS method
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // (1) If the thread has been interrupted, throw InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // (2) Otherwise call the Sync subclass method to try to acquire; which subclass
    //     is used depends on the fairness policy chosen in the constructor
    if (tryAcquireShared(arg) < 0)
        // If that fails, enqueue the thread, try again, and park the thread if it still fails
        doAcquireSharedInterruptibly(arg);
}

The code above shows that acquire() internally calls Sync's acquireSharedInterruptibly method, which responds to interrupts (if the current thread is interrupted, it throws InterruptedException). The tryAcquireShared method, which attempts to obtain the permit, is implemented by the subclasses of Sync, so the two cases are discussed separately.

The tryAcquireShared method of the NonfairSync class is shown below:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // Get the current permit count
        int available = getState();
        // Compute the count that would remain
        int remaining = available - acquires;
        // Return if the remaining count is negative or the CAS succeeds
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

The code above reads the current permit count (available) and subtracts the requested number (acquires) to get the remaining count (remaining). If remaining is less than 0, there are not enough permits to satisfy the request; the method returns a negative number, and the current thread is placed in the AQS blocking queue and suspended. If remaining is greater than or equal to 0, a CAS operation sets the permit count to remaining, and the method then returns remaining.

In addition, because NonfairSync acquires unfairly, the thread that calls acquire first does not necessarily get a permit before a thread that calls it later.

Consider the following scenario: thread A calls acquire() first, but the current permit count is 0, so thread A is put into the AQS blocking queue. Some time later, thread C calls release() to release a permit. If no other thread is trying to acquire at that moment, thread A is activated and obtains the permit. However, if thread C calls acquire again right after releasing, thread C competes with thread A for the permit. Under the unfair policy, as the nonfairTryAcquireShared code shows, thread C may obtain the permit either before or after thread A is activated. In this mode, blocked threads compete with threads that are currently requesting, rather than following a first-come, first-served policy.
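
The same read-compute-CAS loop can be reproduced outside AQS with an AtomicInteger. This is a simplified sketch, not the JDK implementation: CasPermitCounter is a hypothetical class, and it omits the wait queue entirely, so a failed attempt simply returns a negative number instead of suspending the thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasPermitCounter {

    // Stands in for the AQS state field
    private final AtomicInteger state;

    public CasPermitCounter(int permits) {
        this.state = new AtomicInteger(permits);
    }

    // Same shape as nonfairTryAcquireShared: a negative result means failure
    public int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // Either there are not enough permits, or the CAS published the new count;
            // if the CAS lost a race with another thread, loop and retry
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    public int available() {
        return state.get();
    }

    public static void main(String[] args) {
        CasPermitCounter counter = new CasPermitCounter(2);
        System.out.println(counter.tryAcquireShared(1)); // 1: one permit left
        System.out.println(counter.tryAcquireShared(2)); // -1: request cannot be met
        System.out.println(counter.available());         // 1: failed attempt changed nothing
    }
}
```

Note that a failed attempt leaves the counter untouched: the negative value is only returned, never written back.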

Here’s how the fair FairSync class ensures fairness.

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // Check whether the predecessor of the current thread's node is also
        // waiting for the resource; if so, return immediately
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

So fairness is guaranteed by the hasQueuedPredecessors method. Semaphore's fair policy checks whether the predecessor of the current thread's node is also waiting for the resource. If it is, the current thread gives up the attempt and is put into the AQS blocking queue; otherwise it tries to obtain the resource.
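
A small sketch of the queue order under the fair policy follows. It assumes three worker threads that are started one at a time, each confirmed to be queued (via getQueueLength()) before the next starts; FairOrderDemo and its method names are hypothetical. Because no thread barges in here, the sketch only illustrates the first-come, first-served handoff, not a comparison with the unfair policy.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

public class FairOrderDemo {

    // Returns the order in which the queued workers obtained the permit.
    static List<String> run() {
        Semaphore semaphore = new Semaphore(1, true);   // fair policy
        List<String> order = new CopyOnWriteArrayList<>();
        semaphore.acquireUninterruptibly();             // main thread holds the only permit

        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            String name = "T" + i;
            workers[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly();
                order.add(name);                        // record while holding the permit
                semaphore.release();
            }, name);
            workers[i].start();
            // Wait until this worker is in the AQS queue before starting the next one
            while (semaphore.getQueueLength() < i + 1) {
                Thread.yield();
            }
        }

        semaphore.release();                            // hand the permit to the queue head
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());                      // [T0, T1, T2]
    }
}
```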

void acquire(int permits) method

This method differs from acquire(): acquire() obtains a single permit, whereas this method obtains the number of permits given by the permits argument.

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
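
As a rough illustration of acquiring several permits at once, the sketch below takes 3 of 5 permits and then shows that a second request for 3 cannot be met. MultiPermitDemo and run are hypothetical names; tryAcquire(int) is used for the second request so that the example does not block.

```java
import java.util.concurrent.Semaphore;

public class MultiPermitDemo {

    static String run() {
        Semaphore semaphore = new Semaphore(5);
        try {
            semaphore.acquire(3);                      // take 3 of the 5 permits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        int left = semaphore.availablePermits();       // 2 permits remain
        boolean second = semaphore.tryAcquire(3);      // fails: only 2 are available
        semaphore.release(3);                          // give the 3 permits back
        return "left=" + left + ", second=" + second + ", after=" + semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(run());                     // left=2, second=false, after=5
    }
}
```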

void acquireUninterruptibly() method

This method is similar to acquire() except that it does not respond to interrupts. That is, if another thread calls interrupt() on the current thread while it is acquiring the resource through acquireUninterruptibly (including while it is blocked), only the interrupt flag is set; the current thread does not return by throwing InterruptedException.

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
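
The behavior described above can be sketched as follows: a worker blocks in acquireUninterruptibly(), is interrupted while waiting, and still returns normally once a permit is released, with its interrupt flag set. UninterruptibleDemo and run are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class UninterruptibleDemo {

    // Returns the worker's interrupt flag as observed right after it acquired the permit.
    static boolean run() {
        Semaphore semaphore = new Semaphore(0);
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            semaphore.acquireUninterruptibly();        // no InterruptedException to handle
            flagAfterAcquire.set(Thread.currentThread().isInterrupted());
        }, "worker");
        worker.start();

        // Wait until the worker is queued on the semaphore
        while (!semaphore.hasQueuedThreads()) {
            Thread.yield();
        }
        worker.interrupt();                            // does not abort the wait
        semaphore.release();                           // now the worker can proceed

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagAfterAcquire.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag after acquire: " + run());
    }
}
```

The interrupt is not lost: the flag remains set after the method returns, so the caller can still check it and react.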

void release() method

This method increases the Semaphore object's permit count by 1. If threads are currently in the AQS blocking queue because their acquire call blocked, a thread whose request can now be satisfied is selected according to the fairness policy and activated, and the activated thread then tries to acquire the newly added permit.

public void release() {
    // (1) arg = 1
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // (2) Try to release the resource
    if (tryReleaseShared(arg)) {
        // (3) The release succeeded; call unpark to wake up the first suspended thread in the AQS queue
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // Get the current permit count
        int current = getState();
        // Add releases to the current count (here, 1)
        int next = current + releases;
        // Overflow check
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // Use CAS to update the permit count atomically
        if (compareAndSetState(current, next))
            return true;
    }
}

As the code shows, release() calls sync.releaseShared(1), so each call to release() increases the permit count by 1. tryReleaseShared is a loop that uses CAS to make this increment atomic. After tryReleaseShared successfully increases the permit count, code (3) runs: it calls the AQS method that activates a thread blocked in acquire.

void release(int permits) method

The difference between this method and the parameterless release method is that this one adds permits to the current permit count on each call, whereas the parameterless one adds 1.

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
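
Since release(int permits) simply adds to the state, the permit count can grow beyond the value given to the constructor. The following sketch shows this; ReleaseManyDemo and run are hypothetical names.

```java
import java.util.concurrent.Semaphore;

public class ReleaseManyDemo {

    static int run() {
        Semaphore semaphore = new Semaphore(2);
        // No acquire was made beforehand; release still adds 3 permits
        semaphore.release(3);
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 5
    }
}
```

This is another reason why acquire and release calls need to be paired by the application: an unmatched release silently raises the concurrency limit.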

As you can also see, sync.releaseShared is a shared-mode method, which means that the semaphore is shared among threads. It is not bound to any particular thread, and multiple threads can use CAS at the same time to update the permit count without blocking.