This is the 28th day of my participation in the August Challenge

  1. Semaphore’s definition, meaning
  2. How was it implemented before juc Semaphore
  3. Semaphore use
  4. Distributed Semaphore implementation

A semaphore

The earliest mechanism used to solve the process synchronization and mutual exclusion problem consists of a variable called a semaphore and two primitive operations (PV operations) performed on it

What is a semaphore?

The data structure of a semaphore is a value and a pointer to the next process waiting for the semaphore. The semaphore value is related to the usage of the corresponding resource.

PV operations consist of P operation primitives and V operation primitives (primitives are uninterruptible processes)

(Note: ‘P’ is’ passen ‘and’ V ‘is’ Verhoog’ for incremnet.)

To operate on a semaphore, as defined below:

  • P (S) :
    • ① Decrease the value of semaphore S by 1, that is, S=S-1;
    • ② If S>=0, the process continues. Otherwise, the process is put into the wait state and queued
  • V (S) :
    • ① Add the value of the semaphore S by 1, that is, S=S+1;
    • ② If S>0, the process continues. Otherwise, the first process in the queue waiting for a semaphore is released

Significance of PV operation: we use semaphores and PV operations to achieve synchronization and mutual exclusion of processes. PV operations are low-level communications of the process

Note the following when implementing process exclusion using the PV operation:

  1. Each program user to achieve mutually exclusive P, V operations must appear in pairs, the first P operation, into the critical area, after the V operation, out of the critical area. If there are multiple branches, check their compatibility carefully
  2. P and V operations should be close to the beginning and end of the critical region, respectively. The code of the critical region should be as short as possible, and there should be no dead loop
  3. The initial value of a mutex is usually 1
Private int permitting = 1; public synchronized void P() { permits--; if(permits < 0 ){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } public synchronized void V(){ permits++; if(permits <=0){ notifyAll(); }}Copy the code

J.U.C Semaphore

One of the utility classes JUC provides is Semaphore, which provides a rich API that you no longer need to implement yourself

// Create Semaphore with a given number of permissions and unfair fair Settings. Semaphore(int permits) // Creates a Semaphore with a given number of permits and a given fair setting. Semaphore(int permits, Boolean fair) // Obtain a permit from this Semaphore, block the thread until a permit is provided, otherwise the thread will be interrupted. Void acquire() // Acquire a given number of permissions from this semaphore and block the thread until those permissions are provided, or the thread has been interrupted. Void acquire(int permits) // Block from this semaphore until permits are available. Void acquireUninterruptibly() // Acquires a given number of permissions from this semaphore and blocks the thread until those permissions are provided. Void acquireUninterruptibly(int permits) // Returns the number of permits currently available in this semapore. Int availablePermits() // Get and return all permits immediately available. Int drainPermits() // Returns a collection containing threads that may be waiting to be fetched. Protected Collection<Thread> getQueuedThreads() // Returns the estimated number of threads waiting to be fetched. Int getQueueLength() // Queries whether there are threads waiting to fetch. Boolean hasQueuedThreads() // Returns true if this semaphore's fair setting is true. Boolean isFair() // Decreases the number of available licenses by the specified reduction. Protected void reducePermits(int Reduction) // Release a permit and return it to the semaphore. Void release() // Releases a given number of permissions, returning them to the semaphore. Void release(int permitting) // Returns a string identifying the permitting and the status of the semaphore. String toString() // Get permission from the semaphore only if there is one available at the time of the call. Boolean tryAcquire() // Get permissions from this semaphore only if there are a given number of permissions in this semaphore at the time of the call. Boolean tryAcquire(int permits) // Obtain a given number of permits from this semaphore if all permits are available to this semaphore within a given waiting time and the current thread is not interrupted. Boolean tryAcquire(int permits, long Timeout, TimeUnit Unit) // Obtain a permit from this semaphore if a permit is available within the given wait time and the current thread has not been interrupted. boolean tryAcquire(long timeout, TimeUnit unit)Copy the code

Semaphore source code for JUC, this article does not elaborate, open a new chapter; But distributed Semaphore can be studied

Distributed Semaphore

There is a corresponding RSemaphore in Redission

RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); / / or semaphore. AcquireAsync (); semaphore.acquire(23); semaphore.tryAcquire();Copy the code

