Hermes

Hermes is a Netty-based, high-performance, highly scalable network communication framework that supports millions of concurrent connections. Its design draws on the network communication modules of Dubbo and SOFA-Bolt. Hermes can be used for IM, long-lived connections, and similar applications, and offers the following features:

  • Private communication protocol
    • Customizable encoder/decoder
    • Multiple serialization mechanisms are supported
    • CRC check
  • Client/server connection management
    • Lock-free connection establishment
    • Connection heartbeat and idle detection
    • Client connection pool
    • Automatic disconnection and reconnection
    • Efficient and customizable IO model
  • Rich communication models
    • oneway
    • twoway
    • callback
    • future
  • Client/server asynchronous programming support
  • Timeout control
  • SPI extension point loading for extensibility
  • Authentication

GitHub address: github.com/IndiraFinis…

1. Usage

  • Synchronous invocation
@Before
public void setUp() {
    client = new BoltClient();
    client.option(BoltClientOption.CONNECT_TIMEOUT, 3000);
    client.startUp();
}

@Test
public void sync_test() {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put(Url.CONNECT_TIMEOUT, 9000);
    Url url = Url.builder()
            .host("127.0.0.1")
            .port(9091)
            .setParameters(map)
            .build();
    ReqBody requestBody = new ReqBody();
    requestBody.setName("zhang");
    requestBody.setAge(20);
    String body = client.request(url, requestBody);
    logger.info("Client Recv : " + body);
}    
  • Asynchronous invocation
    @Test
    public void async_test() throws Exception {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put(Url.CONNECT_TIMEOUT, 9000);
        // Asynchronous call
        map.put(Url.ASYNC, true);
        Url url = Url.builder()
                .host("127.0.0.1")
                .port(9091)
                .setParameters(map)
                .build();
        ReqBody requestBody = new ReqBody();
        requestBody.setName("zhang");
        requestBody.setAge(20);
        CompletableFuture<String> future = client.request(url, requestBody);
        logger.info("Client Recv : " + future.get());
    }
  • Callback invocation
    @Test
    public void call_back() throws Exception {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put(Url.CONNECT_TIMEOUT, 9000);
        // Asynchronous call
        map.put(Url.ASYNC, true);
        Url url = Url.builder()
                .host("127.0.0.1")
                .port(9091)
                .setParameters(map)
                .build();
        ReqBody requestBody = new ReqBody();
        requestBody.setName("zhang");
        requestBody.setAge(20);
        CompletableFuture<String> future = client.request(url, requestBody);
        CountDownLatch latch = new CountDownLatch(1);
        future.whenComplete((res, cause) -> {
            if (cause != null) {
                // Exception handling
            }
            latch.countDown();
            logger.info("Client Recv : " + res);
        });
        latch.await();
    }

  • One-way invocation
    @Test
    public void oneway_test() throws Exception {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put(Url.CONNECT_TIMEOUT, 9000);
        map.put(Url.ONEWAY, true);
        Url url = Url.builder()
                .host("127.0.0.1")
                .port(9091)
                .setParameters(map)
                .build();
        ReqBody requestBody = new ReqBody();
        requestBody.setName("zhang");
        requestBody.setAge(20);
        client.request(url, requestBody);
    }

Note: ReqBody implements the Serializable interface.
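As a rough illustration, a request body of this kind might look like the sketch below. Only setName and setAge appear in the tests above; the getters, toString, and serialVersionUID are assumptions added for completeness, not the project's actual class.

```java
import java.io.Serializable;

// Hypothetical request body; only setName/setAge are taken from the examples above.
class ReqBody implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "ReqBody{name='" + name + "', age=" + age + "}";
    }
}
```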

  • Command processor CommandHandler

By default, GeneralCmdHandler handles general commands: if a request does not specify a command, it is handled by GeneralCmdHandler. All you need to do is register a UserProcessor. After registering it, configure the extension point implementation in the META-INF/services/ folder.

public class SimpleReqProcesser extends AbstractUserProcessorAdapter<ReqBody> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    // Data type this processor handles
    @Override
    public String interest() {
        return ReqBody.class.getName();
    }

    @Override
    public String handleRequest(ReqBody body) throws Exception {
        logger.error("handleRequest: " + body.toString());
        return "server success";
    }

    // Whether to serialize the body and process business logic in the IO thread
    @Override
    public Boolean processInIOThread() {
        return false;
    }
}
  • Extending CommandHandler
  1. Inherit from AbstractCommandHandler and override the handleRequest and handleResponse methods to handle your commands.
  2. Configure the extension point implementation in the META-INF/services/ folder.

You can refer to HeartbeatHandler and GeneralCmdHandler for examples.

2. Source code design

2.1 Architecture Design

2.2 Protocol Design

  1. The first byte is the magic number, which lets non-protocol packets fail fast without being decoded and so improves security.
  2. The 9th bit is the request/response flag; 1 indicates a request.
  3. The 10th bit indicates whether the call is one-way or two-way.
  4. The 11th bit indicates whether the packet is a heartbeat packet.
  5. The remaining 5 bits of the second byte represent the serialization ID; currently only Hessian serialization is supported.
  6. Bytes 3-4 (short) represent the command code, which indicates the command type of the packet.
  7. The fifth byte represents the status of the response, so that the client can quickly identify an exception.
  8. The sixth byte holds the flags, which indicate whether the CRC check and data compression are enabled.
  9. Bytes 7-10 (int) represent the unique ID of the request, which is used to wake up the blocked thread during two-way communication.
  10. Bytes 11-14 (int) represent the length of the data body, which is used for decoding.
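The layout above can be sketched with a plain java.nio.ByteBuffer. This is only an illustration under stated assumptions: the magic value 0xAF, the bit order inside the second byte (highest bit first), and the meaning "1 = two-way" for the 10th bit are all hypothetical and not taken from the Hermes source.

```java
import java.nio.ByteBuffer;

// Sketch of the 14-byte header; MAGIC and the bit order in byte 2 are assumptions.
final class HeaderSketch {
    static final byte MAGIC = (byte) 0xAF;   // hypothetical magic number
    static final int HEADER_LENGTH = 14;

    final boolean request;      // bit 9: assumed to be the highest bit of byte 2
    final boolean twoWay;       // bit 10: assumed 1 = two-way
    final boolean heartbeat;    // bit 11
    final int serializationId;  // remaining 5 bits of byte 2
    final short commandCode;    // bytes 3-4
    final byte status;          // byte 5
    final byte flags;           // byte 6 (CRC / compression switches)
    final int requestId;        // bytes 7-10
    final int bodyLength;       // bytes 11-14

    HeaderSketch(boolean request, boolean twoWay, boolean heartbeat, int serializationId,
                 short commandCode, byte status, byte flags, int requestId, int bodyLength) {
        this.request = request;
        this.twoWay = twoWay;
        this.heartbeat = heartbeat;
        this.serializationId = serializationId;
        this.commandCode = commandCode;
        this.status = status;
        this.flags = flags;
        this.requestId = requestId;
        this.bodyLength = bodyLength;
    }

    // Writes the header in big-endian order (ByteBuffer's default).
    ByteBuffer encode() {
        int second = (request ? 0x80 : 0) | (twoWay ? 0x40 : 0) | (heartbeat ? 0x20 : 0)
                | (serializationId & 0x1F);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH);
        buf.put(MAGIC).put((byte) second).putShort(commandCode)
           .put(status).put(flags).putInt(requestId).putInt(bodyLength);
        buf.flip();
        return buf;
    }

    // Reads a header; rejects packets with the wrong magic number before decoding anything else.
    static HeaderSketch decode(ByteBuffer buf) {
        if (buf.remaining() < HEADER_LENGTH) {
            throw new IllegalArgumentException("incomplete header");
        }
        if (buf.get() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int second = buf.get() & 0xFF;
        return new HeaderSketch((second & 0x80) != 0, (second & 0x40) != 0, (second & 0x20) != 0,
                second & 0x1F, buf.getShort(), buf.get(), buf.get(), buf.getInt(), buf.getInt());
    }
}
```

Checking the magic number first is what makes the fast-fail cheap: a foreign packet is rejected after reading a single byte.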

2.2.1 Protocol Commands

2.2.2 Command processor design

2.3 Codec

Because TCP is a byte stream, packets can be merged together ("sticky" packets) or split across reads. Encoding and decoding therefore generally takes one of the following approaches:

  • Delimiter-based protocol
  • Fixed-length protocol
  • Variable-length protocol

In general, regardless of the approach, the codec needs to extend the MessageToByteEncoder and ByteToMessageDecoder classes. Note that ByteToMessageDecoder maintains a stateful ByteBuf accumulator, so all such decoders are stateful and cannot be annotated with @ChannelHandler.Sharable.

Bolt-extension builds on SOFA-Bolt and, borrowing from Dubbo, adds a payload size check on both the client and the server, so that oversized packets fail fast.
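To make the variable-length approach and the payload size check concrete, here is a plain-JDK sketch of a stateful length-prefixed decoder. It is not Netty's ByteToMessageDecoder (in Netty, LengthFieldBasedFrameDecoder is the usual choice); the class name, the 4-byte length prefix, and the maxPayload limit are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK illustration of a stateful length-prefixed decoder with a payload limit.
final class LengthPrefixedDecoder {
    private final int maxPayload;          // hypothetical limit for the fast-fail check
    private byte[] pending = new byte[0];  // bytes carried over between reads (the decoder's state)

    LengthPrefixedDecoder(int maxPayload) {
        this.maxPayload = maxPayload;
    }

    // Prefixes a body with its 4-byte big-endian length.
    static byte[] frame(byte[] body) {
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Feeds one chunk read from the socket and returns every complete body found so far.
    List<byte[]> feed(byte[] chunk) {
        byte[] merged = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(merged);
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0 || length > maxPayload) {
                // Fast-fail: reject the packet before buffering its body.
                throw new IllegalStateException("payload size out of range: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset();  // split packet: keep the length prefix and wait for more bytes
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            frames.add(body);
        }
        pending = Arrays.copyOfRange(merged, buf.position(), merged.length);
        return frames;
    }
}
```

The pending field is the reason such a decoder cannot be shared between connections: each connection needs its own leftover bytes.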

2.4 Connection Management

The client connection pool and client-side concurrency control are implemented on top of Netty's FixedChannelPool.

Server-side concurrency control can be implemented with a connection listener:

BoltServer server = new BoltServer();
server.option(BoltServerOption.PORT, 9091);
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, (connection) -> {
    // Concurrency control, connection statistics, etc.
});
server.startUp();

FixedChannelPool principle

How SOFA-Bolt's client connection pool establishes connections without locks:

  1. putIfAbsent of ConcurrentHashMap ensures that only one connection pool is registered per address, without a lock.
  2. FutureTask guarantees that its Callable executes only once, even in a concurrent environment, which solves the problem of concurrent creation.
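The two points above combine into a small create-once pattern, sketched below with JDK classes only. PoolRegistry, getOrCreate, and the generic pool type P are hypothetical names; this shows the pattern, not SOFA-Bolt's or Hermes's actual code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Sketch of lock-free, create-once pool initialization; P stands for any pool type.
final class PoolRegistry<P> {
    private final ConcurrentHashMap<String, FutureTask<P>> tasks = new ConcurrentHashMap<>();

    P getOrCreate(String address, Callable<P> factory) {
        FutureTask<P> task = tasks.get(address);
        if (task == null) {
            FutureTask<P> created = new FutureTask<>(factory);
            task = tasks.putIfAbsent(address, created);  // only one thread wins the race
            if (task == null) {
                task = created;
                task.run();  // the winner builds the pool; the Callable runs at most once
            }
        }
        try {
            return task.get();  // other threads wait here until the winner's pool is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while creating pool", e);
        } catch (ExecutionException e) {
            tasks.remove(address, task);  // drop the failed task so a later call can retry
            throw new IllegalStateException("pool creation failed", e.getCause());
        }
    }
}
```

No explicit lock is taken: threads that lose the putIfAbsent race simply wait on the winner's FutureTask instead of creating a second pool.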

2.5 Idle Detection and Heartbeat

Read and write idle detection is based on Netty's IdleStateEvent.

  • Client: detects read timeouts only and, by default, sends a heartbeat every 15 seconds. If no response is received three times in a row, the connection is closed and re-established.
  • Server: detects read/write timeouts. By default, if there is no read or write within 90 seconds, the connection is closed so that the client can reconnect.
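The client-side rule can be sketched as a small counter. This is a plain-JDK illustration: in Hermes the trigger would be a Netty IdleStateEvent, and the names HeartbeatTracker, onReadIdle, and onHeartbeatAck are hypothetical. The exact point at which the connection is dropped (here, on the idle event after three unanswered heartbeats) is also an assumption.

```java
// Plain-JDK sketch of client-side heartbeat accounting; names are illustrative only.
final class HeartbeatTracker {
    static final int MAX_MISSED = 3;  // from the text: give up after three unanswered heartbeats

    private int missed;  // heartbeats sent since the last response

    // Called on each read-idle event (every 15 s by default).
    // Returns true if the connection should be closed and reconnected,
    // false if a heartbeat should be sent instead.
    boolean onReadIdle() {
        if (missed >= MAX_MISSED) {
            return true;
        }
        missed++;  // a heartbeat is sent now and is counted as unanswered until acknowledged
        return false;
    }

    // Called when a heartbeat response (or any other data) is read.
    void onHeartbeatAck() {
        missed = 0;
    }
}
```

No synchronization is used because the sketch assumes, as in Netty, that all events of one connection are handled on a single EventLoop thread.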

2.6 Reconnection

Building on SOFA-Bolt, HashedWheelTimer is used to implement exponential backoff of the reconnection delay. When three consecutive heartbeats fail to update the client's read timestamp, the client reconnects immediately; the second attempt follows after 3 seconds, the third after 6 seconds, and so on. The default number of attempts is six.
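The delay schedule can be written down as a small function. The text only gives the first three values (0 s, 3 s, 6 s) and calls the scheme exponential, so the continued doubling (12 s, 24 s, 48 s) is an assumption, as are the names ReconnectBackoff and delayBefore.

```java
import java.time.Duration;

// Sketch of the reconnection delay schedule; doubling after 6 s is an assumption.
final class ReconnectBackoff {
    static final int MAX_ATTEMPTS = 6;                   // default number of attempts from the text
    static final Duration BASE = Duration.ofSeconds(3);  // delay before the second attempt

    // Delay before the given 1-based attempt: 0 s, 3 s, 6 s, then doubling each time.
    static Duration delayBefore(int attempt) {
        if (attempt < 1 || attempt > MAX_ATTEMPTS) {
            throw new IllegalArgumentException("attempt out of range: " + attempt);
        }
        if (attempt == 1) {
            return Duration.ZERO;  // the first reconnect happens immediately
        }
        return BASE.multipliedBy(1L << (attempt - 2));
    }
}
```

A timer such as HashedWheelTimer would then schedule attempt n to run after delayBefore(n) once attempt n-1 has failed.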

Netty best practices

  1. Choose the most appropriate Reactor model. Generally, the server uses 1 accept thread and CPU cores × 2 worker threads.

  2. Keep I/O events and business logic serialized to avoid locking.

    • putIfAbsent of ConcurrentHashMap (essentially CAS).
    • Executing tasks on the EventLoop ensures that they are serialized and avoids thread context switches.
    • FutureTask's guarantee that its Callable runs only once under multithreading.
  3. I/O-intensive tasks can be processed directly in the I/O thread (the EventLoop thread) to avoid costly thread context switches. CPU-intensive tasks should be moved off the I/O thread to free it for reads and writes. It is important to configure threads flexibly.

  4. For a stateless ChannelHandler, enable sharing with @ChannelHandler.Sharable to avoid creating too many objects.

  5. Note the difference between the ctx.write() and ctx.channel().write() methods of ChannelHandlerContext. The former passes only through the handlers before the current handler, while the latter traverses the entire ChannelPipeline, starting at the tail node.

  6. Use the isWritable() method to check the water level of the current ChannelOutboundBuffer before writing data. Because writeAndFlush places data in the ChannelOutboundBuffer first, a receive window that stays small or a congested network can cause an OOM.

  7. Do not use hash tables when arrays will do.

  8. Timeout control can be done using HashedWheelTimer.
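As an illustration of timeout control tied to the request ID, the sketch below keeps pending requests in a map and fails them when a timer fires. To stay runnable with the JDK alone it uses a ScheduledExecutorService in place of Netty's HashedWheelTimer; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of request timeout control; ScheduledExecutorService stands in for HashedWheelTimer.
final class PendingRequests {
    private final ConcurrentHashMap<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicInteger ids = new AtomicInteger();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "timeout-timer");
        t.setDaemon(true);  // do not keep the JVM alive just for timeouts
        return t;
    });

    // Registers a request, schedules its timeout, and returns the ID to send with it.
    int register(CompletableFuture<String> future, long timeoutMillis) {
        int id = ids.incrementAndGet();
        pending.put(id, future);
        timer.schedule(() -> {
            CompletableFuture<String> f = pending.remove(id);
            if (f != null) {  // still unanswered when the timer fired
                f.completeExceptionally(new TimeoutException("request " + id + " timed out"));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return id;
    }

    // Called when a response carrying the given request ID arrives.
    void complete(int id, String response) {
        CompletableFuture<String> f = pending.remove(id);
        if (f != null) {
            f.complete(response);
        }
    }
}
```

Whichever side removes the entry from the map first, the response or the timer, decides the outcome, so a late response to a timed-out request is silently dropped.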