A brief introduction

Mercury is the company’s long-connection infrastructure, covering the IM two-way communication and channel transport modules. It aims to replace the third-party cloud IM service currently in use and provide highly available real-time communication support as the company expands into the pan-entertainment space, so that the business can move fast while keeping a good user experience.

The server is built on the Netty framework, supports both TCP and WebSocket connections, and transmits messages over a private protocol, which is secure, efficient, and saves bandwidth.

Request processing model

As a long-connection gateway, Mercury has to handle a huge number of client requests, which calls for a stable and efficient request processing model.

Netty processing model

Netty uses the master-slave Reactor model:

The bossGroup handles accept events, and the workerGroup handles read and write events.

The ChannelPipeline is a doubly linked list of handlers. Netty defines more than a dozen pipeline events; when one fires, the corresponding ChannelHandler methods are invoked in order along the pipeline. All handler calls for a given channel run on the same EventLoop thread in the workerGroup, with no switching between threads.

This lock-free design avoids context switching and delivers high performance under a high volume of requests. The risk, however, is that if a ChannelHandler blocks, it drags down every other channel that the same EventLoop thread is responsible for.

In practice, a ChannelHandler often performs I/O such as RPC calls or MQ writes, and there is no guarantee these will not block. As a result, many high-performance distributed frameworks adopt a three-tier processing model, adding a separate business thread pool so that time-consuming I/O runs off the workerGroup threads.
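For illustration, here is a minimal sketch of that three-tier pattern: the EventLoop thread only receives the request and hands it off, and the potentially blocking work runs on a separate business pool. The message type, pool size, and the downstream call are placeholders, not Mercury’s real code.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Three-tier pattern: the EventLoop decodes and dispatches; blocking business
// work (RPC, MQ, DB) runs on a dedicated pool so the workerGroup never stalls.
public class BusinessDispatchHandler extends SimpleChannelInboundHandler<String> {

    // Hypothetical sizing; Mercury's real pool is described later in this article.
    private static final ExecutorService BUSINESS_POOL = Executors.newFixedThreadPool(64);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String request) {
        BUSINESS_POOL.execute(() -> {
            String response = handleBlockingCall(request); // may block: RPC, MQ, DB ...
            // writeAndFlush is safe from any thread; Netty hops back to the channel's EventLoop.
            ctx.writeAndFlush(response);
        });
    }

    private String handleBlockingCall(String request) {
        return "ack:" + request; // stand-in for a real downstream call
    }
}
```

Anything written back with writeAndFlush is scheduled onto the channel’s EventLoop by Netty itself, so the business threads never touch the socket directly.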

Kafka three-layer processing model


In Kafka’s default configuration the thread counts are: Acceptor: 1, network (Processor) threads: 3 (num.network.threads), I/O threads: 8 (num.io.threads).

The Acceptor thread distributes new connections to the network threads in round-robin fashion, preventing skew in request processing.

Designed this way, Kafka can sustain a large volume of network requests.

Mercury processing model


The overall design is loosely based on Dubbo.

Apart from the encoder and decoder in the pipeline, only a single NettyServerHandler is implemented.

NettyServerHandler uses the decorator pattern to split the handling into separate concerns (a rough sketch follows the list below):

AccessChannelHandler: Authenticates connections by IP address and rejects those from unauthorized IPs

CatReportChannelHandler: Reports CAT monitoring data

IdleChannelHandler: Handles idle detection, heartbeats, and handshake messages

DispatchChannelHandler: Dispatches requests to the business thread pool

MercuryServerHandler: The upper-level business data processing handler
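How these handlers wrap one another can be sketched roughly as below; the shared interface, its method names, and the IP check are assumptions made for illustration rather than Mercury’s real signatures.

```java
import io.netty.channel.Channel;

// Illustrative decorator chain: each handler implements the same interface, adds its
// own concern, and delegates to the handler it wraps.
interface MercuryChannelHandler {
    void connected(Channel channel);
    void received(Channel channel, Object message);
}

abstract class ChannelHandlerDelegate implements MercuryChannelHandler {
    protected final MercuryChannelHandler next;

    protected ChannelHandlerDelegate(MercuryChannelHandler next) {
        this.next = next;
    }

    @Override
    public void connected(Channel channel) { next.connected(channel); }

    @Override
    public void received(Channel channel, Object message) { next.received(channel, message); }
}

class AccessChannelHandler extends ChannelHandlerDelegate {
    AccessChannelHandler(MercuryChannelHandler next) { super(next); }

    @Override
    public void connected(Channel channel) {
        if (!ipAllowed(channel)) {   // hypothetical allow-list check
            channel.close();
            return;
        }
        super.connected(channel);
    }

    private boolean ipAllowed(Channel channel) { return true; } // placeholder
}

// Assembly order (outermost first), mirroring the list above:
// MercuryChannelHandler handler =
//     new AccessChannelHandler(
//         new CatReportChannelHandler(
//             new IdleChannelHandler(
//                 new DispatchChannelHandler(new MercuryServerHandler()))));
```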

As mentioned above, Netty defines over a dozen pipeline events, but for our purposes only a few of them matter. Therefore, NettyServerHandler maps them as follows:

channelActive → connect: a connection is established
channelInactive → disconnect: the connection is closed
channelRead → receive: data is read
write → send: data is written
exceptionCaught → caught: exception handling
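A minimal sketch of what that mapping might look like (the delegate interface here is an assumption; only the event-to-method mapping follows the table above):

```java
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

// Maps Netty's many pipeline events onto the five operations listed above.
public class NettyServerHandler extends ChannelDuplexHandler {

    interface Delegate {
        void connect(ChannelHandlerContext ctx);
        void disconnect(ChannelHandlerContext ctx);
        void receive(ChannelHandlerContext ctx, Object msg);
        void send(ChannelHandlerContext ctx, Object msg);
        void caught(ChannelHandlerContext ctx, Throwable cause);
    }

    private final Delegate delegate;

    public NettyServerHandler(Delegate delegate) {
        this.delegate = delegate;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        delegate.connect(ctx);                    // channelActive   -> connect
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        delegate.disconnect(ctx);                 // channelInactive -> disconnect
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        delegate.receive(ctx, msg);               // channelRead     -> receive
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        delegate.send(ctx, msg);                  // write           -> send
        super.write(ctx, msg, promise);           // still forward the write downstream
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        delegate.caught(ctx, cause);              // exceptionCaught -> caught
    }
}
```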

NettyChannel is an encapsulation of Netty’s underlying Channel. The business layer operates on NettyChannel directly without caring about the underlying channel, which shields the details of using Netty; internally it maintains connection management and optimizes message sending.
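A hedged sketch of that idea, with illustrative fields and a simplified send path:

```java
import io.netty.channel.Channel;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// The business layer talks to this wrapper, never to Netty's Channel directly.
public class NettyChannel {

    // One wrapper per underlying Netty channel (connection management).
    private static final ConcurrentMap<Channel, NettyChannel> CHANNELS = new ConcurrentHashMap<>();

    private final Channel channel;
    private final String channelId; // business-generated id, not Netty's default id

    private NettyChannel(Channel channel, String channelId) {
        this.channel = channel;
        this.channelId = channelId;
    }

    public static NettyChannel getOrAdd(Channel channel, String channelId) {
        return CHANNELS.computeIfAbsent(channel, ch -> new NettyChannel(ch, channelId));
    }

    public static void remove(Channel channel) {
        CHANNELS.remove(channel);
    }

    public void send(Object message) {
        // Only write when the connection is usable; a real implementation might also
        // batch flushes or check writability to optimize sending.
        if (channel.isActive() && channel.isWritable()) {
            channel.writeAndFlush(message);
        }
    }

    public String channelId() {
        return channelId;
    }

    public void close() {
        remove(channel);
        channel.close();
    }
}
```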

MercuryServerHandler routes the current request to the corresponding business handler based on the Command carried in the request:

LoginAuthHandler: Related to login authentication

IMMsgHandler: One-to-one chat, group chat, etc.

ChatRoomMsgHandler: Chat room messages

WhiteboardMsgHandler: Whiteboard message

ClientPushMsgHandler: Client push class message

There is also a HandlerWrapper class that wraps the business handlers above and executes a chain of handler interceptors, similar to Spring MVC interceptors, adding extra processing logic before and after the business handling.
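Both ideas can be sketched together: a registry that routes by command, and a HandlerWrapper that runs an interceptor chain around the chosen handler. The interfaces and command keys below are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Command-based routing plus an interceptor-wrapping HandlerWrapper.
public class MercuryRoutingSketch {

    /** A business handler keyed by the request's command. */
    interface BizHandler {
        void handle(Object request);
    }

    /** Spring-MVC-style interceptor: pre/post hooks around the business handler. */
    interface HandlerInterceptor {
        default void before(Object request) { }
        default void after(Object request) { }
    }

    /** Wraps a business handler and runs the interceptor chain around it. */
    static class HandlerWrapper implements BizHandler {
        private final BizHandler target;
        private final List<HandlerInterceptor> interceptors;

        HandlerWrapper(BizHandler target, List<HandlerInterceptor> interceptors) {
            this.target = target;
            this.interceptors = interceptors;
        }

        @Override
        public void handle(Object request) {
            interceptors.forEach(i -> i.before(request));
            try {
                target.handle(request);
            } finally {
                interceptors.forEach(i -> i.after(request));
            }
        }
    }

    // Command -> handler registry; MercuryServerHandler would look up by the request's command.
    private final Map<String, BizHandler> handlers = new ConcurrentHashMap<>();

    public void register(String command, BizHandler handler, List<HandlerInterceptor> interceptors) {
        handlers.put(command, new HandlerWrapper(handler, interceptors));
    }

    public void dispatch(String command, Object request) {
        BizHandler handler = handlers.get(command);
        if (handler != null) {
            handler.handle(request);
        }
    }
}
```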

DispatchChannelHandler


We’ll focus on the DispatchChannelHandler, which dispatches requests to the thread pool.

Dubbo, for example, provides four types of thread pools via SPI: fixed is the default on the server side and cached on the client side, but the underlying implementation is ThreadPoolExecutor in every case.
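As a rough illustration only (not Dubbo’s actual source), those two defaults map onto plain ThreadPoolExecutor configurations along these lines:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Approximate shapes of the "fixed" and "cached" pools, for illustration.
public class DubboLikePools {

    /** "fixed": a constant number of worker threads (server-side default). */
    public static ThreadPoolExecutor fixed(int threads) {
        return new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }

    /** "cached": threads created on demand and reclaimed after an idle timeout (client-side default). */
    public static ThreadPoolExecutor cached(int coreThreads, int maxThreads, long idleMillis) {
        return new ThreadPoolExecutor(coreThreads, maxThreads,
                idleMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }
}
```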

In the IM scenario, processing order must be strictly preserved within a single channel, but ordering across different channels does not need to be guaranteed.

So here I implemented a thread pool of my own, OrderedChannelExecutor (a sketch appears below, after the list):

  1. Each channel generates a channelId (not Netty’s default id, but one generated by a specific rule, mainly because of some business needs).

  2. All requests for a given channelId are routed to the same MpscQueue by consistent hashing on the channelId. (MpscQueue is a lock-free multi-producer single-consumer queue from JCTools; it is thread-safe and high-performance, and it is the queue Netty itself uses under the hood.)

  3. When a task arrives, the MpscQueue takes a thread from the thread pool to execute it, and that thread keeps polling the next task from the queue after the current one completes.

  4. As long as the MpscQueue still has tasks, it holds on to the thread; only when the queue is drained is the thread returned to the pool.

Currently the ratio of MpscQueues to threads is 1:1, so every queue can get a thread without waiting.
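Here is a minimal sketch of the idea behind OrderedChannelExecutor, under the simplifying assumptions that routing is a plain modulo (Mercury uses consistent hashing on the channelId) and that queue and thread counts are passed in:

```java
import org.jctools.queues.MpscUnboundedArrayQueue;

import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Every channelId maps to one mailbox (an MPSC queue); a mailbox is drained by a single
// pool thread at a time, which preserves per-channel ordering.
public class OrderedChannelExecutor {

    private static final class Mailbox implements Runnable {
        final Queue<Runnable> tasks = new MpscUnboundedArrayQueue<>(1024);
        final AtomicBoolean scheduled = new AtomicBoolean(false);
        final ExecutorService pool;

        Mailbox(ExecutorService pool) {
            this.pool = pool;
        }

        void execute(Runnable task) {
            tasks.offer(task);
            // Hand the mailbox to a pool thread only if it is not already being drained.
            if (scheduled.compareAndSet(false, true)) {
                pool.execute(this);
            }
        }

        @Override
        public void run() {
            // Keep the thread while there is work, then give it back to the pool.
            Runnable task;
            while ((task = tasks.poll()) != null) {
                task.run();
            }
            scheduled.set(false);
            // Re-schedule if a producer offered a task after the last poll but before the flag reset.
            if (!tasks.isEmpty() && scheduled.compareAndSet(false, true)) {
                pool.execute(this);
            }
        }
    }

    private final ExecutorService pool;
    private final Mailbox[] mailboxes;

    public OrderedChannelExecutor(int slots, int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.mailboxes = new Mailbox[slots];
        for (int i = 0; i < slots; i++) {
            mailboxes[i] = new Mailbox(pool);
        }
    }

    /** Simple modulo routing here; Mercury routes with consistent hashing on the channelId. */
    public void execute(String channelId, Runnable task) {
        int slot = Math.floorMod(channelId.hashCode(), mailboxes.length);
        mailboxes[slot].execute(task);
    }
}
```

The key property is that a mailbox is drained by at most one thread at a time, so tasks for the same channelId always run in submission order, while different channels still run in parallel across the pool.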

When designing this, I did not bind a Selector to a thread the way Netty’s EventLoop does, because threads are a precious resource. The production machines have 4 cores and 8 GB of memory, so more threads do not necessarily mean better performance, whereas queues are comparatively cheap, costing only some memory. The project already makes many improvements to save memory, with heavy use of pooling, and 8 GB is sufficient, so the number of queues can be larger than the number of threads.

At the time I also wanted to implement a hierarchical queue that partitions tasks by priority: the higher the priority, the higher the probability of execution, while low-priority tasks may be delayed, discarded, or failed fast when the system is overloaded. However, earlier load tests showed that a single machine can sustain 15K QPS, and online traffic is still far from that magnitude, so this has been shelved for now.

Consistent hashing algorithm


The hash algorithm used is FNV, which hashes large amounts of data quickly with a low collision rate. Its high dispersion makes it well suited to hashing very similar strings such as URLs, hostnames, file names, text, and IP addresses.

Because the value being hashed is the channelId, which contains the client and server IP addresses, FNV is a good fit. In addition, each real node is given eight virtual nodes, so the final hash distribution is fairly uniform.

Other hash algorithms can be used in other business scenarios; for example, MurmurHash2 offers high performance with a low collision rate and is used inside Redis.

As for the implementation of consistent hashing, it is easy in Java by building directly on TreeMap’s features; standing on Doug Lea’s shoulders makes it simple.
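A minimal sketch of the approach, assuming FNV-1a for the hash, a TreeMap as the ring, and the eight virtual nodes per real node mentioned above (class and method names are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Consistent hashing on top of TreeMap, with an FNV-1a hash and virtual nodes.
public class ConsistentHashRouter {

    private static final int VIRTUAL_NODES = 8;

    // hash -> real node; TreeMap gives us the "first node clockwise" lookup for free.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public ConsistentHashRouter(List<String> nodes) {
        for (String node : nodes) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(fnv1a32(node + "#" + i), node);
            }
        }
    }

    /** Route a key (e.g. a channelId) to a node on the ring. */
    public String select(String key) {
        long hash = fnv1a32(key);
        // First entry with hash >= key hash, wrapping around to the start of the ring.
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash);
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    /** 32-bit FNV-1a, kept as an unsigned value in a long. */
    private static long fnv1a32(String s) {
        final long prime = 16777619L;
        long hash = 2166136261L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= prime;
            hash &= 0xffffffffL; // keep it to 32 bits
        }
        return hash;
    }
}
```

Mapping a channelId to a queue then reduces to router.select(channelId); swapping in another hash only means replacing fnv1a32.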

Dubbo’s LoadBalance SPI also provides a consistent-hash implementation, ConsistentHashLoadBalance, but it uses the Ketama algorithm, which is a bit more involved. With most hash functions you can reuse a generic consistent-hashing implementation and just swap in the hash, whereas Ketama prescribes the whole process. If you are interested, pull down the Dubbo source code to study its implementation.