I. Business background
Message push is widely used in mobile application scenarios. Push messages help operators achieve their operational goals more efficiently (for example, promoting marketing campaigns or reminding users of new app features).
The push system needs to have the following two features:
- Messages reach users within seconds with no perceptible delay; the system supports one million pushes per second and one million long connections on a single node.
- Notification messages, text messages, and transparent transmission of custom messages are all supported.
These requirements bring challenges to the development and maintenance of the system. The push system can be described simply as: API -> push module -> mobile phone.
II. Background of the issue
In the push system, after the long-connection cluster had been running stability and stress tests for a period of time, a process would occasionally hang (roughly once a month), affecting the timeliness of some client messages.
The Broker node in the push system is developed on Netty and maintains the long connections between the server and mobile devices. When the online problem occurred, Netty's memory-leak monitoring parameters were added to help troubleshoot it.
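The article does not list the exact parameters used; Netty's built-in leak detector is typically enabled through the io.netty.leakDetection.level system property, for example:

# Hedged example, not necessarily the exact flags the team used:
# enable Netty's ResourceLeakDetector at its most verbose level
-Dio.netty.leakDetection.level=PARANOID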
Since the long-connection node is developed with Netty, a brief introduction to Netty follows for readers' reference.
III. Netty introduction
Netty is a high-performance, asynchronous, event-driven NIO framework built on the APIs provided by Java NIO. It supports TCP, UDP, and file transfer. As the most popular NIO framework, Netty is widely used in the Internet industry, big data and distributed computing, gaming, and telecommunications. Open source components such as HBase, Hadoop, Bees, and Dubbo are also built on Netty's NIO framework.
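For readers unfamiliar with Netty, a minimal server bootstrap looks roughly like the sketch below. This is an illustrative example only, not the push system's actual code; the port and handler are placeholders.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleNettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts incoming connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles I/O for accepted channels
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // decoders, encoders and business handlers would be added here
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
                 }
             });
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}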
IV. Problem analysis
4.1 Initial guess
It was initially suspected that the number of long connections was the cause, but after checking the logs and analyzing the code, this was ruled out.
The number of long connections was about 390,000.
Each channel occupied about 1,456 bytes; even with 400,000 long connections, this would not account for the observed memory growth.
4.2 Viewing GC Logs
The GC logs show frequent full GCs (about one every 5 minutes) before the process hangs, yet memory is not reclaimed.
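The article does not state which GC logging options were in use; on a HotSpot JDK 8 deployment they would typically look something like the following (the log path is a placeholder):

# Hedged example of GC logging flags (HotSpot JDK 8)
-Xloggc:/path/to/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps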
4.3 Analyzing Heap Memory Status
The ChannelOutboundBuffer objects occupied nearly 5 GB of memory, which basically pinpointed the leak: too many entries had accumulated in the ChannelOutboundBuffer. Reading the ChannelOutboundBuffer source shows that the data held in it had not been written out, producing a backlog; internally, ChannelOutboundBuffer is a linked-list structure.
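To make the linked-list structure concrete, the relevant fields of Netty's ChannelOutboundBuffer look roughly like the simplified sketch below (based on Netty 4.x source; most fields and methods are omitted):

public final class ChannelOutboundBuffer {
    // pointers into a singly linked list of Entry nodes
    private Entry flushedEntry;    // first entry already marked for flushing
    private Entry unflushedEntry;  // first entry written but not yet flushed
    private Entry tailEntry;       // last entry in the list

    static final class Entry {
        Entry next;        // next node in the list -> the list can grow without bound
        Object msg;        // the buffered message (often backed by direct memory)
        int pendingSize;   // bytes this entry contributes to the total pending size
        // promise, progress, total, etc. omitted
    }
}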
4.4 The analysis above shows the data was not written out. Why does this happen?
The code does check whether the connection is available (channel.isActive()) and closes connections that have timed out. Past experience shows this situation mostly arises when the connection is half-open (the client has closed abnormally): to the server, communication with the peer still appears to be fine.
Based on this hypothesis, the problem was reproduced and verified in the test environment.
1) Simulate a client cluster and establish connections with the long-connection server, then configure the firewall on the client nodes to simulate an abnormal network between server and client (that is, channel.isActive() still returns true, but data cannot actually be sent out).
2) Reduce the off-heap memory and keep sending test messages (about 1 KB each) to those clients; the JVM flag used to cap the off-heap memory is sketched after this list.
3) With 128 MB of off-heap memory, the out-of-memory condition appeared after a little over 90,000 calls.
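The article does not give the exact JVM options for the reproduction; capping off-heap (direct) memory would typically be done with something like:

# Hedged example: cap direct (off-heap) memory at 128 MB for the reproduction run
-XX:MaxDirectMemorySize=128m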
V. Problem solving
5.1 Toggling autoRead
Turn off autoRead when the channel is not writable:
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    if (!ctx.channel().isWritable()) {
        Channel channel = ctx.channel();
        ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
        String clientId = "";
        if (channelInfo != null) {
            clientId = channelInfo.getClientId();
        }
        LOGGER.info("channel is unwritable, turn off autoread, clientId:{}", clientId);
        channel.config().setAutoRead(false);
    }
}
Turn autoRead back on when the channel becomes writable again:
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel);
    String clientId = "";
    if (channelInfo != null) {
        clientId = channelInfo.getClientId();
    }
    if (channel.isWritable()) {
        LOGGER.info("channel is writable again, turn on autoread, clientId:{}", clientId);
        channel.config().setAutoRead(true);
    }
}
Description:
autoRead is used for finer-grained rate control. When it is enabled, Netty registers the read event for the channel, and when the read event fires and the socket is readable, Netty reads the data from the channel. When autoRead is disabled, Netty does not register the read event, so even if the peer sends data, the read event is not triggered and no data is read from the channel. Once the kernel receive buffer (recv_buffer) fills up, TCP flow control prevents the peer from sending any more data.
5.2 Setting high and low watermarks
serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 8 * 1024 * 1024));
Note: the high and low watermarks work together with the isWritable() check described below.
5.3 Adding a channel.isWritable() check
In addition to checking channel.isActive(), we also need to check channel.isWritable(). isActive() only tells us whether the connection is alive; whether the channel can accept more writes is determined by isWritable().
private void writeBackMessage(ChannelHandlerContext ctx, MqttMessage message) {
    Channel channel = ctx.channel();
    // add the channel.isWritable() check
    if (channel.isActive() && channel.isWritable()) {
        ChannelFuture cf = channel.writeAndFlush(message);
        if (cf.isDone() && cf.cause() != null) {
            LOGGER.error("channelWrite error!", cf.cause());
            ctx.close();
        }
    }
}
Note: isWritable() prevents the ChannelOutboundBuffer from growing without bound; it is evaluated against the channel's configured high and low watermarks.
5.4 Problem Verification
After the fix, the same test no longer reports any error even after more than 270,000 sends.
VI. Solution analysis
Netty's data processing flow is as follows: a business thread processes the data that was read and then writes it back out (the whole process is asynchronous). To improve network throughput, Netty adds a ChannelOutboundBuffer between the business layer and the socket.
When channel.write() is called, the data is not written to the socket immediately; it is first placed in the ChannelOutboundBuffer and is only written to the socket when channel.flush() is called. Because there is a buffer in between, the write rate and the send rate must be matched, and the buffer is unbounded (a linked list). So if the rate of channel.write() is not controlled while the socket cannot send data (for example, the isActive() check is misleading) or sends it slowly, a large amount of data piles up in the buffer.
The likely result is resource exhaustion, and if the ChannelOutboundBuffer holds DirectByteBuffers, the problem becomes even harder to troubleshoot.
The process can be abstracted as: business thread write -> ChannelOutboundBuffer (unbounded linked list) -> flush -> socket send buffer -> network.
From the above analysis, problems arise when the first step writes too fast (faster than it can be processed) or when the downstream cannot send the data out; this is essentially a rate-matching problem.
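The backlog on a channel can also be observed directly before it exhausts memory. A minimal monitoring sketch, assuming Netty 4.1 (which exposes bytesBeforeUnwritable()/bytesBeforeWritable() on Channel); the method and logging setup are illustrative, not the push system's actual code:

// Hedged sketch: log how close a channel is to its high watermark.
private void logChannelBacklog(Channel channel) {
    if (!channel.isWritable()) {
        // already above the high watermark: this many bytes must drain before writes resume
        LOGGER.warn("channel {} unwritable, {} bytes must drain before writes resume",
                channel.id(), channel.bytesBeforeWritable());
    } else {
        // still writable: this is the remaining headroom before the high watermark is hit
        LOGGER.debug("channel {} writable, {} bytes of headroom left",
                channel.id(), channel.bytesBeforeUnwritable());
    }
}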
VII. Netty source code description
Above the high watermark
When the ChannelOutboundBuffer capacity exceeds the configured high watermark, isWritable() returns false, the channel is marked unwritable (setUnwritable), and fireChannelWritabilityChanged() is triggered.
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}
Below the low watermark
When the ChannelOutboundBuffer capacity falls below the configured low watermark, isWritable() returns true, the channel is marked writable again, and fireChannelWritabilityChanged() is triggered.
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}

private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}
VIII. Summary
When the ChannelOutboundBuffer capacity exceeds the high watermark, isWritable() returns false, indicating that messages are piling up and the write rate needs to be slowed down.
When the ChannelOutboundBuffer capacity falls below the low watermark, isWritable() returns true, indicating that the backlog has drained and writing can resume at full speed. After the three fixes above were applied, the system has run in production for six months without the problem recurring.
Author: Zhang Lin, Vivo Internet Server Team