Preface

This chapter covers how Netty handles inbound events and how it decodes data.

  • ACCEPT inbound event: creates a Channel and registers it with the Selector

  • READ inbound event: reads data

  • Implementing a custom protocol: custom codecs

  • ByteToMessageDecoder: how to handle TCP sticky packets and split packets

  • How to handle decoding exceptions: how Dubbo handles decoding exceptions

1. Review

1.1 ChannelPipeline

Each ChannelHandler is wrapped in a ChannelHandlerContext and added to the ChannelPipeline.

A ChannelPipeline is a doubly linked list of ChannelHandlerContexts, with HeadContext as its head node and TailContext as its tail node.

Outbound events are propagated from TailContext to HeadContext, and inbound events are propagated from HeadContext to TailContext.
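The two propagation directions can be sketched with a plain doubly linked list. This is a minimal model, not Netty's implementation: the class PipelineSketch and its methods are hypothetical names, and it ignores the fact that real Netty skips handlers that do not handle the event's direction.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a pipeline: a doubly linked list of contexts between head and tail.
// All names are hypothetical; this is not Netty's DefaultChannelPipeline.
final class PipelineSketch {
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert a new context just before the tail node.
    PipelineSketch addLast(String name) {
        Ctx ctx = new Ctx(name);
        Ctx last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Inbound events walk from head to tail.
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            path.add(c.name);
        }
        return path;
    }

    // Outbound events walk from tail to head.
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (Ctx c = tail; c != null; c = c.prev) {
            path.add(c.name);
        }
        return path;
    }
}
```

With a decoder and a business handler added in that order, the inbound path visits head, decoder, business, tail, and the outbound path visits them in reverse.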

1.2 Outbound events

An example of outbound propagation is the outbound read event triggered when the server registers its Channel, which sets the interest op to ACCEPT.

An outbound event is propagated along ChannelOutboundInvoker -> ChannelOutboundHandler -> Unsafe.

The key classes involved are: Channel (ChannelOutboundInvoker) -> Pipeline (ChannelOutboundInvoker) -> TailContext (ChannelOutboundInvoker) -> ChannelOutboundHandlers -> HeadContext (ChannelOutboundHandler) -> Unsafe.

2. ACCEPT

Recall that the processSelectedKey method in NioEventLoop handles the events that are ready on a SelectionKey. If a READ or ACCEPT event occurs, the read method of the Channel's Unsafe implementation is called to handle it.

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // ...
    try {
        int readyOps = k.readyOps();
        // ...
        // On a READ or ACCEPT event, call unsafe.read()
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
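The readiness test in processSelectedKey is a plain bitmask check and can be tried in isolation. The sketch below uses only the JDK's SelectionKey constants; the class and method names are hypothetical.

```java
import java.nio.channels.SelectionKey;

// Standalone copy of the condition used in processSelectedKey.
// ReadyOpsSketch and shouldCallUnsafeRead are hypothetical names.
final class ReadyOpsSketch {
    // True when READ or ACCEPT is ready, or when readyOps is 0.
    static boolean shouldCallUnsafeRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }
}
```

Because READ and ACCEPT share one branch, the same unsafe.read() call serves both the server channel and the client channels; which work is done depends on the Unsafe implementation behind it.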

The server-side NioServerSocketChannel is interested in the ACCEPT event, and its Unsafe implementation is NioMessageUnsafe, an inner class of the abstract superclass AbstractNioMessageChannel. NioMessageUnsafe implements the read method. (Note the difference from the outbound read event: the outbound read sets the interest op to READ or ACCEPT, whereas this read handles an event that is already ready.)

private final class NioMessageUnsafe extends AbstractNioUnsafe {
    // Holds the accepted NioSocketChannel instances
    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        // As an inner class of the Channel, Unsafe can access many of the Channel's members
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            // 1. Receive JDK SocketChannel, encapsulate it as Netty NioSocketChannel and put it into readBuf
            try {
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
            // 2. Trigger the channelRead event
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            // 3. Trigger the inbound event channelReadComplete
            pipeline.fireChannelReadComplete();
            // ...
        } finally {
            // ...
        }
    }
}

The server handles an ACCEPT event in three steps:

  • doReadMessages: accepts the JDK SocketChannel, wraps it in a Netty NioSocketChannel, and puts it into readBuf.
  • fireChannelRead: triggers the inbound event channelRead. This is the important step.
  • fireChannelReadComplete: once all accepted connections have been processed, triggers the inbound event channelReadComplete.

NioServerSocketChannel accepts the SocketChannel, wraps it in a NioSocketChannel, and adds it to the result list.

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    return 0;
}

Unsafe then propagates the channelRead inbound event through the Pipeline, which passes it from HeadContext toward TailContext. Note that HeadContext is both a ChannelInboundHandler, so it can handle the inbound event, and a ChannelInboundInvoker, so it can propagate the event onward through the fire methods.

// DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    // Notice that the static method is passed in an instance of headContext
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  EventExecutor executor = next.executor();
  if (executor.inEventLoop()) {
    next.invokeChannelRead(m);
  } else {
    // ...
  }
}
// AbstractChannelHandlerContext (here the HeadContext): the handler it returns is the HeadContext itself
private void invokeChannelRead(Object msg) {
  // The handler gets the HeadContext itself
  ((ChannelInboundHandler) handler()).channelRead(this, msg);
}
// HeadContext as ChannelInboundHandler
// Also ChannelInboundInvoker
// Propagate channelRead to the next node
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
  ctx.fireChannelRead(msg);
}

On the server side, when NioServerSocketChannel is initialized, ServerBootstrap uses a ChannelInitializer to add a special inbound handler, ServerBootstrapAcceptor. This Netty-provided handler registers the client NioSocketChannel with the child EventLoopGroup. All of the childXXX settings come from our configuration (recall ServerBootstrap in Chapter 9 and server-side Channel startup in Chapter 10). In addition, channelRead propagation stops here, because ServerBootstrapAcceptor does not call ctx.fireChannelRead to pass the event on.

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;

    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    // The server receives the client Channel
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg; // The cast is safe because doReadMessages puts Channel instances into readBuf
        child.pipeline().addLast(childHandler); // Usually the ChannelInitializer we configured
        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);
        childGroup.register(child).addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
          }
        });
    }
}

The childGroup.register method works the same way as in Chapter 10: it selects an EventLoop from the EventLoopGroup and binds the Channel to it.

3. READ

A NioSocketChannel is interested in the READ event. NioByteUnsafe's read method is called when a READ event occurs on the channel, indicating that bytes are readable. (Note that this differs from the outbound read event, which sets the interest op to READ or ACCEPT.)

// Read the bytes from the peer end
@Override
public final void read() {
  final ChannelConfig config = config();
  final ChannelPipeline pipeline = pipeline();
  final ByteBufAllocator allocator = config.getAllocator();
  final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  allocHandle.reset(config);
  ByteBuf byteBuf = null;
  do {
    // Handle creates a ByteBuf of appropriate capacity to hold the data in the Channel. The capacity may change each time
    byteBuf = allocHandle.allocate(allocator);
    // Read bytes from the underlying JDKChannel to byteBuf and record the number of bytes read
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    if (allocHandle.lastBytesRead() <= 0) {
      byteBuf.release();
      byteBuf = null;
      break;
    }
    // Count this read (one more loop iteration)
    allocHandle.incMessagesRead(1);
    readPending = false;
    // Trigger channelRead
    pipeline.fireChannelRead(byteBuf);
    byteBuf = null;
    // Determine whether reading can continue
  } while (allocHandle.continueReading());
  allocHandle.readComplete();
  // Trigger channelReadComplete
  pipeline.fireChannelReadComplete();
  // ...
}

The NioSocketChannel#doReadBytes method first reads the data into ByteBuf.

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

The channelRead inbound event is then fired, just as for the ACCEPT event, except that a ByteBuf is passed instead of a NioSocketChannel. Typically, channelRead events pass through a decoder to the business handlers, which usually run in a business thread pool so that they do not block the IO threads (the EventLoops in the worker group).

In addition, while the read method loops over the bytes sent by the peer, it limits the number of iterations through allocHandle so that a single Channel does not process too much data in one pass, which prevents one client Channel from starving the requests of others.

The main logic lives in the abstract class DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle, whose implementation class is AdaptiveRecvByteBufAllocator.HandleImpl.

// DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle
// Determine if the read method can continue the loop
// The argument passed in is defaultMaybeMoreSupplier
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    // autoRead defaults to true: whether to automatically register interest in read events
    return config.isAutoRead() &&
           // respectMaybeMoreData = true
           // maybeMoreDataSupplier decides, based on the current loop iteration, whether there may be more data to read
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           // The number of loop iterations is < 16
           totalMessages < maxMessagePerRead &&
           // Total bytes read > 0
           totalBytesRead > 0;
}

// Determine if there might be more bytes to read
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
  @Override
  public boolean get() {
    // Whether the number of bytes we attempted to read equals the number actually read from the channel
    // If they are equal, the buffer was filled, so the channel may still hold more bytes to read
    return attemptedBytesRead == lastBytesRead;
  }
};
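The effect of these conditions on the read loop can be simulated without Netty. The sketch below is a simplified model under stated assumptions: autoRead is taken to be true, the limit of 16 comes from the comment in the excerpt, each iteration tries to fill a fixed-size buffer, and the names ReadLoopSketch and simulate are hypothetical.

```java
// Simplified, Netty-free model of the read loop and its continueReading check.
final class ReadLoopSketch {
    static final int MAX_MESSAGES_PER_READ = 16; // limit taken from the excerpt's comment

    // Returns how many times channelRead would be fired for one READ event,
    // given 'available' bytes in the socket and a fixed buffer size per iteration.
    static int simulate(int available, int bufferSize) {
        int totalMessages = 0;
        int totalBytesRead = 0;
        int attemptedBytesRead = 0;
        int lastBytesRead = 0;
        do {
            attemptedBytesRead = bufferSize;
            lastBytesRead = Math.min(available, attemptedBytesRead);
            if (lastBytesRead <= 0) {
                break; // nothing left to read
            }
            available -= lastBytesRead;
            totalBytesRead += lastBytesRead;
            totalMessages++; // one channelRead would be fired here
        } while (attemptedBytesRead == lastBytesRead      // buffer was filled, maybe more data
                && totalMessages < MAX_MESSAGES_PER_READ  // cap the work done per READ event
                && totalBytesRead > 0);
        return totalMessages;
    }
}
```

For example, 100 available bytes with a 64-byte buffer give two iterations: the second read returns 36 bytes, fewer than attempted, so the loop stops. A backlog of 20 full buffers is cut off after 16 iterations, and the rest is left for the next READ event.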

AdaptiveRecvByteBufAllocator.HandleImpl dynamically adjusts an index according to the number of bytes actually read, which controls the capacity of the ByteBuf that the next allocate call creates. The detailed logic is not expanded here; the main point is that Netty dynamically controls how many bytes are read from the Channel each time.

  • lastBytesRead is called each time bytes are read from the underlying Channel
  • When continueReading decides that the loop should stop, readComplete is called
  • When the channel has no more data to read, the loop exits and readComplete is called

private final class HandleImpl extends MaxMessageHandle {
    private final int minIndex;
    private final int maxIndex;
    private int index;
    private int nextReceiveBufferSize;
    private boolean decreaseNow;

    HandleImpl(int minIndex, int maxIndex, int initial) {
        this.minIndex = minIndex;
        this.maxIndex = maxIndex;

        index = getSizeTableIndex(initial);
        nextReceiveBufferSize = SIZE_TABLE[index];
    }
    // Guess how much memory is needed to hold bytes in a Channel
    @Override
    public int guess() {
        return nextReceiveBufferSize;
    }
    // Record the number of bytes read from the Channel
    @Override
    public void lastBytesRead(int bytes) {
        if (bytes == attemptedBytesRead()) {
            record(bytes);
        }
        super.lastBytesRead(bytes);
    }
  
    // Called when this channel's read loop exits
    @Override
    public void readComplete() {
        record(totalBytesRead());
    }

    // Dynamically adjust the index based on the number of bytes actually read; this controls the capacity guess() returns for the next ByteBuf
    private void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
            if (decreaseNow) {
                index = max(index - INDEX_DECREMENT, minIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {
            index = min(index + INDEX_INCREMENT, maxIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }
}
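The grow-fast, shrink-slow behaviour of record can be tried in a standalone model. The sketch below is not Netty's class: AdaptiveSizeSketch is a hypothetical name, the size table layout (steps of 16 below 512, doubling from 512) and the step constants 4 and 1 are modelled on Netty 4.1 but should be treated as assumptions, and a linear search stands in for Netty's table lookup.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, standalone model of the record() logic in the excerpt above.
// Table layout and step constants are assumptions modelled on Netty 4.1.
final class AdaptiveSizeSketch {
    static final int INDEX_INCREMENT = 4; // grow quickly (assumed value)
    static final int INDEX_DECREMENT = 1; // shrink slowly (assumed value)
    static final int[] SIZE_TABLE = buildTable();

    private final int minIndex;
    private final int maxIndex;
    private int index;
    private int nextReceiveBufferSize;
    private boolean decreaseNow;

    AdaptiveSizeSketch(int minimum, int maximum, int initial) {
        this.minIndex = indexOf(minimum);
        this.maxIndex = indexOf(maximum);
        this.index = indexOf(initial);
        this.nextReceiveBufferSize = SIZE_TABLE[index];
    }

    // 16, 32, ..., 496 in steps of 16, then 512, 1024, ... doubling.
    private static int[] buildTable() {
        List<Integer> sizes = new ArrayList<>();
        for (int i = 16; i < 512; i += 16) {
            sizes.add(i);
        }
        for (int i = 512; i > 0; i <<= 1) { // stops when the shift overflows
            sizes.add(i);
        }
        int[] table = new int[sizes.size()];
        for (int i = 0; i < table.length; i++) {
            table[i] = sizes.get(i);
        }
        return table;
    }

    // Linear search for the first table entry >= size (a simplification).
    private static int indexOf(int size) {
        for (int i = 0; i < SIZE_TABLE.length; i++) {
            if (SIZE_TABLE[i] >= size) {
                return i;
            }
        }
        return SIZE_TABLE.length - 1;
    }

    int guess() {
        return nextReceiveBufferSize;
    }

    // Same branching as the excerpt: shrink only after two small reads in a row,
    // grow immediately when the buffer was filled.
    void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT)]) {
            if (decreaseNow) {
                index = Math.max(index - INDEX_DECREMENT, minIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {
            index = Math.min(index + INDEX_INCREMENT, maxIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }
}
```

Under these assumptions, starting at 1024 and recording one full read moves the guess four table steps up to 16384, while it takes two consecutive small reads to move it one step down to 8192.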

4. How to solve the sticky-packet and split-packet problem

READ inbound events usually need a decoding handler that converts the ByteBuf into business messages. This section looks at how to use Netty's decoder framework and how user code cooperates with it to solve the sticky-packet and split-packet problem.

4.1 How to use ByteToMessageDecoder to implement your own decoder

Common classes

First, define a protocol. The entire datagram is divided into a header and a body. The header is described by XHeader.

Two types of packets are defined: XRequest is a request packet and XResponse is a response packet.
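The definitions of these classes are not reproduced in the text. The sketch below is a hypothetical reconstruction inferred from how the codec listings use them: the header layout (2-byte magic, 1-byte version, 1-byte type, 4-byte body length) follows the write order in the encoders, the field names follow the handlers, and every constant value is a placeholder assumption.

```java
// Hypothetical reconstruction of the protocol classes; all constant values are placeholders.
final class XHeader {
    static final short MAGIC = (short) 0xCAFE; // assumed value
    static final byte VERSION_1 = 1;           // assumed value
    static final byte REQUEST = 0;             // assumed value
    static final byte RESPONSE = 1;            // assumed value
    // magic (2 bytes) + version (1 byte) + type (1 byte) + body length (4 bytes)
    static final int HEADER_SIZE = Short.BYTES + Byte.BYTES + Byte.BYTES + Integer.BYTES;

    private XHeader() {
    }
}

// Request body, serialized as JSON by the encoder.
class XRequest {
    public String data;
}

// Response body, serialized as JSON by the encoder.
class XResponse {
    public String code;
    public String msg;
    public String data;
}
```

With this layout the header is 8 bytes, which matches the decoder reading a short, two bytes, and an int before it looks at the body.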

XDecoder extends Netty's ByteToMessageDecoder. It is a ChannelInboundHandler that handles decoding of the custom protocol.

public class XDecoder extends ByteToMessageDecoder {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 1. If the bytes are less than the size of the header, return directly
        if (in.readableBytes() < XHeader.HEADER_SIZE) {
            System.out.println("receive part header, header size = " + in.readableBytes());
            return;
        }
        // 2. Header parsing
        int readerIndex = in.readerIndex(); // Save the read index, which can be restored later
        if (in.readShort() != XHeader.MAGIC) {
            System.out.println("MAGIC ERROR, close channel " + ctx.channel().remoteAddress());
            ctx.channel().close();
            return;
            // throw new IllegalArgumentException("MAGIC ERROR");
        }
        if (in.readByte() != XHeader.VERSION_1) {
            System.out.println("VERSION ERROR, close channel " + ctx.channel().remoteAddress());
            ctx.channel().close();
            return;
            // throw new IllegalArgumentException("VERSION ERROR");
        }
        byte type = in.readByte();
        int length = in.readInt();
        // 3. The number of remaining readable bytes is less than the length identifier in the header
        if (in.readableBytes() < length) {
            System.out.println("receive part data, length in header = " + length + ", data size = " + in.readableBytes());
            in.readerIndex(readerIndex);
            return;
        }
        // 4. Read the body and deserialize it according to the packet type
        byte[] data = new byte[length];
        in.readBytes(data);
        if (type == XHeader.REQUEST) {
            XRequest xRequest = objectMapper.readValue(data, XRequest.class);
            out.add(xRequest);
        } else if (type == XHeader.RESPONSE) {
            XResponse xResponse = objectMapper.readValue(data, XResponse.class);
            out.add(xResponse);
        }
        // An unknown type is skipped; parsing continues with the next packet
    }
}

The server

XServer is the server. Its Pipeline contains the common decoder, the response encoder, and the request business handler.

public class XServer {
    public static void main(String[] args) throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
        try {
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast("decoder", new XDecoder()) // decoder
                                    .addLast("encoder", new XResponseEncoder()) // encoder
                                    .addLast("xreq", new XRequestHandler()); // business handler
                        }
                    });
            ChannelFuture future = bootstrap.bind(9999).sync();
            System.out.println("server start at port 9999");
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

// XResponse encoder
public class XResponseEncoder extends MessageToByteEncoder<XResponse> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    protected void encode(ChannelHandlerContext ctx, XResponse msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.RESPONSE);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

// The business handler processes the client request and returns the client's data in uppercase
public class XRequestHandler extends SimpleChannelInboundHandler<XRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, XRequest msg) throws Exception {
        XResponse response = new XResponse();
        response.code = "0";
        response.msg = "success";
        response.data = msg.data.toUpperCase();
        System.out.println(new ObjectMapper().writeValueAsString(msg));
        ctx.writeAndFlush(response);
    }
}

The client

XClient is the client. Its Pipeline contains the common decoder, the request encoder, and the response business handler.

public class XClient {
    private static final int PORT = 9999;
    public static void main(String[] args) throws InterruptedException {
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast("decoder", new XDecoder()) // decoder
                                    .addLast("encoder", new XRequestEncoder()) // request encoder
                                    .addLast("xres", new XResponseHandler()); // response handler
                        }
                    });
            ChannelFuture future = bootstrap.connect("127.0.0.1", PORT).sync();
            assert future.isSuccess();
            System.out.println("connect success");
            // 1. Call the server normally
            writeSuccess(future.channel());
        } finally {
            group.shutdownGracefully();
        }
    }

    // Send {"data": "hello"} to the server in a loop
    private static void writeSuccess(Channel channel) throws InterruptedException {
        XRequest request = new XRequest();
        request.data = "hello";
        while (channel.isActive()) {
            channel.writeAndFlush(request);
            Thread.sleep(1000);
        }
    }
}

// XRequest encoder
public class XRequestEncoder extends MessageToByteEncoder<XRequest> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    protected void encode(ChannelHandlerContext ctx, XRequest msg, ByteBuf out) throws Exception {
        out.writeShort(XHeader.MAGIC);
        out.writeByte(XHeader.VERSION_1);
        out.writeByte(XHeader.REQUEST);
        byte[] bytes = objectMapper.writeValueAsBytes(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

// XResponse business handler
public class XResponseHandler extends SimpleChannelInboundHandler<XResponse> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, XResponse msg) throws Exception {
        System.out.println(new ObjectMapper().writeValueAsString(msg));
    }
}

4.2 ByteToMessageDecoder

ByteToMessageDecoder is the abstract decoder class that Netty provides. Users only need to implement the abstract decode method to decode data. Its advantage is that it helps users solve the TCP sticky-packet and split-packet problem, but users still need to implement decode carefully.

Looking at ByteToMessageDecoder's member variables, you can see that it maintains a cumulation ByteBuf that accumulates the bytes received on the same Channel.

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    // The accumulated ByteBuf
    ByteBuf cumulation;
    // The accumulator
    private Cumulator cumulator = MERGE_CUMULATOR;
    // Whether each channelRead decodes only once (calls the user's decode method once); the default is false
    private boolean singleDecode;
    // Whether this is the first accumulation
    private boolean first;
}

When a READ event is triggered, NioSocketChannel reads bytes from the JDK channel into a ByteBuf, and that ByteBuf is the msg argument passed to ByteToMessageDecoder. Overall, ByteToMessageDecoder's channelRead method has three steps:

  • The Cumulator accumulates msg into cumulation
  • The user's decode method is called
  • Finally, resources are released and the decoded data is propagated onward through channelRead

// ByteToMessageDecoder
// The accumulated ByteBuf
ByteBuf cumulation;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        // think of it as an ArrayList, using object pooling and FastThreadLocal
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // Accumulate the readable bytes of msg into cumulation; msg is released there
            first = cumulation == null;
            cumulation = cumulator.cumulate(ctx.alloc(),
                    first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
            // Call the user's decode method...
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
              // If cumulation has been fully read, release it and set it to null
              numReads = 0;
              cumulation.release();
              cumulation = null;
            } else if (++numReads >= discardAfterReads) {
              // Prevent cumulation from excessive expansion
              // After channelRead has processed 16 times, read bytes in the accumulator are discarded
              numReads = 0;
              discardSomeReadBytes();
            }
            // Forward the packets in the decoded out list
            int size = out.size();
            firedChannelRead |= out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

Cumulator

/**
 * Cumulate {@link ByteBuf}s.
 */
public interface Cumulator {
    /**
     * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
     * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
     * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
     */
    ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}

Cumulator is a key interface for handling sticky and split packets. It is responsible for adding the incoming buffer to the accumulated buffer. According to the Javadoc, if the incoming buffer has been fully consumed, it should be released here.

ByteToMessageDecoder provides two implementations, one based on memory copy and one based on CompositeByteBuf (one of Netty's zero-copy mechanisms).

MERGE_CUMULATOR is the default accumulator implementation. It accumulates in into cumulation by memory copy.

/**
 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
 */
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        // cumulation has no readable data and in is a contiguous (non-composite) ByteBuf
        if (!cumulation.isReadable() && in.isContiguous()) {
            cumulation.release();
            return in;
        }
        try {
            final int required = in.readableBytes();
            // If the accumulated buffer does not have enough space, expand it
            if (required > cumulation.maxWritableBytes() ||
                    (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                    cumulation.isReadOnly()) {
                return expandCumulation(alloc, cumulation, in);
            }
            // Write all readable bytes of in into cumulation
            cumulation.writeBytes(in, in.readerIndex(), required);
            in.readerIndex(in.writerIndex());
            return cumulation;
        } finally {
            // in can be released after it has been written to the accumulator
            // (in is the ByteBuf created in io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read)
            in.release();
        }
    }
};

COMPOSITE_CUMULATOR is a zero-copy accumulator based on CompositeByteBuf. According to the Javadoc, its drawback is that CompositeByteBuf uses a more complex indexing implementation, so depending on the use case and the decoder implementation it may be slower than MERGE_CUMULATOR.

/**
 * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
 * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
 * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
 */
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        if (!cumulation.isReadable()) {
            cumulation.release();
            return in;
        }
        CompositeByteBuf composite = null;
        try {
            if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
                composite = (CompositeByteBuf) cumulation;
                if (composite.writerIndex() != composite.capacity()) {
                    composite.capacity(composite.writerIndex());
                }
            } else {
                composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
            }
            composite.addFlattenedComponents(true, in);
            in = null;
            return composite;
        } finally {
            if (in != null) {
                in.release();
                if (composite != null && composite != cumulation) {
                    composite.release();
                }
            }
        }
    }
};

decode

The callDecode method calls the user's decode method in a loop. Note that the parameter in is the accumulated buffer passed in from outside, and the out list stores the messages the user has decoded.

/**
 * Called once data should be decoded from the given {@link ByteBuf}. This method will call
 * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
 *
 * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param in            the {@link ByteBuf} from which to read data
 * @param out           the {@link List} to which decoded messages should be added
 */
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            int outSize = out.size();
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();
                // ...
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            // Execute the user's decode method
            // In the source this is the decodeRemovalReentryProtection method;
            // it is simplified to decode here, which does not affect the main logic
            decode(ctx, in, out);
            if (ctx.isRemoved()) {
                break;
            }
            // Out list has no new element
            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    // The number of readable bytes does not change, interrupt the decoding
                    break;
                } else {
                    // The number of readable bytes changes, continue decoding
                    continue;
                }
            }
            // The out list has new elements, but the number of readable bytes in the ByteBuf has not changed (writerIndex - readerIndex)
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(...);
            }
            // New elements were added to the out list and the number of readable bytes changed, so continue the loop

            // Whether to end the loop after reading only once (default false)
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}

CallDecode loop control conditions depend on the convention between the framework and the user, and whether the loop is interrupted depends on how the user decode accumulates BUF (in) and decodes result set (OUT).

Based on the above analysis, how does the user’s decode method solve the sticky unpacking problem?

To solve the sticky packet problem, it is necessary to keep the loop circulating to ensure that the readable bytes of the accumulated Buffer change and the size of the result set changes, that is, a packet decoded from the accumulated Buffer is put into the Out result set. Of course, it is also possible to solve sticky packet problems by loops inside the decode method.

To solve the problem of unpacking, it is necessary to let the loop exit, ensure that the readable bytes remain unchanged and the size of the result set remains unchanged, and let the loop break directly. That is, if the user decode cannot read the complete packet according to the user-defined protocol, the buffer read subscript will be restored and returned directly.

For example, XDecoder below decodes a custom protocol.

Unpacking: when too few bytes have arrived, keep the read index unchanged and return without touching the out list, so that the outer decoding loop breaks.

Sticky packets: decode handles only one packet per call and lets the outer loop in the Netty framework call decode again.

public class XDecoder extends ByteToMessageDecoder {
    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 1. If the bytes are less than the size of the header, return directly
        if (in.readableBytes() < XHeader.HEADER_SIZE) {
            return;
        }
        // 2. Header parsing
        int readerIndex = in.readerIndex(); // Save the read index, which can be restored later
        if (in.readShort() != XHeader.MAGIC) { // ...
          // Throw an exception? Close the channel? Restore the read index and return?
        }
        if (in.readByte() != XHeader.VERSION_1) { // ...
        }
        byte type = in.readByte();
        int length = in.readInt();
        // 3. The number of remaining readable bytes is less than the length identifier in the header
        if (in.readableBytes() < length) {
            in.readerIndex(readerIndex);
            return;
        }
        // 4
        // ...
    }
}

5. How to deal with decoding exceptions

With a custom protocol, what should the decoder do when a datagram sent by the peer fails the protocol's validation?

int readerIndex = in.readerIndex();
if (in.readShort() != XHeader.MAGIC) { // ...
  // Throw an exception? Close the channel? Restore the read index and return?
}

Option 1: Restore the read index and return?

If decode restores the read index and returns without touching the out list, the invalid bytes stay in the accumulation buffer indefinitely. Could that exhaust memory?

Option 2: Throw an exception?

If the read index is not restored, the bytes already read are skipped and the decoding loop exits until the next channelRead event. But the next read may still start in the middle of the bad data.

Option 3: Close the channel?

This seems risk-free, since the client's datagram is illegal anyway. But does the business allow it?

To see how Dubbo handles this, the client (XClient) connects to port 20880 and writes ByteBuf data directly. The server runs the Dubbo 2.7.9 demo provider.

package org.apache.dubbo.demo.provider;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Application {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-provider.xml");
        context.start();
        System.in.read();
    }
}

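Dubbo's encoding rules are as follows: a 16-byte header (2-byte magic number 0xdabb, 1 byte of flags that includes the serialization type, 1 status byte, an 8-byte id and a 4-byte body length) followed by the body.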

Scenario 1: Send Dubbo a packet that fails the magic number check

The client is as follows.

private static void writeDubboError(Channel channel) throws InterruptedException {
    final long start = System.currentTimeMillis();
    channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
        @Override
        public void operationComplete(Future<? super Void> future) throws Exception {
            System.out.println("Dubbo channel shutdown timeout:" + (System.currentTimeMillis() - start) / 1000);
        }
    });
    // Keep sending a single zero byte to Dubbo
    while (channel.isActive()) {
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer().writeBytes(new byte[1]);
        channel.writeAndFlush(byteBuf);
        Thread.sleep(100);
    }
}

In the test, the accumulation buffer on the Dubbo server keeps growing.

The channel is closed after the client has been sending data for 180s.

First, Dubbo checks the magic number in the first two bytes.

// ExchangeCodec
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // Check the magic number 0xdabb; if the check fails, delegate to the parent class decode
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        // Read all readable bytes into the header
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        // Loop until magic_high and magic_low are found
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        // Call the parent class decode
        return super.decode(channel, buffer, readable, header);
    }
    // ...
}

If the magic number check fails, decoding is delegated to the parent class TelnetCodec. TelnetCodec finds no line terminator (carriage return) at the end of the data and returns DecodeResult.NEED_MORE_INPUT.
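The magic number scan in ExchangeCodec decides whether decoding can resynchronize on a later packet. The following is a minimal sketch of that scan over a plain byte array instead of Dubbo's ChannelBuffer. MagicScanSketch and indexOfMagic are hypothetical names; only the two magic bytes come from the 0xdabb value mentioned above.

```java
/** Stdlib-only sketch of scanning received bytes for the Dubbo magic number. */
final class MagicScanSketch {
    static final byte MAGIC_HIGH = (byte) 0xda;
    static final byte MAGIC_LOW = (byte) 0xbb;

    /** Returns the index of the first 0xda 0xbb pair at or after 'from', or -1 if there is none. */
    static int indexOfMagic(byte[] data, int from) {
        for (int i = from; i < data.length - 1; i++) {
            if (data[i] == MAGIC_HIGH && data[i + 1] == MAGIC_LOW) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] zeros = new byte[8]; // what the scenario 1 client sends
        byte[] mixed = {0x01, 0x02, (byte) 0xda, (byte) 0xbb, 0x00};
        System.out.println(indexOfMagic(zeros, 1)); // -1
        System.out.println(indexOfMagic(mixed, 1)); // 2
    }
}
```

Scenario 1 sends only zero bytes, so a scan like this never finds 0xdabb and every received byte is handed to TelnetCodec, which keeps waiting for a line terminator.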

// TelnetCodec
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException {
    // ... other code omitted
    byte[] enter = null;
    for (Object command : ENTER) {
        if (endsWith(message, (byte[]) command)) {
            enter = (byte[]) command;
            break;
        }
    }
    if (enter == null) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    // ...
}

Finally, InternalDecoder, the outermost ByteToMessageDecoder implementation, resets the read index of the accumulation buffer and waits for the next channelRead. This is why the accumulation buffer keeps growing. Note also that Dubbo runs its decoding loop inside the user decode method and does not rely on the loop in ByteToMessageDecoder.

private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        // Wrap the input in Dubbo's own ChannelBuffer
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        // Wrap the Netty channel in a NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        do {
            int saveReaderIndex = message.readerIndex();
            Object msg = codec.decode(channel, message);
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                // Reset the read index and exit
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                // Add the message to the result list
                if (msg != null) {
                    out.add(msg);
                }
            }
        } while (message.readable());
    }
}

In addition, the channel was closed after the client had sent invalid packets for 180s. The reason is that Dubbo uses Netty's IdleStateHandler, which by default closes a client connection that has been idle for more than 180s. Note the handler order: the idle handler is added after the decoder, so bytes that never decode into a message never reach it as a read.

The NettyServer#doOpen method starts the Netty server.

protected void doOpen() throws Throwable {
    bootstrap.group(bossGroup, workerGroup)
            // ...
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                     // Idle connection timeout duration. The default value is 180s
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                     // codec
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        ch.pipeline().addLast("negotiation",
                                SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                    }
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                             // Fire an IdleStateEvent after 180s without any read or write
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // ...
}

NettyServerHandler handles IdleStateEvent and closes Channel.

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        channel.close();
        // ...
    }
}

Scenario 2: Send Dubbo a non-existent serialization type

Following Dubbo's decoding rules, the client sends the Dubbo server a serialization type that does not exist: 0 in the lower 5 bits of the third byte.

private static void writeDubboError2(Channel channel) throws InterruptedException {
    final long start = System.currentTimeMillis();
    channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
        @Override
        public void operationComplete(Future<? super Void> future) throws Exception {
            System.out.println("Dubbo channel shutdown timeout:" + (System.currentTimeMillis() - start) / 1000);
        }
    });
    String body = "hello";
    while (channel.isActive()) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer()
                .writeShort(MAGIC) // 2 bytes: magic number
                .writeByte(0) // 1 byte: serialization type / event flag / two-way flag / request flag
                .writeByte(1) // 1 byte: status
                .writeLong(1L) // 8 bytes: id
                .writeInt(body.length()) // 4 bytes: body length
                .writeBytes(body.getBytes()); // body
        channel.writeAndFlush(buf);
        Thread.sleep(1000);
    }
}
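The header written by the client above can be read back field by field. The following is a minimal sketch using java.nio.ByteBuffer. DubboHeaderSketch and its methods are hypothetical, and the flag and mask values (0x80, 0x40, 0x20, 0x1f) are quoted from memory of Dubbo 2.7's ExchangeCodec, so treat them as assumptions to verify against the source.

```java
import java.nio.ByteBuffer;

/** Stdlib-only sketch of the 16-byte header layout used by the test client. */
final class DubboHeaderSketch {
    static final short MAGIC = (short) 0xdabb;
    static final int HEADER_LENGTH = 16;
    // Assumed values, recalled from Dubbo 2.7's ExchangeCodec
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    /** Builds a header the same way the test client writes it. */
    static byte[] header(int flag, int status, long id, int bodyLength) {
        return ByteBuffer.allocate(HEADER_LENGTH)
                .putShort(MAGIC)      // bytes 0-1: magic number
                .put((byte) flag)     // byte 2: request/two-way/event bits + serialization type
                .put((byte) status)   // byte 3: status
                .putLong(id)          // bytes 4-11: id
                .putInt(bodyLength)   // bytes 12-15: body length
                .array();
    }

    /** Reads the fields back and splits the flag byte with the masks. */
    static String describe(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header);
        short magic = buf.getShort();
        byte flag = buf.get();
        byte status = buf.get();
        long id = buf.getLong();
        int bodyLength = buf.getInt();
        boolean request = (flag & FLAG_REQUEST) != 0;
        int serialization = flag & SERIALIZATION_MASK; // lower 5 bits
        return String.format("magic=0x%04x request=%b serialization=%d status=%d id=%d bodyLength=%d",
                magic & 0xffff, request, serialization, status, id, bodyLength);
    }

    public static void main(String[] args) {
        // Same field values as writeDubboError2: flag 0, status 1, id 1, body "hello"
        System.out.println(describe(header(0, 1, 1L, 5)));
        // magic=0xdabb request=false serialization=0 status=1 id=1 bodyLength=5
    }
}
```

With a flag byte of 0 the request bit is clear, so under these assumptions the server treats the packet as a response whose serialization type is 0.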

Dubbo server exception log, 5 bytes skipped:

[10/04/21 11:35:36:480 CST] NettyServerWorker-5-1 WARN dubbo.DubboCodec: [DUBBO] Decode response failed: Unrecognized serialize type from consumer: 0, dubbo version: , current host: 192.168.0.104
java.io.IOException: Unrecognized serialize type from consumer: 0
    at org.apache.dubbo.remoting.transport.CodecSupport.getSerialization(CodecSupport.java:89)
    at org.apache.dubbo.remoting.transport.CodecSupport.deserialize(CodecSupport.java:95)
    at org.apache.dubbo.rpc.protocol.dubbo.DubboCodec.decodeBody(DubboCodec.java:108)
    at org.apache.dubbo.remoting.exchange.codec.ExchangeCodec.decode(ExchangeCodec.java:132)
    at org.apache.dubbo.remoting.exchange.codec.ExchangeCodec.decode(ExchangeCodec.java:90)
    at org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec.decode(DubboCountCodec.java:48)
    at ...
[10/04/21 11:35:36:481 CST] NettyServerWorker-5-1 WARN codec.ExchangeCodec: [DUBBO] Skip input stream 5, dubbo version: , current host: 192.168.0.104

Dubbo wraps the buffer in an InputStream to parse the packet. The decoding exception is caught internally, and the rest of the datagram is skipped in the finally block of the outermost decode method.

// ExchangeCodec
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // Check the magic number 0xdabb; if the check fails, delegate to the parent class decode
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
      // ...
    }
    // If the number of bytes is less than 16, the packet is returned
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // The length of the body is insufficient
    int len = Bytes.bytes2int(header, 12);
    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }
    // Buffer is converted to InputStream. The underlying buffer is still read and write, with len bytes readable
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    try {
        return decodeBody(channel, is, header);
    } finally {
        // If there are still readable bytes after decodeBody, skip all of them
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

// DubboCodec
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
  byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
  // get request id.
  long id = Bytes.bytes2long(header, 4);
  if ((flag & FLAG_REQUEST) == 0) {
    // decode response.
    Response res = new Response(id);
    if ((flag & FLAG_EVENT) != 0) {
      res.setEvent(true);
    }
    // get status.
    byte status = header[3];
    res.setStatus(status);
    try {
      if (status == Response.OK) {
        // ...
      } else {
        // Deserializing the buffer according to proto throws an exception here
        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
        res.setErrorMessage(in.readUTF());
      }
    } catch (Throwable t) {
      if (log.isWarnEnabled()) {
        log.warn("Decode response failed: " + t.getMessage(), t);
      }
      res.setStatus(Response.CLIENT_ERROR);
      res.setErrorMessage(StringUtils.toString(t));
    }
    return res;
  } else {
    // decode request.
    // ...
  }
}
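The effect of that finally block can be modelled with plain java.io. The following is a minimal sketch in which a ByteArrayInputStream stands in for the channel buffer and a position check stands in for ChannelBufferInputStream and StreamUtils.skipUnusedStream. SkipUnusedSketch, decodeFrame and decodeBody are hypothetical names, not Dubbo code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Stdlib-only model of "catch the decode error, then skip the rest of the frame". */
final class SkipUnusedSketch {

    /** Hypothetical body decoder: rejects serialization type 0 before reading any body byte. */
    static String decodeBody(ByteArrayInputStream wire, int len, int serializationId) {
        try {
            if (serializationId == 0) {
                throw new IOException("Unrecognized serialize type: 0");
            }
            byte[] body = new byte[len];
            wire.read(body, 0, len);
            return new String(body, StandardCharsets.UTF_8);
        } catch (IOException e) {
            // The failure becomes a result object instead of escaping the decoder
            return "CLIENT_ERROR: " + e.getMessage();
        }
    }

    /** Decodes one frame body of 'len' bytes and always leaves the stream at the frame's end. */
    static String decodeFrame(ByteArrayInputStream wire, int len, int serializationId) {
        int afterFrame = wire.available() - len; // bytes that belong to later frames
        try {
            return decodeBody(wire, len, serializationId);
        } finally {
            int unread = wire.available() - afterFrame; // unread part of this frame
            if (unread > 0) {
                System.out.println("Skip input stream " + unread);
                wire.skip(unread);
            }
        }
    }

    public static void main(String[] args) {
        // Two 5-byte bodies back to back; headers are left out of this model
        ByteArrayInputStream wire =
                new ByteArrayInputStream("helloworld".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeFrame(wire, 5, 0)); // skips 5 bytes, returns the error result
        System.out.println(decodeFrame(wire, 5, 2)); // world
    }
}
```

Because the unread body is skipped, the next frame starts at a clean boundary, and the error travels on as an ordinary decoded object.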

This scenario differs from the previous one: a Response object that carries the error is returned and eventually added to the result list, so idle connection detection is not triggered and the channel is not closed.

// DubboCountCodec
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        // Return the parsed object as normal
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}

conclusion

  • ACCEPT: The server-side NioServerSocketChannel is interested in the ACCEPT event. When an ACCEPT event fires on the channel, the read method of NioMessageUnsafe is called to accept a NioSocketChannel. Although the event handled on the SelectionKey is ACCEPT, Unsafe fires channelRead through the Pipeline. This inbound (in-stack) event is handled by the channelRead method of Netty's ServerBootstrap.ServerBootstrapAcceptor, which initializes and registers the client NioSocketChannel.
  • READ inbound event: NioSocketChannel is interested in READ events. When a READ event fires on a Channel, NioByteUnsafe is called to read a ByteBuf from the Channel and fire channelRead through the Pipeline. channelRead typically passes through a decoding handler and then a business handler.
  • Implement custom protocols: extend ByteToMessageDecoder to implement decoding and MessageToByteEncoder to implement encoding.
  • ByteToMessageDecoder: the abstract decoder class provided by Netty. Its decode method requires the user to cooperate with the Netty framework: the ByteBuf parameter is a continuously accumulating buffer, and the out list parameter stores successfully decoded business objects. By controlling how the accumulation buffer is consumed and what is added to out, decode can handle sticky packets and unpacking.
  • Dubbo's handling of decoding exceptions:
    • If the magic number check fails, the client's data keeps accumulating in the accumulation buffer until idle connection detection fires after 180s and closes the channel.
    • If the serialization type does not exist, Dubbo skips the entire datagram and puts a business object that carries the error into the decoded result list.