Netty provides many preset codecs and handlers practically out of the box, reducing the time and effort otherwise spent on tedious boilerplate
Idle connections and timeouts
Detecting idle connections and timeouts is critical to freeing resources, and Netty provides several ChannelHandler implementations for this
Name | Description |
---|---|
IdleStateHandler | When the connection is idle for too long, an IdleStateEvent event is emitted, which you can then handle by overwriting the userEventTriggered() method in ChannelInboundHandler |
ReadTimeoutHandler | If no inbound data is received within the specified time interval, a ReadTimeoutException is thrown and the corresponding Channel is closed. This ReadTimeoutException can be detected by overriding the exceptionCaught() method in your ChannelHandler |
WriteTimeoutHandler | If no outbound data is written within the specified time interval, a WriteTimeoutException is thrown and the corresponding Channel is closed. This WriteTimeoutException can be detected by overriding the exceptionCaught() method in your ChannelHandler |
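As a sketch of how the two timeout handlers are installed, the snippet below adds a ReadTimeoutHandler ahead of a handler that reacts to the exception. The class name, the anonymous handler, and the 60-second value are illustrative choices, not part of the text above:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class ReadTimeoutInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Throws ReadTimeoutException and closes the Channel if no inbound
        // data arrives within 60 seconds
        pipeline.addLast(new ReadTimeoutHandler(60));
        pipeline.addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                    throws Exception {
                if (cause instanceof ReadTimeoutException) {
                    // React to the timeout (e.g. log it); the Channel is already being closed
                } else {
                    ctx.fireExceptionCaught(cause);
                }
            }
        });
    }
}
```

WriteTimeoutHandler is installed the same way, with exceptionCaught checking for WriteTimeoutException instead.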
The code below shows the common technique of sending a heartbeat message to a remote node: if no data is received or sent within 60 seconds, a heartbeat is sent, and if that send fails, the connection is closed
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // IdleStateHandler fires an IdleStateEvent when the connection has been idle for 60 seconds
        pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        // Add a HeartbeatHandler to the ChannelPipeline
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends SimpleChannelInboundHandler {
        // Heartbeat message sent to the remote node
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
            .unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // Send a heartbeat message; close the connection if the send fails
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Not interested in inbound data; discard it
        }
    }
}
Decoding delimiter-based protocols
Delimiter-based messaging protocols use defined characters to mark the beginning or end of a message or message segment. The following table lists the decoders that handle such protocols and help you define custom decoders that can extract frames delimited by any token sequence
Name | Description |
---|---|
DelimiterBasedFrameDecoder | Frames are extracted using user-supplied separators |
LineBasedFrameDecoder | Frames are separated by a line terminator (\n or \r\n) |
The following code shows how to use LineBasedFrameDecoder to handle end-of-line delimited frames
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // The LineBasedFrameDecoder forwards each extracted frame to the next ChannelInboundHandler
        pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
        // Add a FrameHandler to receive the frames
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // Do something with the frame
        }
    }
}
If your frames are delimited by something other than a line terminator, you can use DelimiterBasedFrameDecoder in the same way, simply passing the specific delimiter sequence to its constructor
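For instance, a protocol that terminates each frame with a NUL byte could be handled as in the sketch below. The NUL delimiter and the class name are made-up examples for illustration:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

public class NulDelimitedInitializer extends ChannelInitializer<Channel> {
    // Hypothetical delimiter: a single NUL byte terminates each frame
    private static final ByteBuf NUL = Unpooled.wrappedBuffer(new byte[] { 0 });

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Extract frames of at most 64 KB; by default the delimiter itself
        // is stripped from each emitted frame
        pipeline.addLast(new DelimiterBasedFrameDecoder(64 * 1024, NUL));
    }
}
```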
As an example, we will use the following protocol specification:
- The incoming data stream is a series of frames, each separated by the newline character \n
- Each frame consists of a series of elements, each separated by a single space character
- The contents of a frame represent a command, defined as a command name followed by a variable number of arguments
Based on this protocol, our custom decoder will define the following classes:
- Cmd – Stores the contents of a frame in two ByteBufs, one for the command name and one for the arguments
- CmdDecoder – Takes a line of string from the decode() method overridden and builds a Cmd instance from its contents
- CmdHandler – Gets the decoded Cmd object from CmdDecoder and does some processing with it
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
    static final byte SPACE = (byte) ' ';

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new CmdDecoder(64 * 1024));
        pipeline.addLast(new CmdHandler());
    }

    /** Cmd POJO */
    public static final class Cmd {
        private final ByteBuf name;
        private final ByteBuf args;

        public Cmd(ByteBuf name, ByteBuf args) {
            this.name = name;
            this.args = args;
        }

        public ByteBuf getArgs() {
            return args;
        }

        public ByteBuf getName() {
            return name;
        }
    }

    public static final class CmdDecoder extends LineBasedFrameDecoder {
        public CmdDecoder(int maxLength) {
            super(maxLength);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
            // Extract a frame delimited by a line-ending sequence from the ByteBuf
            ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
            // If there is no complete frame in the input, null is returned
            if (frame == null) {
                return null;
            }
            // Find the index of the first space character: the command name precedes it, the arguments follow it
            int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE);
            // Create a new Cmd object from slices holding the command name and the arguments
            // (note: ByteBuf.slice() takes an index and a length)
            return new Cmd(frame.slice(frame.readerIndex(), index - frame.readerIndex()),
                frame.slice(index + 1, frame.writerIndex() - index - 1));
        }
    }

    public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Cmd msg) throws Exception {
            // Process the Cmd object passed through the ChannelPipeline
        }
    }
}
Length-based protocols
A length-based protocol defines a frame by encoding its length to the head of the frame, rather than using a special delimiter to mark its end. The following table lists two decoders provided by Netty for handling this type of protocol
Name | Description |
---|---|
FixedLengthFrameDecoder | Extracts the fixed-length frame specified when the constructor is called |
LengthFieldBasedFrameDecoder | Extract the frame based on the length value in the frame header: the offset of this field and the length are specified in the constructor |
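For the fixed-length case the setup is a one-liner; the sketch below (the 8-byte frame size and the class name are arbitrary choices for illustration) emits 8-byte frames and buffers any incomplete remainder:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.FixedLengthFrameDecoder;

public class FixedLengthInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Every extracted frame is exactly 8 bytes long; partial data is
        // buffered until a full frame has arrived
        pipeline.addLast(new FixedLengthFrameDecoder(8));
    }
}
```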
You will often encounter protocols in which the frame size encoded in the message header is not a fixed value. To handle such variable-length frames, you can use LengthFieldBasedFrameDecoder, which determines the frame length from a field in the header and then extracts the specified number of bytes from the data stream
The following figure shows an example where the length field has an offset of 0 in the frame and is 2 bytes long
The following code shows how to use the constructor that takes three arguments: maxFrameLength, lengthFieldOffset, and lengthFieldLength. In this scenario, the length of the frame is encoded in the first 8 bytes of the frame
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Use a LengthFieldBasedFrameDecoder to decode frames whose length
        // is encoded in the first 8 bytes of the frame
        pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // Do something with the frame
        }
    }
}
Writing large data
Because of the potential for network saturation, writing large chunks of data efficiently is a special problem in an asynchronous framework. Since write operations are non-blocking, they return immediately, even if not all of the data has been written out, and notify the ChannelFuture when the write completes. If you keep writing while this is happening, you risk running out of memory. So when writing large amounts of data, you have to be prepared for a slow connection to the remote node, which delays the release of memory. As an example, let's consider writing the contents of a file out to the network
NIO's zero-copy feature eliminates the copy step of moving a file's contents from the file system to the network stack. All of this happens in Netty's core, so all the application needs to do is use an implementation of the FileRegion interface
The following code shows how to create a DefaultFileRegion from a FileInputStream and write it to a Channel
// Create a FileInputStream
FileInputStream in = new FileInputStream(file);
// Create a new DefaultFileRegion covering the full length of the file
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
// Send the DefaultFileRegion and register a ChannelFutureListener
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // Handle a failed write
        if (!future.isSuccess()) {
            Throwable cause = future.cause();
            // do something
        }
    }
});
This example only applies to the direct transfer of a file's contents, with no processing of the data by the application. When you need to copy the data from the file system into user memory, you can use ChunkedWriteHandler, which supports asynchronously writing large data streams without incurring high memory consumption
The type parameter B in interface ChunkedInput<B> is the type returned by the readChunk() method. Netty presets four implementations of this interface, as shown in the table, each representing a stream of variable length that will be processed by ChunkedWriteHandler
Name | Description |
---|---|
ChunkedFile | Retrieve data block by block from a file, for use when your platform does not support zero copy or when you need to convert data |
ChunkedNioFile | Similar to ChunkedFile, except that it uses a FileChannel |
ChunkedStream | Transfer content block by block from InputStream |
ChunkedNioStream | Progressively transfer content from ReadableByteChannel |
The following code illustrates the use of ChunkedStream, the implementation most often used in practice. The class shown is instantiated with a File and an SslContext, and when the initChannel() method is called, it initializes the Channel with the ChannelHandler chain shown
When the Channel's state becomes active, the WriteStreamHandler will write the data from the file chunk by chunk as a ChunkedStream
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslContext;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslContext) {
        this.file = file;
        this.sslContext = sslContext;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslContext.newEngine(ch.alloc())));
        // Add a ChunkedWriteHandler to handle data passed in as ChunkedInput
        pipeline.addLast(new ChunkedWriteHandler());
        // Once the connection is established, WriteStreamHandler starts writing the file data
        pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
        /** When the connection is established, channelActive() writes the file data using a ChunkedInput */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}
Serializing data
The JDK provides ObjectOutputStream and ObjectInputStream for serializing and deserializing primitive data types and graphs of POJOs over the network. The API is not complex and can be applied to any object that implements the java.io.Serializable interface, but its performance is not very efficient. In this section, we'll see what Netty has to offer for serialization
1. JDK serialization
If your application must interact with remote nodes that use ObjectOutputStream and ObjectInputStream, and compatibility is the priority, then JDK serialization is the right choice. The following table lists the classes Netty provides for interoperating with JDK serialization
Name | Description |
---|---|
CompatibleObjectDecoder | Decoders that interoperate with non-Netty based remote nodes that use JDK serialization |
CompatibleObjectEncoder | Encoders that interoperate with non-Netty based remote nodes serialized with the JDK |
ObjectDecoder | A decoder built on top of JDK serialization that uses custom serialization to decode |
ObjectEncoder | An encoder built on top of JDK serialization that uses custom serialization to encode |
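A minimal sketch of a pipeline using ObjectDecoder and ObjectEncoder follows; the cacheDisabled class resolver is just one of several options the ClassResolvers utility class offers, and the initializer's name is illustrative:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class JdkSerializationInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Decodes inbound bytes back into Serializable objects
        pipeline.addLast(new ObjectDecoder(
            ClassResolvers.cacheDisabled(getClass().getClassLoader())));
        // Encodes outbound Serializable objects into bytes
        pipeline.addLast(new ObjectEncoder());
    }
}
```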
2. Protocol Buffers serialization
Protocol Buffers is an open source data interchange format developed by Google that encodes and decodes structured data in a compact and efficient way and can be used across many languages. The following table shows the ChannelHandler implementations Netty provides for Protobuf support
Name | Description |
---|---|
ProtobufDecoder | The message is decoded using Protobuf |
ProtobufEncoder | The message is encoded using Protobuf |
ProtobufVarint32FrameDecoder | Dynamically splits the received ByteBuf based on the value of the Google Protocol Buffers Base 128 Varints integer length field in the message |
ProtobufVarint32LengthFieldPrepender | Prepends a Google Protocol Buffers Base 128 Varints integer length field to the ByteBuf |
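These four handlers are typically combined as in the sketch below: the varint frame decoder must precede the protobuf decoder on the inbound side, and the length-field prepender must precede the encoder on the outbound side. The ProtobufDecoder line is commented out because it needs a protoc-generated message class; "MyMessage" there is purely hypothetical:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class ProtobufInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Inbound: split frames on the varint length prefix first,
        // then decode the protobuf payload
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // pipeline.addLast(new ProtobufDecoder(MyMessage.getDefaultInstance()));
        // (MyMessage is a hypothetical protoc-generated class)

        // Outbound: prepend the varint length prefix, then encode the message
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
    }
}
```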