JAVA implementation of a simple stream limiter

What is a current restrictor

In high concurrency scenarios, traffic is limited to protect the system.

The semaphore implements the current limiter

When it comes to the implementation of a flow limiter, it is easy to imagine that a semaphore is a similar principle, which allows a certain number of threads to access the critical region. The specific implementation code is shown below. Only two threads are allowed to access the critical region at the same time, and the other threads wait for the purpose of limiting the flow.

Semaphore Semaphore = new Semaphore(2); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i <20 ; i++) { int finalI = i; executorService.submit(() -> { try { semaphore.acquire(); System.out.println(thread.currentThread ().getName() + "=== "== "+ finalI); TimeUnit.SECONDS.sleep(3); System.out.println(thread.currentThread ().getName() + "=== end ===" + finalI); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); }}); }Copy the code

However, semaphore implementation also needs mutex lock unlock operation, can be further optimized?

High performance current limiter Guava RateLimiter

Introduction to use

Guava is Google’s open source utility class library. It provides RateLimiter as a stream limiting utility class. Before use, it needs to introduce its dependencies as follows.

< the dependency > < groupId > com. Google. Guava < / groupId > < artifactId > guava < / artifactId > < version > 31.1 jre < / version > </dependency>Copy the code

Simple use is as follows

RateLimiter limiter = RateLimiter. Create (2.0); / / the task thread pool ExecutorService es = Executors. NewFixedThreadPool (1); AtomicLong prev = new AtomicLong(system.nanotime ()); For (int I = 0; i < 20; I++) {// limiter. Acquire (); Es.execute (() -> {long cur = system.nanotime (); Println ((cur-prev.get ()) / 1000_000); system.out.println ((cur-prev.get ()) / 1000_000); prev.set(cur); }); }Copy the code

The request result is as follows, it is obvious that most requests are maintained at about 500 ms, and the first conclusion can be drawn. Another embodiment of flow limiting should be the average speed. For example, if the flow limiting device defines the flow rate as 2, then there are two requests per second, and 500 ms is one request, which meets the request result.

Token bucket algorithm

Guava is relatively simple to use, so what is the principle of Guava current limiting? In fact, we are familiar with the token bucket algorithm, which puts tokens into the bucket at a certain rate, and the thread that wants to pass through the flow limiter needs to get the token in the token bucket before it is allowed to pass. That is to say, we can control the flow rate as long as we limit the speed of the token to achieve the purpose of flow limiting. Details are as follows:

  • Tokens are put into the token bucket at a constant rate, and if the rate of limiting is r/ second, one token is put into the bucket every 1/r second.
  • Given the size of the token bucket is B, the token will no longer be placed after b.
  • The request can pass through the limiter only if there is a token in the token bucket.

The producer-consumer model implements the current limiter

We can adopt the producer-consumer pattern, in which the producer adds tokens to the task queue at a certain frequency, and the thread trying to pass the flow limiter as a consumer can only pass the flow limiter by taking the token out of the task queue. Based on this principle, the demo code is shown below.

Public class CustomRateLimiter {// Task queue, Private static BlockingQueue BlockingQueue = new ArrayBlockingQueue(3); static { try { blockingQueue.put(new Object()); blockingQueue.put(new Object()); blockingQueue.put(new Object()); } catch (InterruptedException e) { e.printStackTrace(); }} / producer/public void product () {ScheduledExecutorService ses = Executors. NewScheduledThreadPool (3); // scheduleAtFixedRate Indicates the time when a task starts at a fixed frequency. After a period, the task is executed immediately. If the previous task is executed successfully, the current task is executed immediately. // scheduleWithFixedDelay specifies a fixed delay. Delay (latency) refers to an end and the next begins the delay between ses. ScheduleWithFixedDelay (() - > {try {blockingqueue.put (new Object ()); } catch (InterruptedException e) { e.printStackTrace(); }}, 4, 4, TimeUnit. SECONDS); } public void acquire(){try {// block blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); }}}Copy the code

The test code is as follows

class Test3{ public static void main(String[] args) { CustomRateLimiter customRateLimiter = new CustomRateLimiter(); / / call the producer customRateLimiter. Product (); ExecutorService es = Executors.newFixedThreadPool(10); AtomicLong prev = new AtomicLong(system.nanotime ()); For (int I = 0; int I = 0; i <20 ; i++) { random(); customRateLimiter.acquire(); es.execute(()->{ long cur = System.nanoTime(); Println (thread.currentThread ().getName()+"==="+(cur-prev.get ()) / 1000_000); system.out.println (thread.currentThread ().getName()+"==="+(cur-prev.get ()) / 1000_000); prev.set(cur); }); } } public static void random(){ Random random = new Random(); int i = random.nextInt(5); Thread.out.println (thread.currentThread ().getName()+"== "+ I); TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); }}}Copy the code

Access to the random method and initialized logical thread pool in static are kept under two seconds, as shown below

When you add the random method and the initialization logic in static, the effect of a current limiter is obvious, as shown below

Producer consumer implementations are perfectly written, so why didn’t Guava adopt this implementation? The reason is simple: under high concurrency, the CPU of the machine is basically busy. In this case, scheduled tasks may fail to occupy the CPU, which may lead to the delay of scheduled tasks.

So how did Guava handle it? I’ll talk to you next time because of space.

\