Expired semaphores

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // Get a signal, valid for 2 seconds. String permitId = semaphore.acquire(2, TimeUnit.SECONDS); / /... semaphore.release(permitId);Copy the code

Directly on the most essential source code fragment, Lua script is very simple, count semaphores, acquire, semaphores minus 1, release, semaphores plus 1; The main thing is to keep the operation atomic

@Override public RFuture<Boolean> tryAcquireAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(true); } return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('get', KEYS[1]); " + "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " + "local val = redis.call('decrby', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;" , Collections.<Object>singletonList(getName()), permits); } @Override public RFuture<Void> releaseAsync(int permits) { if (permits < 0) { throw new IllegalArgumentException("Permits amount can't be negative"); } if (permits == 0) { return RedissonPromise.newSucceededFuture(null); } return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + "redis.call('publish', KEYS[2], value); ", Arrays.<Object>asList(getName(), getChannelName()), permits); }Copy the code

On the basis of the most essential, and then take a look at what has been done, can really achieve an industrial production standard

tryAcquire()

It’s non-blocking, it just gets the information, it doesn’t just come back, it’s lua, it doesn’t do anything extra

acquire()

@Override public void acquire(int permits) throws InterruptedException { if (tryAcquire(permits)) { return; } RFuture<RedissonLockEntry> future = subscribe(); commandExecutor.syncSubscription(future); try { while (true) { if (tryAcquire(permits)) { return; } getEntry().getLatch().acquire(permits); } } finally { unsubscribe(future); }}Copy the code

Blocking, a little bit more than non-blocking

  • 1. TryAcquire first and see if you can acquire the semaphore
  • 2. Subscribe to channel events
  • 3. An infinite loop
    • 3.1. TryAcquire () first, try it
    • 3.2. Block through getEntry().getlatch (), i.e. J.U.C. Semaphore,acquire()
  • 4. Unsubscribe

His purpose was to free Semaphore

If A succeeds in obtaining the Semaphore, then B will be blocked by Semaphore. When to exit the block?

After release(), thread A will publish. For details, check the lua script in release() above. When B listens for the event, it will call semaphore.release () and run tryAcquire() again.

tryAcquire(int permits, long waitTime, TimeUnit unit)

A given number of permissions are obtained from this semaphore if all permissions are available for the given waiting time and the current thread is not interrupted

@Override public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); if (tryAcquire(permits)) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> future = subscribe(); if (! await(future, time, TimeUnit.MILLISECONDS)) { return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } while (true) { current = System.currentTimeMillis(); if (tryAcquire(permits)) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } // waiting for message current = System.currentTimeMillis(); getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS); time -= (System.currentTimeMillis() - current); if (time <= 0) { return false; } } } finally { unsubscribe(future); } // return get(tryAcquireAsync(permits, waitTime, unit)); }Copy the code

Await (Future, time, timeunit.milliseconds) is CountDownLatch

Returns true if the count reaches zero; If the wait time is exceeded before the count reaches zero, false is returned

The current is the first request, or some other release, and then it goes down into the loop

Countdownlatch.await ()+ semaphore.tryacquire (

After each wait time, you need to check whether the wait time is exceeded

Why introduce countdownlatch.await ()? Can’t we all use Semaphore.tryacquire ()? This one needs to be dug up again

conclusion

Distributed semaphore, the principle is very clear, mainly through Lua to ensure the atomicity of Redis operation

Read redisson source code, found inside the operation is basically asynchronous, the bottom is based on Netty, a lot of use of future mode, if you do not know the future mode, will be very convoluted, debug will be dizzy, so before going into Redisson, need to review the future mode