In a microservice system, caching, rate limiting, and circuit breaking are the three pillars of high availability. Today we will talk about rate limiting.

Rate limiting is one of the ways to keep a system highly available, and it is also a frequent interview question. Suppose an interviewer at Alibaba asks: "How would you limit traffic to 1K requests per second?" Being able to write out several rate-limiting schemes on the spot would make a strong impression. Without further ado, here are some common ways to implement rate limiting.

Guava RateLimiter

Guava is an excellent open source project in the Java world. It contains utilities for collections, strings, caches, and more. RateLimiter is its commonly used rate-limiting tool.

RateLimiter is implemented on top of the token bucket algorithm. If it is configured for 10 tokens per second, the internal implementation produces one token every 100 ms.
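
The arithmetic behind "10 tokens per second means one token every 100 ms" can be checked with a small sketch. This is not Guava's code; the class name `TokenIntervalSketch` and the method `intervalMicros` are hypothetical names used only for illustration.

```java
import java.util.concurrent.TimeUnit;

public class TokenIntervalSketch {
    /** Microseconds between two consecutive tokens at the given rate. */
    static long intervalMicros(double permitsPerSecond) {
        return (long) (TimeUnit.SECONDS.toMicros(1) / permitsPerSecond);
    }

    public static void main(String[] args) {
        System.out.println(intervalMicros(10));   // 100000 microseconds = 100 ms
        System.out.println(intervalMicros(1000)); // 1000 microseconds = 1 ms
    }
}
```

For the interview question above, 1K requests per second works out to one token per millisecond.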

Using Guava RateLimiter
  1. Add the Maven dependency:

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>23.0</version>
    </dependency>
  2. Code:

    import com.google.common.util.concurrent.RateLimiter;

    public class GuavaRateLimiterTest {
        // 10 tokens per second, i.e. one token every 100 ms
        private final RateLimiter rateLimiter = RateLimiter.create(10);

        /** Simulates the execution of a business method. */
        public void exeBiz() {
            // tryAcquire does not block: it returns false at once if no token is available
            if (rateLimiter.tryAcquire(1)) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread " + Thread.currentThread().getName() + ": executing business logic");
            } else {
                System.out.println("Thread " + Thread.currentThread().getName() + ": rate limited");
            }
        }

        public static void main(String[] args) throws InterruptedException {
            GuavaRateLimiterTest limiterTest = new GuavaRateLimiterTest();
            Thread.sleep(500); // Wait 500 ms so the limiter can accumulate some tokens

            // Simulate 100 requests arriving at the same instant
            for (int i = 0; i < 100; i++) {
                new Thread(limiterTest::exeBiz).start();
            }
        }
    }

Sliding window counting

For example, if an interface allows 100 requests per second, set up a sliding window of 10 blocks, each covering 100 ms, and move the window forward by one block every 100 ms. The more blocks the window is divided into, the more smoothly it slides and the more accurate the rate-limiting statistics become.

Code:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Sliding window counter. */
public class SliderWindowRateLimiter implements Runnable {
    // Maximum number of requests allowed per second
    private final long maxVisitPerSecond;
    // Number of blocks each second is divided into
    private final int block;
    // Request count recorded in each block
    private final AtomicLong[] countPerBlock;
    // Block the window is currently writing to, i.e. the window's starting index
    private volatile int index;
    // Total request count across the whole window
    private final AtomicLong allCount;

    /**
     * Constructor.
     *
     * @param block             number of blocks per second
     * @param maxVisitPerSecond maximum number of requests per second
     */
    public SliderWindowRateLimiter(int block, long maxVisitPerSecond) {
        this.block = block;
        this.maxVisitPerSecond = maxVisitPerSecond;
        countPerBlock = new AtomicLong[block];
        for (int i = 0; i < block; i++) {
            countPerBlock[i] = new AtomicLong();
        }
        allCount = new AtomicLong(0);
    }

    /**
     * Checks whether the maximum allowed number of requests has been exceeded.
     *
     * @return true if the current count is over the limit
     */
    public boolean isOverLimit() {
        return currentQPS() > maxVisitPerSecond;
    }

    /**
     * Gets the total number of requests in the current window.
     *
     * @return the current request count
     */
    public long currentQPS() {
        return allCount.get();
    }

    /** Handles an incoming request and decides whether the business logic may run. */
    public void visit() {
        countPerBlock[index].incrementAndGet();
        allCount.incrementAndGet();

        if (isOverLimit()) {
            System.out.println(Thread.currentThread().getName() + " rate limited" + ", currentQPS:" + currentQPS() + ", index:" + index);
        } else {
            System.out.println(Thread.currentThread().getName() + " executing business logic" + ", currentQPS:" + currentQPS() + ", index:" + index);
        }
    }

    /**
     * Timed task: every N ms the window slides forward by one block. The count
     * of the block being reused is subtracted from the total and reset to 0,
     * and new requests are then recorded in that block.
     */
    @Override
    public void run() {
        index = (index + 1) % block;
        long val = countPerBlock[index].getAndSet(0);
        allCount.addAndGet(-val);
    }

    public static void main(String[] args) {
        SliderWindowRateLimiter sliderWindowRateLimiter = new SliderWindowRateLimiter(10, 100);

        // Slide the window at a fixed rate: one block every 100 ms
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS);

        // Simulate requests at different speeds
        new Thread(() -> {
            while (true) {
                sliderWindowRateLimiter.visit();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // Simulate a second stream of requests at a slower rate
        new Thread(() -> {
            while (true) {
                sliderWindowRateLimiter.visit();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
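
As a point of comparison, here is a minimal sketch of the same sliding-window idea without a scheduler thread. It is a variation, not the implementation above: the window slides lazily whenever a request arrives, the caller passes the timestamp in (which keeps the example deterministic), and rejected requests are not counted. The class name `SlidingWindowSketch` and its method names are hypothetical, and timestamps are assumed to be non-negative and non-decreasing.

```java
import java.util.Arrays;

public class SlidingWindowSketch {
    private final int blocks;        // number of blocks in the window
    private final long blockMillis;  // time span covered by one block
    private final long limit;        // maximum requests admitted per window
    private final long[] counts;     // requests admitted in each slot
    private final long[] blockIds;   // absolute block number each slot currently holds

    public SlidingWindowSketch(int blocks, long windowMillis, long limit) {
        this.blocks = blocks;
        this.blockMillis = windowMillis / blocks;
        this.limit = limit;
        this.counts = new long[blocks];
        this.blockIds = new long[blocks];
        Arrays.fill(blockIds, -1);
    }

    /** Returns true if the request at nowMillis is admitted, false if it is rate limited. */
    public synchronized boolean tryAcquire(long nowMillis) {
        long currentId = nowMillis / blockMillis;
        int slot = (int) (currentId % blocks);
        if (blockIds[slot] != currentId) {
            // The slot still holds an expired block: recycle it for the current one
            blockIds[slot] = currentId;
            counts[slot] = 0;
        }
        long total = 0;
        for (int i = 0; i < blocks; i++) {
            // Only blocks that still lie inside the window contribute to the total
            if (blockIds[i] > currentId - blocks) {
                total += counts[i];
            }
        }
        if (total >= limit) {
            return false;
        }
        counts[slot]++;
        return true;
    }
}
```

With `new SlidingWindowSketch(10, 1000, 100)`, the first 100 calls inside one second are admitted and further calls are rejected until the blocks holding those requests fall out of the window.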

Semaphore

With a Semaphore, permits are released at fixed intervals. A thread that acquires a permit executes the business code; a thread that cannot acquire one is rejected.

Code:

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;

public class SemaphoreOne {
    // At most 10 requests are admitted per interval
    private static final Semaphore semaphore = new Semaphore(10);

    public static void bizMethod() throws InterruptedException {
        if (!semaphore.tryAcquire()) {
            System.out.println(Thread.currentThread().getName() + " rejected");
            return;
        }

        System.out.println(Thread.currentThread().getName() + " executing business logic");
        Thread.sleep(500); // Simulate business logic that takes 500 ms
        // No release() here: permits are handed back only by the timer below.
        // Releasing here as well would let the permit count grow past 10.
    }

    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                // Start a new interval: discard leftover permits, then issue 10 fresh ones
                semaphore.drainPermits();
                semaphore.release(10);
                System.out.println("Permits reset");
            }
        }, 1000, 1000);

        for (int i = 0; i < 10000; i++) {
            try {
                Thread.sleep(10); // Simulate one request arriving every 10 ms
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> {
                try {
                    SemaphoreOne.bizMethod();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        timer.cancel(); // Stop the refill timer so the JVM can exit
    }
}
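
The interval-based use of a semaphore can also be isolated into a small helper, sketched below under the assumption that some scheduler calls `startNewInterval()` once per interval. The class name `PermitWindowSketch` and its methods are hypothetical; only `Semaphore.tryAcquire()`, `drainPermits()`, `release(int)`, and `availablePermits()` come from the JDK.

```java
import java.util.concurrent.Semaphore;

public class PermitWindowSketch {
    private final int permitsPerInterval;
    private final Semaphore semaphore;

    public PermitWindowSketch(int permitsPerInterval) {
        this.permitsPerInterval = permitsPerInterval;
        this.semaphore = new Semaphore(permitsPerInterval);
    }

    /** Non-blocking: returns true if a permit was still available in this interval. */
    public boolean tryEnter() {
        return semaphore.tryAcquire();
    }

    /** Called once per interval: discard leftover permits, then issue a fresh batch. */
    public void startNewInterval() {
        semaphore.drainPermits();
        semaphore.release(permitsPerInterval);
    }

    /** Number of permits left in the current interval. */
    public int available() {
        return semaphore.availablePermits();
    }
}
```

Draining before releasing is a design choice: it keeps unused permits from carrying over, so no interval can admit more than `permitsPerInterval` requests.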

Token bucket

Token bucket algorithm: a bucket holds tokens up to a fixed capacity, and tokens are added to it at a fixed rate. If the bucket has remaining capacity, the new tokens are added; if it is full, they are discarded. When a request arrives, it must first take a token from the bucket; when no token is available, the request is rejected.

The advantage of a token bucket is that the rate at which tokens are added can be adjusted, and raising it lets the system absorb burst traffic. Tokens that accumulate while the system is idle, up to the bucket's capacity, also allow short bursts to pass.

Code:

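import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test; // JUnit 4 assumed; use org.junit.jupiter.api.Test for JUnit 5

public class TokenBucket {
    /** Defines the bucket. */
    public class Bucket {
        // Capacity: maximum number of tokens the bucket can hold
        int capacity;
        // Rate: number of tokens added per second
        int rateCount;
        // Current number of tokens
        AtomicInteger curCount = new AtomicInteger(0);

        public Bucket(int capacity, int rateCount) {
            this.capacity = capacity;
            this.rateCount = rateCount;
        }

        public void put() {
            if (curCount.get() < capacity) {
                System.out.println("Current quantity == " + curCount.get() + ", can keep adding");
                // Cap at capacity so that tokens beyond it are discarded
                curCount.updateAndGet(c -> Math.min(capacity, c + rateCount));
            }
        }

        public boolean get() {
            // Compare-and-set loop so concurrent callers cannot drive the count below 0
            while (true) {
                int current = curCount.get();
                if (current < 1) {
                    return false;
                }
                if (curCount.compareAndSet(current, current - 1)) {
                    return true;
                }
            }
        }
    }

    @Test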
    public void testTokenBucket() throws InterruptedException {

        Bucket bucket = new Bucket(5, 2);

        // A single scheduler thread adds tokens to the bucket at a fixed rate, once per second
        ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(1);
        scheduledCheck.scheduleAtFixedRate(() -> {
            bucket.put();
        }, 0, 1, TimeUnit.SECONDS);

        // Wait a while so the bucket can accumulate some tokens
        Thread.sleep(6000);

        // Simulate 10 threads requesting a token at the same instant
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                if (bucket.get()) {
                    System.out.println(Thread.currentThread() + " acquired a token");
                } else {
                    System.out.println(Thread.currentThread() + " rejected");
                }
            }).start();
        }

        // Wait for the bucket to refill
        Thread.sleep(3000);

        // Another 10 threads request a token at the same instant
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                if (bucket.get()) {
                    System.out.println(Thread.currentThread() + " acquired a token");
                } else {
                    System.out.println(Thread.currentThread() + " rejected");
                }
            }).start();
        }
    }
}
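
A token bucket can also be written without a refill thread. The sketch below computes the tokens owed from the elapsed time whenever a request arrives. It is an alternative under stated assumptions, not the implementation above: the class name `LazyTokenBucketSketch` is hypothetical, the caller supplies the current time in milliseconds, and the bucket starts empty.

```java
public class LazyTokenBucketSketch {
    private final long capacity;          // maximum number of tokens the bucket holds
    private final double tokensPerMilli;  // refill rate, converted to tokens per millisecond
    private double tokens;                // tokens currently in the bucket
    private long lastRefillMillis;        // time of the most recent refill

    public LazyTokenBucketSketch(long capacity, double tokensPerSecond, long nowMillis) {
        this.capacity = capacity;
        this.tokensPerMilli = tokensPerSecond / 1000.0;
        this.tokens = 0;
        this.lastRefillMillis = nowMillis;
    }

    /** Takes one token if available; returns false when the bucket is empty. */
    public synchronized boolean tryAcquire(long nowMillis) {
        refill(nowMillis);
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    /** Adds the tokens earned since the last refill; anything above capacity is discarded. */
    private void refill(long nowMillis) {
        long elapsed = nowMillis - lastRefillMillis;
        if (elapsed <= 0) {
            return;
        }
        tokens = Math.min(capacity, tokens + elapsed * tokensPerMilli);
        lastRefillMillis = nowMillis;
    }
}
```

With a capacity of 5 and a rate of 2 tokens per second, the same numbers as the test above, 10 requests arriving 6 seconds after creation get 5 tokens, and 10 more arriving 3 seconds later get another 5.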

Conclusion

This article introduced several rate-limiting methods: Guava RateLimiter, simple counting, sliding window counting, semaphores, and the token bucket. There are of course others, such as the leaky bucket algorithm and Nginx rate limiting. The methods written up here by the author, Half a Cigarette, are only the ones used in real projects or written in Alibaba's written exams.

If you have better ideas, feel free to get in touch!
