I. High-performance rate limiter: Guava RateLimiter
Let's look at how Guava RateLimiter solves the traffic-limiting problem in high-concurrency scenarios. Guava is Google's open-source Java class library, and it provides a utility class called RateLimiter. To get a feel for rate limiting, let's start with how RateLimiter is used. Suppose we have a thread pool that can only handle two tasks per second; if tasks are submitted faster than that, the system may become unstable, so we need to limit the flow of requests.
In the example code below, we create a rate limiter with a rate of 2 requests/second. Intuitively, 2 requests/second means that at most two requests are allowed through the limiter per second. In Guava, however, the rate has a deeper meaning: it is a constant rate, so 2 requests/second is equivalent to one request every 500 milliseconds.
We call the acquire() method to apply rate limiting before submitting each task to the thread pool. When the sample code runs, the interval between tasks submitted to the thread pool stabilizes at roughly 500 milliseconds.
```java
// Rate limiter: 2 requests per second
RateLimiter limiter = RateLimiter.create(2.0);
// Single-threaded executor
ExecutorService es = Executors.newFixedThreadPool(1);
// Time of the previous task execution (a field of the enclosing class)
prev = System.nanoTime();
for (int i = 0; i < 20; i++) {
  // Acquire a token before submitting the task
  limiter.acquire();
  es.execute(() -> {
    long cur = System.nanoTime();
    // Interval between consecutive tasks, in milliseconds
    System.out.println((cur - prev) / 1000_000);
    prev = cur;
  });
}
```

Output:

```
...
500
499
499
500
499
```
Classic traffic limiting algorithm: token bucket algorithm
Guava's rate limiter is simple to use, but how does it work? Guava uses the token bucket algorithm, whose core idea is that a request must obtain a token before it can pass through the limiter. In other words, as long as we can limit the rate at which tokens are issued, we can control the flow rate. The token bucket algorithm works as follows:
- Tokens are added to the token bucket at a constant rate: assuming the limiting rate is r requests/second, one token is added every 1/r second;
- If the token bucket is full, the new token is discarded.
- The request can pass through the limiter only if there is a token in the token bucket.
In this algorithm, the limiting rate r is easy to understand, but what about the capacity b of the token bucket? b stands for burst: it is the maximum burst the limiter allows. For example, if b = 10 and the token bucket is full, the limiter allows 10 requests to pass through at the same time. Of course, this is only burst traffic: those 10 requests take away 10 tokens, so subsequent traffic can only pass through the limiter at rate r.
How would you implement the token bucket algorithm in Java? Intuition probably suggests the producer-consumer pattern: a producer thread periodically adds tokens to a blocking queue, while threads trying to pass through the limiter act as consumers and are only allowed through once they obtain a token from the blocking queue.
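A minimal sketch of this naive, timer-driven approach is shown below (this is for illustration only and is not how Guava works; the class name and parameters are made up for the example):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Naive token bucket: a timer thread drops tokens into a blocking
// queue at a fixed rate; a request must take a token to pass.
class NaiveTokenBucket {
  private final BlockingQueue<Object> bucket;
  private final ScheduledExecutorService producer =
      Executors.newScheduledThreadPool(1);

  NaiveTokenBucket(int capacity, long intervalMillis) {
    // Bucket capacity b: extra tokens are dropped when the queue is full
    bucket = new ArrayBlockingQueue<>(capacity);
    // The timer adds one token every 1/r seconds; offer() silently
    // discards the token if the bucket is already full
    producer.scheduleAtFixedRate(
        () -> bucket.offer(new Object()),
        intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
  }

  // Block until a token is available
  void acquire() throws InterruptedException {
    bucket.take();
  }
}
```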
This algorithm looks perfect and is simple to implement, and the implementation is fine as long as concurrency is low. In reality, though, most scenarios that need traffic limiting are high-concurrency scenarios where the system is already near its limit, and that is exactly where this implementation runs into trouble. The problem lies in the timer: under high concurrency, when the system is near its limit, the timer's precision error becomes very large; at the same time, the timer needs its own scheduling thread, which also affects system performance.
So what's a better approach? Guava's implementation doesn't use a timer at all; let's see how it works.
How Guava implements the token bucket algorithm
Guava implements the token bucket algorithm in a very simple way: the key is to record, and dynamically recompute, the time at which the next token will be issued. Let's walk through the algorithm with a simple scenario. Assume the token bucket capacity is b = 1 and the limiting rate is r = 1 request/second. Suppose the bucket is currently empty and the next token will be issued at second 3, and a thread T1 requests a token at second 2. What should happen?

T1 obviously needs to wait 1 second, because it will get the token one second later (at second 3). Note that the time of the next token is also pushed back by 1 second. Why? Because the token issued at second 3 has already been claimed by thread T1.

Now assume that immediately after T1 claims the token of second 3, another thread T2 requests a token. Since the next token will now be produced at second 4, thread T2 has to wait 2 seconds to obtain it, and because T2 claims the token of second 4, the next token time is again pushed back by 1 second, to second 5.

Threads T1 and T2 both requested tokens before the next token time. What if a thread requests a token after the next token time? Suppose that five seconds after T1's request, that is, at second 7, a thread T3 requests a token. Because a token was already produced at second 5, T3 can take a token without waiting. By second 7 the limiter would actually have produced three tokens, one each at seconds 5, 6, and 7; but since we assume the bucket capacity is 1, the tokens of seconds 5 and 6 are discarded (equivalently, you can think of the token of second 7 as the one that remains). T3 grabs the token of second 7, so the next token time becomes second 8.

From this brief analysis you can see that traffic limiting can be implemented simply by recording the time at which the next token will be produced and updating it dynamically. The algorithm above can be turned into code, as shown in the example below, again assuming the token bucket capacity is 1. The key is the reserve() method, which pre-allocates a token to the requesting thread and returns the time at which that thread can obtain it. The logic matches the analysis above: if a thread requests a token after the next token time, it gets the token immediately; conversely, if the request comes before the next token time, the thread obtains the token at the next token time. Since that token is now claimed, the next token time has to be pushed back by 1 second.
```java
class SimpleLimiter {
  // Time at which the next token will be produced
  long next = System.nanoTime();
  // Token issuing interval: nanoseconds
  long interval = 1000_000_000;

  // Pre-allocate a token and return the time at which it can be obtained
  synchronized long reserve(long now) {
    // If the request arrives after the next token time,
    // reset the next token time to the current time
    if (now > next) {
      next = now;
    }
    // Time at which the token can be obtained
    long at = next;
    // Push the next token time back by one interval
    next += interval;
    // Return the time the thread can obtain the token
    return Math.max(at, 0L);
  }

  // Request a token
  void acquire() {
    // Time of the request
    long now = System.nanoTime();
    // Pre-allocate a token
    long at = reserve(now);
    long waitTime = Math.max(at - now, 0);
    // Wait until the token is available
    if (waitTime > 0) {
      try {
        TimeUnit.NANOSECONDS.sleep(waitTime);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
```
What if the token bucket capacity is greater than 1? According to the token bucket algorithm, tokens should be taken from the bucket first, so we need to keep track of how many tokens the bucket holds; when a thread requests a token, it is served from the bucket first if one is available. The code is shown below. We add a resync() method: if a thread requests a token after the next token time, resync() recalculates the number of tokens in the bucket using the formula (now - next) / interval. The reserve() method gains the logic of taking tokens from the bucket first; note that when the token comes out of the bucket, next does not need to be pushed back by an interval.
```java
class SimpleLimiter {
  // Number of tokens currently in the bucket
  long storedPermits = 0;
  // Capacity of the token bucket
  long maxPermits = 3;
  // Time at which the next token will be produced
  long next = System.nanoTime();
  // Token issuing interval: nanoseconds
  long interval = 1000_000_000;

  // If the request arrives after the next token time, then:
  // 1. recalculate the number of tokens in the bucket
  // 2. reset the next token time to the current time
  void resync(long now) {
    if (now > next) {
      // Number of newly produced tokens
      long newPermits = (now - next) / interval;
      // Add the new tokens to the bucket
      storedPermits = Math.min(maxPermits, storedPermits + newPermits);
      // Reset the next token time to the current time
      next = now;
    }
  }

  // Pre-allocate a token and return the time at which it can be obtained
  synchronized long reserve(long now) {
    resync(now);
    // Time at which the token can be obtained
    long at = next;
    // Tokens the bucket can supply
    long fb = Math.min(1, storedPermits);
    // Net token demand: subtract the tokens taken from the bucket
    long nr = 1 - fb;
    // Recalculate the next token time
    next = next + nr * interval;
    // Recalculate the number of tokens in the bucket
    this.storedPermits -= fb;
    return at;
  }

  // Request a token
  void acquire() {
    // Time of the request
    long now = System.nanoTime();
    // Pre-allocate a token
    long at = reserve(now);
    long waitTime = Math.max(at - now, 0);
    // Wait until the token is available
    if (waitTime > 0) {
      try {
        TimeUnit.NANOSECONDS.sleep(waitTime);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
```
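To see the burst behavior described earlier, a small demo along the following lines could be used (this is a hypothetical test of the SimpleLimiter sketch above, not Guava code): after the limiter sits idle for a few seconds the bucket fills up, so the stored tokens let an initial burst of requests through with no waiting, and once the bucket is drained the calls settle back into roughly one-second spacing.

```java
public class SimpleLimiterDemo {
  public static void main(String[] args) throws InterruptedException {
    SimpleLimiter limiter = new SimpleLimiter();
    // Let the bucket fill up (maxPermits = 3, one token per second)
    Thread.sleep(4000);
    for (int i = 0; i < 6; i++) {
      long start = System.nanoTime();
      limiter.acquire();
      // Time spent waiting inside acquire(), in milliseconds:
      // near zero for the initial burst, then about 1000 ms per call
      System.out.println((System.nanoTime() - start) / 1000_000 + " ms");
    }
  }
}
```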
Above we showed how Guava implements the token bucket algorithm. Our sample code is a simplification of Guava RateLimiter, which extends the standard token bucket algorithm with features such as warm-up support. Consider a cache that is loaded on demand: once warm, the cache can sustain 50,000 TPS, but hitting it with 50,000 TPS before it has warmed up would simply crash it. So if access to the cache needs to be rate limited, the limiter also needs to support warm-up: the limiting rate r starts out small and grows dynamically. The warm-up implementation is quite involved; Guava solves it by constructing an integral function.
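For reference, Guava exposes this warm-up behavior through an overload of RateLimiter.create(). The numbers below are made up purely for illustration:

```java
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

// A limiter that targets 5 permits/second, but ramps up to that rate
// over a 3-second warm-up period instead of allowing it immediately
RateLimiter warmingUp = RateLimiter.create(5.0, 3, TimeUnit.SECONDS);

for (int i = 0; i < 10; i++) {
  // During warm-up, acquire() waits longer than the stable 200 ms
  // per permit; once warmed up, the interval settles at ~200 ms
  double waited = warmingUp.acquire();
  System.out.println("waited " + waited + " s");
}
```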
II. High-performance network application framework Netty
Network programming performance bottlenecks
In the BIO model, all read() and write() operations block the current thread. If a client has established a connection with the server but is not sending any data, the server's read() blocks. Therefore a separate thread is assigned to each socket, so that a thread blocked on one socket does not affect access to the other sockets. In the BIO threading model, each socket corresponds to an independent thread; to avoid frequently creating and destroying threads we can use a thread pool, but the one-to-one correspondence between sockets and threads does not change.
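A minimal sketch of this thread-per-connection BIO server follows (the port number and echo logic are made up for the example, and the fragment would live inside a method that handles IOException):

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Thread-per-connection BIO echo server: every accepted socket is
// handed to its own (pooled) thread, which blocks on read()/write()
ExecutorService pool = Executors.newCachedThreadPool();
try (ServerSocket server = new ServerSocket(8080)) {
  while (true) {
    Socket socket = server.accept();        // blocks until a client connects
    pool.execute(() -> {
      try (Socket s = socket;
           InputStream in = s.getInputStream();
           OutputStream out = s.getOutputStream()) {
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {  // blocks until data arrives
          out.write(buf, 0, n);             // echo the data back
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}
```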
If you look closely, you will see that in Internet scenarios there are many connections, but requests on each connection are infrequent, so threads spend most of their time waiting for I/O to become ready. In other words, threads are blocked most of the time, which is pure waste; if we could solve this problem, we would not need nearly as many threads.
With this in mind, we can optimize the threading model so that a single thread handles multiple connections, which raises thread utilization and reduces the number of threads needed. This is a good idea, but it cannot be done with the BIO API: all socket reads and writes in BIO are blocking, so once a blocking call is made, the calling thread blocks until that I/O is ready and cannot handle any other connection in the meantime. Fortunately, Java also provides a non-blocking (NIO) API, which makes it possible for one thread to handle multiple connections; a bare-bones sketch follows. How is this organized in practice? The Reactor pattern is widely used today, including in Netty's implementation, so to understand Netty we first need to look at the Reactor pattern.
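Here is the sketch of a single thread multiplexing many connections with Java NIO; the port and buffer size are illustrative, and error handling is omitted:

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// One thread, many connections: register all channels with a Selector
// and only act on the ones whose I/O is actually ready
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(8080));
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
  selector.select();                        // blocks until some channel is ready
  Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    if (key.isAcceptable()) {               // new connection: register it too
      SocketChannel client = server.accept();
      client.configureBlocking(false);
      client.register(selector, SelectionKey.OP_READ);
    } else if (key.isReadable()) {          // data ready: read without blocking
      SocketChannel client = (SocketChannel) key.channel();
      ByteBuffer buf = ByteBuffer.allocate(1024);
      if (client.read(buf) == -1) {
        client.close();
      }
    }
  }
}
```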
Reactor model
The Reactor pattern is built from the following roles. A Handle refers to an I/O handle, which in Java network programming is essentially a network connection. An Event Handler handles I/O events: its handle_event() method processes the I/O event, and each Event Handler is bound to one I/O Handle, which its get_handle() method returns. The Synchronous Event Demultiplexer is the I/O multiplexing API provided by the operating system, such as select() in the POSIX standard or epoll() on Linux.
The core of the Reactor pattern is the Reactor class. Its register_handler() and remove_handler() methods register and remove event handlers, and handle_events() is the engine of the pattern. Its core logic is: first use the select() method provided by the synchronous event demultiplexer to monitor network events, and when events become ready, iterate over the event handlers to process them. Since network events arrive in a constant stream, the main program starts the Reactor pattern by calling handle_events() inside a while(true) loop, as in the pseudocode below.
```
void handle_events() {
  // Monitor network events using the select() method
  // provided by the synchronous event demultiplexer
  select(handlers);
  // Handle the network events that are ready
  for (h in handlers) {
    h.handle_event();
  }
}
// Start the event loop in the main program
while (true) {
  handle_events();
}
```
The threading model in Netty
Although Netty's implementation draws on the Reactor pattern, it does not copy it exactly. The core concept in Netty is the EventLoop, which is essentially the Reactor in the Reactor pattern: it is responsible for monitoring network events and dispatching them to event handlers. In Netty 4.x, network connections and EventLoops have a stable many-to-one relationship, while EventLoops and Java threads have a stable one-to-one relationship; "stable" means the relationship never changes once established. That is, a network connection corresponds to exactly one EventLoop, and an EventLoop corresponds to exactly one Java thread, so a network connection is handled by exactly one Java thread.
What is the benefit of binding a network connection to a single Java thread? The biggest benefit is that event handling for a given connection is single-threaded, which avoids all sorts of concurrency issues.
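A common Netty idiom illustrates this single-threaded model: code that runs outside a channel's EventLoop hands its work back to that EventLoop instead of touching the channel concurrently. In the sketch below, channel and msg are placeholders for an existing io.netty.channel.Channel and a message object:

```java
// All events for a channel run on its single EventLoop thread, so
// per-channel state can be mutated there without extra locking.
// From any other thread, hand the work back to the channel's EventLoop:
if (channel.eventLoop().inEventLoop()) {
  channel.writeAndFlush(msg);  // already on the channel's thread
} else {
  channel.eventLoop().execute(() -> channel.writeAndFlush(msg));
}
```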
The thread model in Netty is very similar to the ideal thread model we mentioned earlier in that the core goal is to use a single thread to handle multiple network connections.
Another core concept in Netty is the EventLoopGroup which, as the name suggests, is a group of EventLoops. In practice we usually create two EventLoopGroups, one called bossGroup and one called workerGroup. Why two?
The reason lies in how TCP handles connections on the server side: a dedicated listening socket processes TCP connection requests, and each time a TCP connection is established successfully a new socket is created; all subsequent reads and writes on that connection go through the newly created socket. In other words, TCP connection requests and read/write requests are processed through two different sockets. When we discussed network requests above, we only discussed read and write requests, not connection requests, to keep the model simple.
In Netty, the bossGroup handles connection requests and the workerGroup handles read/write requests. Once the bossGroup has accepted a connection, it hands it over to the workerGroup. The workerGroup contains multiple EventLoops, so which EventLoop handles the new connection? That requires a load-balancing algorithm; Netty currently uses round-robin.
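Putting the two groups together, a typical Netty server bootstrap looks roughly like the sketch below; the port and the (empty) channel initializer are placeholders for illustration:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.NioServerSocketChannel;
import io.netty.channel.socket.SocketChannel;

// bossGroup accepts new connections; workerGroup handles their I/O
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
  ServerBootstrap bootstrap = new ServerBootstrap();
  bootstrap.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class)
      .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
          // Add application handlers to the pipeline here, e.g.
          // ch.pipeline().addLast(new MyBusinessHandler());
        }
      });
  // Bind the listening port and block until the server channel closes
  ChannelFuture future = bootstrap.bind(8080).sync();
  future.channel().closeFuture().sync();
} finally {
  bossGroup.shutdownGracefully();
  workerGroup.shutdownGracefully();
}
```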