Preface

First, a quick word on the topic of this article. I have recently been working on Internet of Things (IoT) development, which inevitably involves interacting with devices.

The core of that work is a system that lets devices connect and pushes messages to them, while supporting a very large number of connected devices.

The content shared here is therefore not limited to IoT; it also applies to the following scenarios:

  • Web-based chat systems (peer-to-peer, group chat).
  • Scenarios requiring server push in web applications.
  • SDK-based message push platforms.

Technology selection

The system has to handle a large number of connections, support duplex communication, and still guarantee performance.

Within the Java technology stack, that naturally rules out traditional blocking IO first.

That leaves NIO, and there are not many options at this level. Considering community, documentation, maintenance, and similar factors, Netty was the final choice.

The final architecture diagram is as follows:

It does not need to make sense yet; each part is introduced below.

Protocol parsing

Since this is a messaging system, a protocol format has to be agreed with the client.

The most common and simplest protocol is HTTP, but one of our requirements is two-way interaction, and HTTP is geared more toward browsers. What we need is a leaner protocol that cuts out a lot of unnecessary data transfer.

So I think it is best to define a private protocol tailored to the business needs. In my case, standard IoT protocols already exist.

In other scenarios, you can borrow from popular RPC frameworks when defining a private protocol, which makes communication between the two sides more efficient.

Either way, experience from the past few years says the protocol should reserve room for security.

The protocol is only touched on here; the rest of this article covers concrete application.

Simple implementation

First consider how to implement the functionality, then how to reach millions of connections.

Registration and authentication

Authentication is the first thing to consider before any real messages flow up or down.

Just as with WeChat, the first step has to be logging in; not just anyone should be able to connect directly to the platform.

So the first step is registration.

See the registration/authentication module in the architecture diagram above. Typically, the client sends a unique identifier in an HTTP request. Once authentication passes, the backend responds with a token and stores the relationship between the token and the client in Redis or a DB.

The client also saves this token locally and carries it on every subsequent request. Once the token expires, the client has to request a new one.

After authentication succeeds, the client connects directly over TCP to the push-server module in the diagram.

This module does the actual processing of upstream and downstream messages.

Save channel relationships

Once the connection is established, the relationship between the current client and its Channel has to be stored before any real business can be processed.

Assuming the client’s unique identifier is a mobile phone number, the phone number and the current Channel are kept in a Map.

This is similar to the earlier article on integrating the long-connection heartbeat mechanism with SpringBoot.

At the same time, so that the client’s unique identifier (the phone number) can be obtained from a Channel, a corresponding attribute is set on the Channel:

public static void putClientId(Channel channel, String clientId) {
    channel.attr(CLIENT_ID).set(clientId);
}

To read the phone number back:

public static String getClientId(Channel channel) {
    return (String) getAttribute(channel, CLIENT_ID);
}

This makes it possible to log which client went offline:

String telNo = NettyAttrUtil.getClientId(ctx.channel());
NettySocketHolder.remove(telNo);
log.info("Client offline, TelNo=" + telNo);

One thing to note: it is best to presize the Map that holds the client-Channel relationship (to avoid frequent resizing), since it will be the most frequently used object and the one with the largest memory footprint.
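The holder for this relationship could look roughly like the sketch below. It is not the actual `NettySocketHolder` from the project: the class name `ClientChannelHolder`, its methods, and the generic type `C` (standing in for Netty’s `Channel` so the example compiles without Netty on the classpath) are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical holder for the clientId -> channel relationship.
 * C stands in for io.netty.channel.Channel.
 */
final class ClientChannelHolder<C> {

    // Presized for the expected number of clients so the table
    // does not have to grow repeatedly as clients connect.
    private final Map<String, C> channels;

    ClientChannelHolder(int expectedClients) {
        this.channels = new ConcurrentHashMap<>(expectedClients);
    }

    /** Registers (or replaces) the channel for a client after it connects. */
    void put(String clientId, C channel) {
        channels.put(clientId, channel);
    }

    /** Returns the channel for a client, or null if the client is not on this node. */
    C get(String clientId) {
        return channels.get(clientId);
    }

    /** Removes the mapping when the client goes offline. */
    void remove(String clientId) {
        channels.remove(clientId);
    }

    /** Number of clients currently connected to this node. */
    int onlineCount() {
        return channels.size();
    }
}
```

A concurrent map is used here because connect and disconnect events arrive on several IO threads at once; the capacity to pass in depends on how many connections a single node is expected to hold.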

Message upward

Next comes the actual upstream business data. The first step is usually to determine which type of business an incoming message belongs to.

In a chat scenario, clients may upload text, pictures, or videos.

These have to be distinguished so each can be handled differently; how depends on the protocol agreed with the client.

  • A field in the message header can distinguish them.
  • Simpler still, send a JSON message and use one field to distinguish the message types.

Either way, the server has to be able to tell them apart.
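As an illustration of the header-field approach, here is a minimal sketch. The frame layout (a 1-byte type, a 4-byte body length, then the body), the type constants, and the class name `Frame` are hypothetical; a real protocol would be whatever is agreed with the client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical frame layout: 1-byte type, 4-byte body length, then the body. */
final class Frame {
    static final byte TYPE_TEXT = 1;
    static final byte TYPE_IMAGE = 2;
    static final byte TYPE_VIDEO = 3;

    final byte type;
    final byte[] body;

    Frame(byte type, byte[] body) {
        this.type = type;
        this.body = body;
    }

    /** Serializes the frame as header (type + length) followed by the body. */
    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + body.length);
        buf.put(type).putInt(body.length).put(body);
        return buf.array();
    }

    /** Parses one complete frame and rejects truncated input. */
    static Frame decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        if (buf.remaining() < 5) {
            throw new IllegalArgumentException("incomplete header");
        }
        byte type = buf.get();
        int length = buf.getInt();
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("incomplete body");
        }
        byte[] body = new byte[length];
        buf.get(body);
        return new Frame(type, body);
    }

    /** Chooses a processing path based on the type field in the header. */
    static String describe(Frame frame) {
        switch (frame.type) {
            case TYPE_TEXT:
                return "text: " + new String(frame.body, StandardCharsets.UTF_8);
            case TYPE_IMAGE:
                return "image, " + frame.body.length + " bytes";
            case TYPE_VIDEO:
                return "video, " + frame.body.length + " bytes";
            default:
                return "unknown type " + frame.type;
        }
    }
}
```

The length field also lets the receiver know where one message ends and the next begins on a TCP stream, which is a second reason to put a small fixed header in front of every message.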

Message parsing and business decoupling

Once a message can be parsed, the business logic can run: writing to a database, calling other interfaces, and so on.

In Netty, messages are typically handled in the channelRead() method.

That is where a message can be parsed and its type determined.

But if all the business logic lived there too, the method would grow very large.

Even with several developers each handling a different business, there would be many conflicts, maintenance would be hard, and so on.

So message parsing has to be kept completely separate from business processing.

This is where interface-oriented programming comes into play.

The core code here follows the same idea as cicada, the lightweight web framework from the earlier "build a wheel" article.

An interface is defined for processing business logic. After the message is parsed, reflection is used to create a concrete object and execute its handler method.

Different businesses and different developers then only need to implement this interface with their own business logic.

The pseudocode is as follows:
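A minimal sketch of this idea follows. The names `MessageHandler`, `TextMessageHandler`, and `HandlerDispatcher` are hypothetical and are not taken from cicada; the sketch only shows the shape of the approach, an interface plus reflective instantiation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical business interface; each message type gets its own implementation. */
interface MessageHandler {
    String handle(String payload);
}

/** Example business implementation for text messages. */
class TextMessageHandler implements MessageHandler {
    @Override
    public String handle(String payload) {
        return "stored text: " + payload;
    }
}

/** Maps a message type to a handler class and creates the handler reflectively. */
final class HandlerDispatcher {
    private final Map<String, Class<? extends MessageHandler>> registry = new ConcurrentHashMap<>();

    void register(String messageType, Class<? extends MessageHandler> handlerClass) {
        registry.put(messageType, handlerClass);
    }

    /**
     * Intended to be called once a message has been parsed (for example from
     * channelRead()); it contains no business logic of its own.
     */
    String dispatch(String messageType, String payload) {
        Class<? extends MessageHandler> handlerClass = registry.get(messageType);
        if (handlerClass == null) {
            throw new IllegalArgumentException("no handler for type " + messageType);
        }
        try {
            // Reflection: create the concrete handler from its class object.
            MessageHandler handler = handlerClass.getDeclaredConstructor().newInstance();
            return handler.handle(payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + handlerClass.getName(), e);
        }
    }
}
```

With this split, adding a new business means adding one class and one `register` call; the parsing code does not change.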

To see cicada in action, click here:

https://github.com/TogetherOS/cicada

One more point about upstream traffic: because these are long-lived connections, the client periodically sends heartbeat packets to keep the connection alive. The server also checks the connection; if no message arrives within N intervals, it closes the connection to save resources.

This can be done with an IdleStateHandler. See the earlier article "Netty (1): SpringBoot integration with the long-connection heartbeat mechanism".
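To make the "no message within N intervals" rule concrete, here is a plain-Java sketch of the bookkeeping. It is not how IdleStateHandler is implemented and does not use Netty at all; the class `IdleTracker` and its methods are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker: a client is idle after N heartbeat intervals without any message. */
final class IdleTracker {
    private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    IdleTracker(long heartbeatIntervalMillis, int allowedMissedIntervals) {
        this.timeoutMillis = heartbeatIntervalMillis * allowedMissedIntervals;
    }

    /** Records any inbound message, heartbeat or business data, from a client. */
    void onMessage(String clientId, long nowMillis) {
        lastSeenMillis.put(clientId, nowMillis);
    }

    /** Removes and returns the clients whose last message is older than the timeout. */
    List<String> evictIdle(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeenMillis.entrySet()) {
            boolean expired = nowMillis - entry.getValue() > timeoutMillis;
            // remove(key, value) leaves the entry alone if a newer message arrived meanwhile.
            if (expired && lastSeenMillis.remove(entry.getKey(), entry.getValue())) {
                evicted.add(entry.getKey());
            }
        }
        return evicted;
    }
}
```

In a real server the caller would close the Channel of every evicted client; with Netty, IdleStateHandler does the timing and fires an idle event on the pipeline, so this bookkeeping is not written by hand.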

Message downward

Where there is upstream there is also downstream. For example, in a chat scenario, two clients are connected to a push-server and need to communicate point to point.

The flow is as follows:

  • A sends the message to the server.
  • After receiving the message, the server sees that it is addressed to B and looks up B’s Channel in memory.
  • Forward A’s message through B’s Channel.

This is a downward flow.

The case where an administrator sends a system notification to all online users is similar:

Iterate over the Map of Channel relationships and send the message to each one. This is the main reason the relationships were put in a Map.

The pseudocode is as follows:
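A minimal sketch of both cases is shown below. The class `DownstreamPusher` is hypothetical, and `Consumer<String>` stands in for a Netty Channel (calling `accept` plays the role of `channel.writeAndFlush(...)`) so that the example runs without Netty.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Sketch of downstream delivery on a single node. */
final class DownstreamPusher {
    // clientId -> channel; Consumer<String> is a stand-in for a Netty Channel.
    private final Map<String, Consumer<String>> channels = new ConcurrentHashMap<>();

    void online(String clientId, Consumer<String> channel) {
        channels.put(clientId, channel);
    }

    void offline(String clientId) {
        channels.remove(clientId);
    }

    /** Point-to-point: find the target's channel and forward. Returns false if the target is offline. */
    boolean sendTo(String targetClientId, String message) {
        Consumer<String> channel = channels.get(targetClientId);
        if (channel == null) {
            return false;
        }
        channel.accept(message);
        return true;
    }

    /** System notification: iterate over the whole map. Returns how many clients were written to. */
    int broadcast(String message) {
        int sent = 0;
        for (Consumer<String> channel : channels.values()) {
            channel.accept(message);
            sent++;
        }
        return sent;
    }
}
```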

For the full code, see:

https://github.com/crossoverJie/netty-action/

Distributed scheme

With the standalone version implemented, the focus now shifts to reaching millions of connections.

"Millions of connections" is really just a figure of speech; the point is how to build a distributed solution that scales horizontally to support more connections.

Before that, we need to find out how many connections the standalone version can support. Many factors affect this:

  • The server’s own configuration: memory, CPU, NIC, and the maximum number of open files Linux allows.
  • The application’s own configuration. Netty relies on off-heap memory, but the JVM itself also needs memory, for example for the large Map of Channel relationships. This has to be tuned for your own situation.

Combining these, you can load-test the maximum number of connections a single node supports.

However much a single machine is optimized, it has an upper limit, and that is the main problem a distributed solution addresses.

Architecture introduction

Before the concrete implementation, the overall architecture diagram shown above needs some explanation.

Let’s start on the left.

The registration/authentication module mentioned above is also deployed as a cluster, with Nginx load balancing in front. As noted earlier, its main job is to authenticate and return a token to the client.

Once push-server is clustered, however, it takes on another role: returning an available push-server to the current client.

The platform on the right generally means the management platform, which can show the current number of clients online in real time and push messages to specified clients.

Pushed messages have to go through a push route (push-route) to find the push-server node that actually holds the connection.

The other middleware, such as Redis, Zookeeper, Kafka, and MySQL, supports these functions. See the implementation below.

Registration and discovery

The first problem is registration and discovery: once there are multiple push-servers, how do we pick an available node for a client?

This was covered in detail in the earlier article "Distributed (1): handling service registration and discovery".

All push-servers register their information with Zookeeper when they start up.

The registration/authentication module subscribes to the nodes in Zookeeper to get the latest list of services. The structure is as follows:

Here is some pseudocode:

On startup, the application registers itself with Zookeeper.

The registration/authentication module only needs to subscribe to the Zookeeper node:

Routing strategy

Once the full service list is available, how do we choose the right push-server for a client?

The following points matter most in this process:

  • Connections should be spread as evenly as possible across all nodes.
  • Whether to rebalance when nodes are added or removed.

First, the following algorithms can be used to keep the load balanced:

  • Round-robin. Nodes are assigned to clients in turn. However, a newly added node may end up unevenly loaded.
  • Hash modulo. Similar to HashMap, but with the same problem as round-robin. You could rebalance as HashMap does and have all clients reconnect, but that forces every connection to drop and reconnect, which is costly.
  • Consistent hashing. Only some of the clients need to be rebalanced.
  • Weights. The load of each node can be adjusted manually or even automatically: based on monitoring, the weight of heavily loaded nodes is lowered and the weight of lightly loaded nodes is raised.
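Of these strategies, consistent hashing is the least obvious, so here is a minimal sketch of it. The class `ConsistentHashRouter`, the use of virtual nodes, and the MD5-based hash are assumptions chosen for illustration, not details of the original system.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical consistent-hash ring that maps a clientId to a push-server address. */
final class ConsistentHashRouter {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRouter(List<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String node : nodes) {
            addNode(node);
        }
    }

    /** Places several virtual points per node on the ring to even out the distribution. */
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    /** Walks clockwise from the client's hash to the next point on the ring. */
    String route(String clientId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no push-server available");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(clientId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Uses the first 8 bytes of an MD5 digest as the position on the ring. */
    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

When a node is removed, only the clients that were routed to that node get a different answer from `route`; everyone else stays where they are, which is the property that makes this cheaper than hash modulo.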

Another question:

What happens to the clients on a node when we restart some of the applications for an upgrade?

Because there is a heartbeat mechanism, a failed heartbeat can be taken to mean the node has a problem. The client then has to ask the registration/authentication module again for an available node. The same applies on weak networks.

If the client is in the middle of sending a message at that point, it has to keep the message locally until it obtains a new node, and then send it again.

Stateful connection

Unlike HTTP, which is stateless, in this scenario we need to know the relationship between each client and its connection.

In the standalone version above, this relationship was kept in a local cache, which clearly does not work in a distributed environment.

For example, when the platform pushes a message to a client, it first has to know which node holds that client’s Channel.

In our experience, problems like this are naturally solved by introducing third-party middleware to hold the relationship.

That is the Redis instance storing routing relationships in the architecture diagram. When a client connects to a push-server, the client’s unique identifier and the service node’s IP and port are saved in Redis.

When the client goes offline, this relationship must also be deleted from Redis.

Ideally, the Map relationships held in memory across all nodes add up to exactly the data in Redis.

The pseudocode is as follows:

There are concurrency issues when storing the routing relationship, so it is best to use a Lua script.
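The sketch below illustrates one race of this kind and the compare-and-delete that closes it. It is an assumption about which concurrency issue is meant, and it uses an in-memory `ConcurrentMap` as a stand-in for Redis, so the hypothetical class `RouteTable` shows only the semantics, not a Redis client or a Lua script.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for the routing relationship kept in Redis: clientId -> "ip:port". */
final class RouteTable {
    private final ConcurrentMap<String, String> routes = new ConcurrentHashMap<>();

    /** Called when a client connects to a push-server node. */
    void online(String clientId, String nodeAddress) {
        routes.put(clientId, nodeAddress);
    }

    /**
     * Called when a node sees a client disconnect. The route is removed only if it
     * still points at that node, so a late offline event from the old node cannot
     * erase the route written after the client reconnected somewhere else.
     */
    boolean offline(String clientId, String nodeAddress) {
        return routes.remove(clientId, nodeAddress);
    }

    /** Returns the node holding the client's connection, or null if the client is offline. */
    String lookup(String clientId) {
        return routes.get(clientId);
    }
}
```

In Redis, "read the value, compare it, then delete" takes more than one command, which is why doing it inside a single Lua script is attractive: the script runs atomically on the server, just as `remove(key, value)` is atomic here.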

Push routing

Consider this scenario: what happens when an administrator needs to push a system message to a batch of recently registered clients?

Refer to the architecture diagram.

Assume this batch contains 100,000 clients. First, the platform sends this batch of numbers through Nginx down to a push route.

For efficiency, the batch can even be split up again and spread across the push-route instances.

Once a push-route has its numbers, it looks up, using multiple threads, the push-server for each client in the routing Redis described earlier.

It then calls that push-server over HTTP to do the actual delivery (Netty also supports HTTP well).

After a push succeeds, the result is written to the database. For offline clients, the push can be retried later depending on business needs.
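One step of this flow, turning a batch of client numbers into per-node batches, can be sketched as follows. The class `PushRouter`, the `OFFLINE` bucket, and the `routeLookup` function (standing in for the Redis lookup) are hypothetical; the sketch is single-threaded for clarity, whereas the flow described above would run lookups on multiple threads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch of the grouping step on a push-route. */
final class PushRouter {
    /** Bucket for clients that have no route, that is, clients that are offline. */
    static final String OFFLINE = "OFFLINE";

    /**
     * Groups client ids by the push-server that holds their connection.
     * routeLookup stands in for the Redis query and returns null for offline clients.
     */
    static Map<String, List<String>> groupByNode(List<String> clientIds,
                                                 Function<String, String> routeLookup) {
        Map<String, List<String>> batches = new LinkedHashMap<>();
        for (String clientId : clientIds) {
            String node = routeLookup.apply(clientId);
            String bucket = (node == null) ? OFFLINE : node;
            batches.computeIfAbsent(bucket, key -> new ArrayList<>()).add(clientId);
        }
        return batches;
    }
}
```

Grouping first means each push-server can be called once per batch rather than once per client, and the `OFFLINE` bucket is the list to retry later.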

The message flow

There may be scenarios where upstream client messages are very important, must be persisted, and arrive in very large volumes.

Doing that business processing inside push-server is clearly not a good fit; Kafka can be used to decouple it.

All upstream data is dumped straight into Kafka, and push-server’s part ends there.

A consumer program then reads the data and writes it to the database.

This is worth a discussion of its own; a related earlier article covers how even Disruptor can run out of memory.

More on Kafka later.

Distributed problem

Going distributed solves the performance problem but brings other problems.

Application monitoring

How do you know the health of dozens of push-server nodes online?

This is where a monitoring system comes in. We need to know each node’s current memory usage and GC activity.

We also need the memory usage of the operating system itself, since Netty uses a lot of off-heap memory.

The current online count on each node and the online count in Redis also need to be monitored. In theory the two numbers should be equal.

This shows how heavily the system is used, so the number of nodes can be adjusted flexibly.
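As a starting point, the JVM-side numbers can be read from the standard `java.lang.management` beans. The sketch below is only an illustration: the class `NodeMetrics` and the metric names are hypothetical, and depending on how Netty is configured, its off-heap allocations may not all be reflected in the JDK’s "direct" buffer pool, so operating-system-level memory still needs to be watched separately.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical per-node metrics snapshot built from standard JDK management beans. */
final class NodeMetrics {

    static Map<String, Long> snapshot() {
        Map<String, Long> metrics = new LinkedHashMap<>();

        // Heap usage of this JVM.
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        metrics.put("heap.used.bytes", heap.getUsed());

        // GC activity summed over all collectors; the beans report -1 when a value is undefined.
        long gcCount = 0;
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            gcCount += Math.max(0, gc.getCollectionCount());
            gcMillis += Math.max(0, gc.getCollectionTime());
        }
        metrics.put("gc.count", gcCount);
        metrics.put("gc.time.millis", gcMillis);

        // Direct (off-heap) buffers that the JDK itself accounts for.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                metrics.put("direct.used.bytes", pool.getMemoryUsed());
            }
        }
        return metrics;
    }

    /** The sum of per-node online counts should match the number of routes in Redis. */
    static boolean countsMatch(long sumOfNodeOnlineCounts, long redisRouteCount) {
        return sumOfNodeOnlineCounts == redisRouteCount;
    }
}
```

A persistent mismatch reported by `countsMatch` usually means routes are being left behind or lost, which is worth an alert of its own.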

Log processing

Logging is just as important. For example, when you get a report that a client has been disconnected, you need to find out where the problem is.

It is best to attach a traceID to the logs of every request, so that the logs show where the request got stuck across the nodes.

Tools such as ELK should also be put to use.

Conclusion

This article is based on my day-to-day experience. There are probably pitfalls I have not yet hit in my own work, so some points will be missing.

As things stand, building a stable push system is quite involved and touches on many points; you only really find out once you have done it.

If you found this helpful, please feel free to forward and share it.