codecs

Every network application must define how the raw bytes exchanged between peers are parsed and how they are converted to and from the application’s own data format. This conversion logic is handled by a codec, which consists of an encoder and a decoder, each of which transforms a stream of data from one format to another.

  • The encoder converts the message into a format suitable for transport (most likely a byte stream)
  • The decoder converts the network byte stream back to the application’s message format

Thus, the encoder operates on outbound data, while the decoder processes inbound data

1. The decoder

In this section, we’ll examine the decoder classes provided by Netty and give concrete examples of when and how to use them. These classes cover two distinct use cases:

  • Decode bytes into messages – ByteToMessageDecoder and ReplayingDecoder
  • Decode one message type into another – MessageToMessageDecoder

When would you use a decoder? Whenever inbound data needs to be transformed for the next ChannelInboundHandler in the ChannelPipeline. Moreover, thanks to the design of ChannelPipeline, multiple decoders can be chained together to implement arbitrarily complex transformation logic.

1.1 Abstract class ByteToMessageDecoder

Decoding bytes into messages is a common task, and Netty provides an abstract base class, ByteToMessageDecoder, that buffers the inbound data until it is ready to process it

As an example of how to use this class, suppose you receive a byte stream containing simple ints, each of which needs to be processed separately. In this case, you read each int from the inbound ByteBuf and pass it to the next ChannelInboundHandler in the ChannelPipeline. To decode the byte stream, you extend ByteToMessageDecoder. (Note that a primitive int is automatically boxed as an Integer when it is added to the List.)

// Extend ByteToMessageDecoder to decode bytes into a specific format
public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Check that at least 4 bytes are readable (the length of one int)
        if (in.readableBytes() >= 4) {
            // Read an int from the inbound ByteBuf and add it to the List of decoded messages
            out.add(in.readInt());
        }
    }
}

While ByteToMessageDecoder makes it easy to implement this pattern, you may find it a bit tedious to verify that the input ByteBuf has enough data before calling readInt(). ReplayingDecoder, a special decoder, eliminates this step at the cost of a small amount of overhead.
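The readableBytes() check matters because a stream may deliver an int in several fragments. The following is a minimal plain-JDK sketch of that buffering idea, not Netty code: the class name IntAccumulator and its feed() method are hypothetical, and java.nio.ByteBuffer stands in for ByteBuf.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-JDK stand-in for a cumulative decoder; not Netty code.
class IntAccumulator {
    // Holds bytes that have arrived but do not yet form a complete int.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Appends a newly arrived fragment and returns every int that is now complete.
    List<Integer> feed(byte[] fragment) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + fragment.length);
        merged.put(pending).put(fragment).flip();

        List<Integer> out = new ArrayList<>();
        // Same guard as in ToIntegerDecoder: read only when 4 bytes are available.
        while (merged.remaining() >= 4) {
            out.add(merged.getInt());
        }
        pending = merged; // leftover bytes (0 to 3) wait for the next fragment
        return out;
    }
}
```

Feeding two bytes returns an empty list; the int is emitted only once a later fragment completes it.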

1.2 Abstract class ReplayingDecoder

ReplayingDecoder extends ByteToMessageDecoder and frees us from having to call readableBytes(). It accomplishes this by wrapping the incoming ByteBuf in a custom ByteBuf implementation, ReplayingDecoderByteBuf, which performs that check internally.

The full declaration of this class is:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

The type parameter S specifies the type used for state management, where Void indicates that state management is not required. The following code shows a ToIntegerDecoder reimplemented based on ReplayingDecoder

// Extend ReplayingDecoder<Void> to decode bytes into messages
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
    // The incoming ByteBuf is a ReplayingDecoderByteBuf
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Read an int from the inbound ByteBuf and add it to the List of decoded messages
        out.add(in.readInt());
    }
}

As before, ints extracted from ByteBuf will be added to the List. If there are not enough bytes available, the implementation of the readInt() method will throw an Error, which will be caught and processed in the base class. The decode() method is called again when more data is available to read
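The replay mechanism described above can be illustrated with a small plain-JDK sketch. It is an illustration of the idea only, not Netty's implementation: ReplaySketch, feed(), and decodeOne() are hypothetical names, and BufferUnderflowException plays the role of the signal that the base class catches.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-JDK illustration of the replay idea; not Netty's implementation.
class ReplaySketch {
    private ByteBuffer cumulation = ByteBuffer.allocate(0);

    // Subclass-style hook: reads without checking how many bytes are available.
    private int decodeOne(ByteBuffer in) {
        return in.getInt(); // throws BufferUnderflowException if fewer than 4 bytes remain
    }

    List<Integer> feed(byte[] fragment) {
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + fragment.length);
        merged.put(cumulation).put(fragment).flip();

        List<Integer> out = new ArrayList<>();
        while (merged.hasRemaining()) {
            int checkpoint = merged.position();
            try {
                out.add(decodeOne(merged));
            } catch (BufferUnderflowException notEnoughData) {
                // Restore the read position so decodeOne is replayed on the next feed.
                // A decode that reads several fields could have consumed part of a message.
                merged.position(checkpoint);
                break;
            }
        }
        cumulation = merged;
        return out;
    }
}
```

The decode hook contains no length check at all; the price is the exception and retry on every incomplete read, which is one way to picture the overhead mentioned earlier.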

Note the following aspects of ReplayingDecoderByteBuf:

  • Not all ByteBuf operations are supported; calling an unsupported method throws an UnsupportedOperationException
  • ReplayingDecoder is slightly slower than ByteToMessageDecoder

The following classes are for handling more complex use cases:

  • io.netty.handler.codec.LineBasedFrameDecoder – this class, used internally by Netty, uses end-of-line control characters (\n or \r\n) to parse the message data
  • io.netty.handler.codec.http.HttpObjectDecoder – a decoder for HTTP data

1.3 Abstract class MessageToMessageDecoder

In this section, we will explain how to convert between two message formats, for example, from one POJO type to another

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

The type parameter I specifies the type of msg, the input argument to the decode() method, which is the only method you must implement

We’ll extend the MessageToMessageDecoder by writing an IntegerToStringDecoder whose decode() method converts the Integer argument to a String representation. As before, the decoded String is added to the outgoing List and forwarded to the next ChannelInboundHandler

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        // Convert the Integer message to its String representation and add it to the output List
        out.add(String.valueOf(msg));
    }
}
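To picture how a bytes-to-message stage and a message-to-message stage chain together, here is a plain-JDK sketch using java.util.function.Function. It only mirrors the data flow; DecoderChain and its members are hypothetical names, and in Netty the chaining is done by adding handlers to the ChannelPipeline.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-JDK stand-in for two chained decoders; not Netty code.
class DecoderChain {
    // Stage 1 (bytes to messages): split a complete buffer into ints.
    static final Function<ByteBuffer, List<Integer>> BYTES_TO_INTS = buf -> {
        List<Integer> ints = new ArrayList<>();
        while (buf.remaining() >= 4) {
            ints.add(buf.getInt());
        }
        return ints;
    };

    // Stage 2 (message to message): convert each Integer to its String form.
    static final Function<List<Integer>, List<String>> INTS_TO_STRINGS = ints -> {
        List<String> strings = new ArrayList<>();
        for (Integer i : ints) {
            strings.add(String.valueOf(i));
        }
        return strings;
    };

    // The output of one stage is the input of the next, as in a pipeline.
    static List<String> decode(byte[] wire) {
        return BYTES_TO_INTS.andThen(INTS_TO_STRINGS).apply(ByteBuffer.wrap(wire));
    }
}
```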
1.4 TooLongFrameException

Since Netty is an asynchronous framework, bytes need to be buffered in memory until they can be decoded. Consequently, you must not let a decoder buffer so much data that it exhausts the available memory. To address this common concern, Netty provides the TooLongFrameException class, which a decoder throws when a frame exceeds a specified size limit.

To guard against this, you can set a threshold for the maximum number of bytes; exceeding it causes a TooLongFrameException to be thrown (and then caught by the ChannelHandler.exceptionCaught() method). How the exception is handled is then entirely up to the user of the decoder. Some protocols (such as HTTP) may allow you to return a special response. In other cases, the only option may be to close the connection.

The following example uses TooLongFrameException to notify the other ChannelHandlers in the ChannelPipeline that a frame size overflow has occurred. This protection is especially important if you are using a protocol with a variable frame size.

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
    public static final int MAX_FRAME_SIZE = 1024;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        // Check whether the buffer has more than MAX_FRAME_SIZE
        if (readable > MAX_FRAME_SIZE) {
            // Skip all readable bytes, throw TooLongFrameException and notify ChannelHandler
            in.skipBytes(readable);
            throw new TooLongFrameException("Frame too big!");
        }
        // do something
    }
}
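For a variable-frame-size protocol, the same guard is typically applied to the declared frame length. The sketch below is plain JDK and makes several assumptions that are not in the text above: a 4-byte length prefix, the hypothetical names FrameGuard and nextFrame(), and IllegalStateException standing in for TooLongFrameException.

```java
import java.nio.ByteBuffer;

// Hypothetical plain-JDK sketch of a frame-size guard; the 4-byte length prefix is an assumption.
class FrameGuard {
    static final int MAX_FRAME_SIZE = 1024;

    // Returns the payload of the next frame, or null if the frame is not complete yet.
    static byte[] nextFrame(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null; // length prefix not fully received
        }
        int length = in.getInt(in.position()); // peek without consuming
        if (length < 0 || length > MAX_FRAME_SIZE) {
            in.position(in.limit()); // discard everything buffered so far
            throw new IllegalStateException("Frame too big: " + length);
        }
        if (in.remaining() < 4 + length) {
            return null; // wait for the rest of the payload
        }
        in.getInt(); // consume the prefix
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }
}
```

Checking the declared length before waiting for the payload means an oversized frame is rejected as soon as its prefix arrives, rather than after the data has been buffered.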

2. The encoder

The encoder implements the ChannelOutboundHandler and converts outbound data from one format to another, just the opposite of what the decoder does. Netty provides a set of classes to help you write encoders that do the following:

  • Encode messages into bytes – MessageToByteEncoder
  • Encode one message type into another – MessageToMessageEncoder

2.1 Abstract class MessageToByteEncoder

Earlier we saw how to use ByteToMessageDecoder to convert bytes into messages. Now we’ll use MessageToByteEncoder to do the reverse.

This class has only one method to implement, whereas the decoder has two. The reason is that a decoder often needs to produce a final message after the Channel has been closed (hence the decodeLast() method). This clearly does not apply to an encoder: there is no point in producing a message after the connection has been closed.

The following code shows ShortToByteEncoder, which accepts a Short instance as a message, encodes it as a primitive short value, and writes it to a ByteBuf, which is then forwarded to the next ChannelOutboundHandler in the ChannelPipeline. Each outgoing Short occupies 2 bytes in the ByteBuf.

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        // Write the Short to the ByteBuf
        out.writeShort(msg);
    }
}
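To make the 2-bytes-per-Short layout concrete, here is a plain-JDK sketch in which java.nio.ByteBuffer stands in for ByteBuf. ShortWireFormat and its encode() method are hypothetical names; the sketch relies on both buffer types writing the most significant byte first by default.

```java
import java.nio.ByteBuffer;

// Hypothetical plain-JDK sketch; ByteBuffer stands in for Netty's ByteBuf.
class ShortWireFormat {
    // Encodes each value as 2 bytes, most significant byte first (big-endian).
    static byte[] encode(short... values) {
        ByteBuffer out = ByteBuffer.allocate(values.length * Short.BYTES);
        for (short v : values) {
            out.putShort(v);
        }
        return out.array();
    }
}
```

For example, the value 258 (0x0102) becomes the two bytes 1 and 2.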
2.2 Abstract class MessageToMessageEncoder

The encode() method of the MessageToMessageEncoder class provides the ability to encode outbound data from one message format to another

The following code shows IntegerToStringEncoder, which extends MessageToMessageEncoder. The encoder adds a String representation of each outbound Integer to the List

public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

Abstract codec classes

Although we have been talking about decoders and encoders as separate entities, you will sometimes find it useful to manage transformations of inbound and outbound data and messages in the same class. Netty’s abstract codec classes are just right for this purpose, because they will each bundle a decoder/encoder pair to handle the two types of operations we’ve been studying. As you might have guessed, these classes implement both the ChannelInboundHandler and ChannelOutboundHandler interfaces

Why don’t we always use these composite classes in preference to separate decoders and encoders? Because keeping the two functions separate wherever possible maximizes code reusability and extensibility, which is a fundamental principle of Netty’s design.

1. Abstract class ByteToMessageCodec

Let’s look at a scenario where we need to decode bytes into some form of message, perhaps a POJO, and then encode it again. ByteToMessageCodec takes care of this for us because it combines ByteToMessageDecoder with its reverse, MessageToByteEncoder.

Any request/response protocol is a good candidate for ByteToMessageCodec. For example, in an SMTP implementation, the codec reads the incoming bytes and decodes them into a custom message type, such as SmtpRequest. On the receiving side, when a response is created, an SmtpResponse is produced, which is encoded back into bytes for transmission.

2. Abstract class MessageToMessageCodec

By using MessageToMessageCodec, we can implement the round-trip process of this transformation in a single class. MessageToMessageCodec is a parameterized class defined as follows:

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>

The decode() method converts messages of type INBOUND_IN to messages of type OUTBOUND_IN, while the encode() method does its reverse. It may be useful to think of messages of type INBOUND_IN as those sent over the network and messages of type OUTBOUND_IN as those processed by the application
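The relationship between the two type parameters can be sketched without Netty as a small two-way converter. Everything named here (TwoWayCodec, TextIntegerCodec, and the choice of String and Integer) is hypothetical and serves only to show which direction each method runs.

```java
// Hypothetical plain-JDK sketch of a two-way message converter; not Netty's class.
abstract class TwoWayCodec<INBOUND_IN, OUTBOUND_IN> {
    // Inbound direction: network-side type to application-side type.
    abstract OUTBOUND_IN decode(INBOUND_IN msg);

    // Outbound direction: application-side type back to network-side type.
    abstract INBOUND_IN encode(OUTBOUND_IN msg);
}

// Example pairing: decimal text on the network side, Integer on the application side.
class TextIntegerCodec extends TwoWayCodec<String, Integer> {
    @Override
    Integer decode(String msg) {
        return Integer.valueOf(msg.trim());
    }

    @Override
    String encode(Integer msg) {
        return String.valueOf(msg);
    }
}
```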

The WebSocket protocol

The following example of MessageToMessageCodec is based on WebSocket, a protocol that enables full bidirectional communication between a web browser and a server.

Our WebSocketConvertHandler parameterizes MessageToMessageCodec with WebSocketFrame as INBOUND_IN and MyWebSocketFrame as OUTBOUND_IN; the latter is a static nested class of WebSocketConvertHandler itself.

public class WebSocketConvertHandler
        extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {


    @Override
    protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
        // instantiate a WebSocketFrame of the specified subtype
        ByteBuf payload = msg.getData().duplicate().retain();
        switch (msg.getType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(payload));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(payload));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, payload));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(payload));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(payload));
                break;
            case PING:
                out.add(new PingWebSocketFrame(payload));
                break;
            default:
                throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    // Decode WebSocketFrame to MyWebSocketFrame and set FrameType
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf payload = msg.content().duplicate().retain();
        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, payload));
        } else if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, payload));
        } else if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, payload));
        } else if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, payload));
        } else if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, payload));
        } else if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, payload));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    // Application-level frame type used by WebSocketConvertHandler
    public static final class MyWebSocketFrame {
        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }
        private final FrameType type;
        private final ByteBuf data;

        public MyWebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }

        public FrameType getType() {
            return type;
        }

        public ByteBuf getData() {
            return data;
        }
    }
}

3. CombinedChannelDuplexHandler class

As we mentioned earlier, combining a decoder and an encoder in one class can reduce reusability. However, there is a way to avoid this penalty without sacrificing the convenience of deploying a decoder and an encoder as a single unit. CombinedChannelDuplexHandler provides the solution. It is declared as follows:

public class CombinedChannelDuplexHandler
        <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>

This class acts as a container for ChannelInboundHandler and ChannelOutboundHandler (the class’s type parameters I and O). By providing a type that inherits the decoder class and the encoder class separately, we can implement a codec without directly extending the abstract codec class

First, let’s look at ByteToCharDecoder, which extends ByteToMessageDecoder because it reads characters from a ByteBuf

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

The decode() method here extracts 2 bytes at a time from ByteBuf and writes them to the List as chars, which are automatically boxed as Character objects

The following code converts a Character back to bytes. This class extends MessageToByteEncoder because it needs to encode char messages into a ByteBuf, which it does by writing to the ByteBuf directly

public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
        out.writeChar(msg);
    }
}

Now that we have a decoder and encoder, we can combine them to build a codec

// Parameterize CombinedByteCharCodec with the decoder and the encoder
public class CombinedByteCharCodec extends
        CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedByteCharCodec() {
        // Pass the delegate instances to the parent class
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}
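The delegation idea behind such a combined codec can be sketched in plain JDK as follows. This is an illustration under stated assumptions, not Netty's implementation: CharDecoder, CharEncoder, and DelegatingCharCodec are hypothetical names, and java.nio.ByteBuffer stands in for ByteBuf.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-JDK sketch of composing a decoder and an encoder by delegation.
interface CharDecoder { List<Character> decode(ByteBuffer in); }
interface CharEncoder { void encode(char msg, ByteBuffer out); }

class DelegatingCharCodec implements CharDecoder, CharEncoder {
    private final CharDecoder decoder;
    private final CharEncoder encoder;

    DelegatingCharCodec(CharDecoder decoder, CharEncoder encoder) {
        this.decoder = decoder;
        this.encoder = encoder;
    }

    @Override
    public List<Character> decode(ByteBuffer in) {
        return decoder.decode(in); // inbound work is forwarded to the decoder delegate
    }

    @Override
    public void encode(char msg, ByteBuffer out) {
        encoder.encode(msg, out); // outbound work is forwarded to the encoder delegate
    }

    // Builds the codec from two independently reusable parts.
    static DelegatingCharCodec create() {
        CharDecoder dec = in -> {
            List<Character> chars = new ArrayList<>();
            while (in.remaining() >= 2) {
                chars.add(in.getChar());
            }
            return chars;
        };
        CharEncoder enc = (msg, out) -> out.putChar(msg);
        return new DelegatingCharCodec(dec, enc);
    }
}
```

Because the two delegates remain separate objects, either one can still be used on its own wherever only one direction is needed.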