ObjectDecoder and ObjectEncoder codecs
1. ObjectEncoder
An encoder that serializes a Java object into a ByteBuf. The object must implement the java.io.Serializable interface.
2. ObjectDecoder
A decoder that deserializes the received ByteBufs into Java objects. Here too, the object must implement the java.io.Serializable interface.
3. Quick start
First, the Java object must implement the Serializable interface:
public class OPack implements Serializable {
    private static final long serialVersionUID = -5734509523963527363L;
    String name;
    String msg;
    // ... getters, setters and other methods omitted
}
Add both the encoder and the decoder to the pipeline, on the client and on the server:
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(1004, new ClassResolver() {
    @Override
    public Class<?> resolve(String className) throws ClassNotFoundException {
        return OPack.class;
    }
}));
Then write your own handler, which is very simple: either extend ChannelInboundHandlerAdapter and override channelRead(), or extend SimpleChannelInboundHandler<T> and implement channelRead0().
4. Codec source code analysis
1. ObjectEncoder
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
    // A 4-byte placeholder for the length field (an int takes four bytes)
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        // Remember the writer index before anything is written
        int startIdx = out.writerIndex();
        // Wrap the ByteBuf in an output stream
        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        // The object stream, created below
        ObjectOutputStream oout = null;
        try {
            // First write the 4-byte placeholder
            bout.write(LENGTH_PLACEHOLDER);
            oout = new CompactObjectOutputStream(bout);
            // Then write the object
            oout.writeObject(msg);
            oout.flush();
        } finally {
            ...
        }
        // Writer index after writing
        int endIdx = out.writerIndex();
        // Back-fill the length: bytes written minus the 4-byte length field
        out.setInt(startIdx, endIdx - startIdx - 4);
    }
}
Try it yourself:
ByteBuf out = Unpooled.buffer(1024);
// Writer index at the start
int start = out.writerIndex();
// Wrap the buffer in an output stream
ByteBufOutputStream bbos = new ByteBufOutputStream(out);
// First write a 4-byte placeholder
bbos.write(new byte[4]);
// Object stream
ObjectOutputStream objectOutputStream = new ObjectOutputStream(bbos);
// Write an object
objectOutputStream.writeObject(new OPack("hhh", "hhh"));
// Writer index at the end
int end = out.writerIndex();
// Write an int at the start position: end - start - 4, i.e. the length of the serialized object
out.setInt(start, end - start - 4);
System.out.println("out = " + out);
System.out.println("out.readInt() = " + out.readInt());
// Check the reference count
System.out.println("out.refCnt() = " + out.refCnt());
Output:
out = UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 117, cap: 1024)
out.readInt() = 113
out.refCnt() = 1
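The placeholder-then-back-fill idea can also be tried without Netty. The following is only a minimal sketch using plain JDK classes: LengthPrefixSketch and its methods are hypothetical names, it uses the standard ObjectOutputStream rather than Netty's CompactObjectOutputStream, so the byte counts will differ from the output above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Netty: JDK-only length-prefixed serialization.
final class LengthPrefixSketch {

    // Writes a 4-byte placeholder, then the object, then patches the real length in.
    static byte[] encode(Serializable msg) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        bytes.write(new byte[4], 0, 4);                 // length placeholder
        try (ObjectOutputStream oout = new ObjectOutputStream(bytes)) {
            oout.writeObject(msg);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] frame = bytes.toByteArray();
        // Body length = total length minus the 4-byte length field (big-endian int).
        ByteBuffer.wrap(frame).putInt(0, frame.length - 4);
        return frame;
    }

    // Reads the length field, then deserializes exactly that many bytes.
    static Object decode(byte[] frame) {
        int bodyLength = ByteBuffer.wrap(frame).getInt(0);
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(frame, 4, bodyLength))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode("hhh");
        System.out.println("length field = " + ByteBuffer.wrap(frame).getInt(0));
        System.out.println("total bytes  = " + frame.length);
        System.out.println("decoded      = " + decode(frame));
    }
}
```

The length field always equals the total size minus 4, which is the same relationship as widx 117 and readInt() 113 in the Netty output.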
2. ObjectDecoder
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
    private final ClassResolver classResolver;
    public ObjectDecoder(ClassResolver classResolver) {
        this(1048576, classResolver);
    }
    // maxObjectSize is the maximum frame length (1048576 bytes by default).
    // The length field sits at offset 0 and occupies 4 bytes; there is no length adjustment, and the first 4 bytes are stripped from the frame.
    public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
        super(maxObjectSize, 0, 4, 0, 4);
        this.classResolver = classResolver;
    }
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // Call the parent's decode() directly; it returns a ByteBuf with the 4-byte length field already stripped
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }
        // Two things happen here: the frame is wrapped in an object input stream, and because the second argument is true the ByteBuf is released when the stream is closed
        ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
        try {
            // Finally, just read the object
            return ois.readObject();
        } finally {
            ois.close();
        }
    }
}
Comparing the buffer before and after decoding shows exactly the change described above: the 4-byte length field is stripped. The source of LengthFieldBasedFrameDecoder is analyzed further below. Also note that the reference count of the frame at this point is 2, so it has to be released. As the comment above says, Netty releases it for us. While debugging I found that it had not been released at that exact point, but a release does happen later.
StringEncoder and StringDecoder – string codec
StringEncoder extends MessageToMessageEncoder and StringDecoder extends MessageToMessageDecoder.
StringDecoder converts a ByteBuf to a String by calling ByteBuf.toString(charset).
StringEncoder converts a String to a ByteBuf by calling ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset).
Server side:
final StringServerHandler stringServerHandler = new StringServerHandler();
// Add a handler to the pipe
ChannelPipeline pipeline = ch.pipeline();
// Add the string encoder and decoder
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
// Our own handler
pipeline.addLast("serverHandler", stringServerHandler);
Our handler StringServerHandler extends SimpleChannelInboundHandler<String>.
private static class StringServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        logger.info("[server] received message: {}", msg);
        ctx.writeAndFlush("Server received message");
    }
}
Test result: it works.
Core implementation of io.netty.handler.codec.string.StringEncoder:
@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
if (msg.length() == 0) {
return;
}
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
Core implementation of io.netty.handler.codec.string.StringDecoder:
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
3. FixedLengthFrameDecoder – fixed length decoder
1. Source code
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    // Fixed frame length
    private final int frameLength;
    public FixedLengthFrameDecoder(int frameLength) {
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }
    // Overrides ByteToMessageDecoder.decode()
    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Delegate to the two-argument decode() below
        Object decoded = decode(ctx, in);
        // If the result is null, the bytes simply stay in the buffer
        if (decoded != null) {
            // Otherwise add the frame to the output list
            out.add(decoded);
        }
    }
    // The method called above
    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // Fewer readable bytes than frameLength: return null
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            // Otherwise slice off exactly frameLength bytes
            return in.readRetainedSlice(frameLength);
        }
    }
}
2. Basic usage
ByteBuf buf = Unpooled.buffer(8);
// 10000 = 0000 0000, 0000 0000, 0010 0111, 0001 0000
// bytes:  0 0 39 16
buf.writeInt(10000);
// 100 = 0000 0000, 0000 0000, 0000 0000, 0110 0100
// bytes:  0 0 0 100
buf.writeInt(100);
// On the server, the frame length is set to 6 bytes: new FixedLengthFrameDecoder(6)
// The client then sends two such buf objects to the server
Results:
First frame:  0 0 39 16 0 0
Second frame: 0 100 0 0 39 16
The last two bytes of the first send show up as the first two bytes of the second frame. This confirms that bytes which did not yet fit into a complete frame stay in a buffer until more data arrives. That buffer is per connection: every client connected to the server has its own buffer, not a shared one.
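The carry-over between the two sends can be reproduced without a network. This is a minimal JDK-only sketch, assuming a hypothetical FixedLengthFramerSketch class that plays the role of the per-connection buffer; it is not how Netty implements its cumulation internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical JDK-only stand-in for the per-connection buffer of a fixed-length decoder.
final class FixedLengthFramerSketch {
    private final int frameLength;
    private byte[] pending = new byte[0];   // bytes received but not yet framed

    FixedLengthFramerSketch(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    // Appends newly received bytes and returns every complete frame now available.
    List<byte[]> feed(byte[] received) {
        byte[] all = Arrays.copyOf(pending, pending.length + received.length);
        System.arraycopy(received, 0, all, pending.length, received.length);

        List<byte[]> frames = new ArrayList<>();
        int offset = 0;
        while (all.length - offset >= frameLength) {
            frames.add(Arrays.copyOfRange(all, offset, offset + frameLength));
            offset += frameLength;
        }
        pending = Arrays.copyOfRange(all, offset, all.length);  // keep the remainder
        return frames;
    }

    int pendingBytes() {
        return pending.length;
    }

    public static void main(String[] args) {
        // Same bytes as writeInt(10000) followed by writeInt(100)
        byte[] packet = {0, 0, 39, 16, 0, 0, 0, 100};
        FixedLengthFramerSketch framer = new FixedLengthFramerSketch(6);
        for (int i = 0; i < 2; i++) {
            for (byte[] frame : framer.feed(packet)) {
                System.out.println(Arrays.toString(frame));
            }
        }
        System.out.println("still pending: " + framer.pendingBytes());
    }
}
```

Feeding the 8-byte packet twice with a frame length of 6 yields the same two frames as the results above, with 4 bytes still waiting for more data.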
4. LengthFieldBasedFrameDecoder – custom decoder
1. Construction method
new LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast);
// byteOrder: big-endian or little-endian
// maxFrameLength: the maximum length of a frame, where a frame is one packet received from the peer
// lengthFieldOffset: the offset of the length field within the packet
// lengthFieldLength: the size of the length field in bytes, e.g. 2 for a short and 4 for an int
// lengthAdjustment: a value added to the length field. For example, with the parameters (10, 0, 2, -2, 0) and the packet [10 (short), 10000 (int), 100 (int)], the length field counts its own 2 bytes, so the adjustment is -2. That gives the actual body length of 8, and only then can the body (10000, 100) be read correctly.
// initialBytesToStrip: the number of leading bytes skipped when the frame is returned
// failFast: whether to fail fast
2. Basic usage
1. The first, simplest case: (10, 0, 2, 0, 0)
1. Server pipeline:
pipeline().addLast(new LengthFieldBasedFrameDecoder(10, 0, 2, 0, 0));
2. Data sent by the client to the server:
ByteBuf buf = Unpooled.buffer(10);
// The length field holds 8, the length of the body
buf.writeShort(8);
buf.writeInt(10000);
buf.writeInt(100);
3. Server-side code:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf messages = (ByteBuf) msg;
        System.out.println("Data length =" + messages.readShort());
        System.out.println("Data 1 =" + messages.readInt());
        System.out.println("Data 2 =" + messages.readInt());
    } else {
        super.channelRead(ctx, msg);
    }
}
Output:
Data length =8
Data 1 =10000
Data 2 =100
Everything works.
2. The second case: (10, 0, 2, -2, 0)
ByteBuf buf = Unpooled.buffer(10);
// The length field holds 10 because it includes its own 2 bytes, hence the adjustment of -2
buf.writeShort(10);
buf.writeInt(10000);
buf.writeInt(100);
The server-side code is unchanged.
Output (the data is the same as before):
Data length =10
Data 1 =10000
Data 2 =100
3. The third case: (10, 0, 2, -2, 2)
ByteBuf buf = Unpooled.buffer(10);
// The length field holds 10 because it includes its own 2 bytes, hence the adjustment of -2
buf.writeShort(10);
buf.writeInt(10000);
buf.writeInt(100);
The server-side code no longer reads the length field, because it has been stripped:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf messages = (ByteBuf) msg;
        System.out.println("Data 1 =" + messages.readInt());
        System.out.println("Data 2 =" + messages.readInt());
    } else {
        super.channelRead(ctx, msg);
    }
}
Output (again the same data, as expected):
Data 1 =10000
Data 2 =100
3. Source code analysis:
// Note: in is in direct memory
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    // Whether an over-long frame is being discarded; false at first
    if (discardingTooLongFrame) {
        discardingTooLongFrame(in);
    }
    // lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
    // Not enough readable bytes to cover the length field yet
    if (in.readableBytes() < lengthFieldEndOffset) {
        return null;
    }
    // Actual offset of the length field = reader index + lengthFieldOffset
    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    // Read the raw length value; how many bytes are read depends on lengthFieldLength
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    // The length is negative
    if (frameLength < 0) {
        failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
    }
    // For example, if the length value includes the length field's own bytes, the adjustment removes them
    // Actual frame length = frameLength + lengthAdjustment + end offset of the length field
    frameLength += lengthAdjustment + lengthFieldEndOffset;
    // The adjusted length is smaller than the end offset of the length field
    if (frameLength < lengthFieldEndOffset) {
        failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
    }
    // If the frame length is greater than maxFrameLength, an exception is thrown
    if (frameLength > maxFrameLength) {
        exceededFrameLength(in, frameLength);
        return null;
    }
    ...
    // Skip initialBytesToStrip bytes
    in.skipBytes(initialBytesToStrip);
    // Reader index
    int readerIndex = in.readerIndex();
    // Actual frame length = frame length - initialBytesToStrip
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    // This is also a retained slice, so the reference count goes up by 1; more on that later
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    // Move the reader index past the frame
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;
}
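The length arithmetic in the listing can be checked against the three cases above with plain java.nio.ByteBuffer. This is a simplified sketch under stated assumptions: LengthFieldSketch and packet() are hypothetical names, only a big-endian 2-byte length field is handled, errors are reported with IllegalArgumentException instead of Netty's own exceptions, and the "wait for a whole frame" check is my assumption about what the elided part of the listing does.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical JDK-only sketch of the length arithmetic; big-endian, 2-byte length field only.
final class LengthFieldSketch {

    // Returns the decoded frame, or null when the buffer does not yet hold a whole frame.
    static byte[] decode(ByteBuffer in, int maxFrameLength, int lengthFieldOffset,
                         int lengthAdjustment, int initialBytesToStrip) {
        final int lengthFieldLength = 2;
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        if (in.remaining() < lengthFieldEndOffset) {
            return null;                                   // length field not complete yet
        }
        // Absolute read: does not move the buffer position.
        int unadjusted = in.getShort(in.position() + lengthFieldOffset) & 0xFFFF;
        int frameLength = unadjusted + lengthAdjustment + lengthFieldEndOffset;
        if (frameLength < lengthFieldEndOffset) {
            throw new IllegalArgumentException("adjusted frame length too small: " + frameLength);
        }
        if (frameLength > maxFrameLength) {
            throw new IllegalArgumentException("frame too long: " + frameLength);
        }
        if (in.remaining() < frameLength) {
            return null;                                   // assumption: wait for more bytes
        }
        in.position(in.position() + initialBytesToStrip);  // skip the stripped prefix
        byte[] frame = new byte[frameLength - initialBytesToStrip];
        in.get(frame);                                     // advances past the frame
        return frame;
    }

    // Builds the 10-byte packet used in the three cases: short length, int 10000, int 100.
    static ByteBuffer packet(int lengthValue) {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putShort((short) lengthValue).putInt(10000).putInt(100);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(decode(packet(8), 10, 0, 0, 0)));    // case 1
        System.out.println(Arrays.toString(decode(packet(10), 10, 0, -2, 0)));  // case 2
        System.out.println(Arrays.toString(decode(packet(10), 10, 0, -2, 2)));  // case 3
    }
}
```

In cases 1 and 2 the frame is the full 10 bytes including the length field; in case 3 the first 2 bytes are stripped and only the 8-byte body remains.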
5. DelimiterBasedFrameDecoder – custom delimiter decoder
1. Construction method
new DelimiterBasedFrameDecoder(50, false, true, Unpooled.copiedBuffer("a", CharsetUtil.UTF_8))
// maxFrameLength: the maximum length of a frame
// stripDelimiter: whether to strip the delimiter from the decoded frame
// failFast: whether to throw the exception as soon as the frame is known to be too long
// delimiter: the delimiter
2. Basic usage
// 1. On the server side, add the decoders
// Delimiter decoder
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(50, false, true, Unpooled.copiedBuffer("a", CharsetUtil.UTF_8)));
// String decoder
ch.pipeline().addLast(new StringDecoder());
// Our own handler
ch.pipeline().addLast(new MyDecoder());
// In its channelRead() method:
if (msg instanceof String) {
    System.out.println(msg);
}
// 2. The client sends:
run.channel().writeAndFlush(Unpooled.copiedBuffer("Hello a, you're really good.", CharsetUtil.UTF_8));
run.channel().writeAndFlush(Unpooled.copiedBuffer("A", CharsetUtil.UTF_8));
Output result:
Server output: Hello a you are really good A
The data is split right after each delimiter. Whatever follows the last delimiter has no delimiter of its own yet, so it stays in the buffer and waits for the next read.
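The "split at the delimiter, keep the rest for the next read" behaviour can be sketched without Netty. The class below is a hypothetical, simplified stand-in: it supports only a single-byte delimiter, always keeps the delimiter in the frame (like stripDelimiter = false), and has no maximum frame length.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only sketch: single-byte delimiter, delimiter kept in the frame.
final class DelimiterFramerSketch {
    private final byte delimiter;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    DelimiterFramerSketch(byte delimiter) {
        this.delimiter = delimiter;
    }

    // Appends received bytes; emits one frame each time the delimiter is seen.
    List<String> feed(String received) {
        List<String> frames = new ArrayList<>();
        for (byte b : received.getBytes(StandardCharsets.UTF_8)) {
            pending.write(b);
            if (b == delimiter) {
                frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();               // start collecting the next frame
            }
        }
        return frames;                         // bytes after the last delimiter stay pending
    }

    public static void main(String[] args) {
        DelimiterFramerSketch framer = new DelimiterFramerSketch((byte) 'a');
        System.out.println(framer.feed("hello-a-world"));  // "-world" has no delimiter yet
        System.out.println(framer.feed("-a"));             // completes the pending frame
    }
}
```

The second call returns the bytes left over from the first call together with the newly arrived delimiter, which is the buffering described above.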
6. LineBasedFrameDecoder – line-by-line decoder
1. Construction method
LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast)
// maxLength: the maximum length of a frame
// stripDelimiter: whether to strip the line delimiter from the decoded frame
// failFast: whether to throw the exception as soon as the frame is known to be too long
2. Quick use
// 1. Line-based decoder
ch.pipeline().addLast(new LineBasedFrameDecoder(1000, true, true));
// 2. String decoder
ch.pipeline().addLast(new StringDecoder());
// 3. Our own handler
ch.pipeline().addLast(new MyHandler());
// The server sends:
ctx.writeAndFlush(Unpooled.copiedBuffer("HELLO, this is server \n", CharsetUtil.UTF_8));
// The client sends:
run.channel().writeAndFlush(Unpooled.copiedBuffer("Hello, you're really good.", CharsetUtil.UTF_8));
run.channel().writeAndFlush(Unpooled.copiedBuffer("Ah, \n", CharsetUtil.UTF_8));
Output result:
Server: HELLO, you are really good
Client: HELLO, this is the server
3. Analysis: using the ByteBuf.forEachByte() method
ByteBuf buf = Unpooled.buffer(10);
buf.writeCharSequence("hell\n", CharsetUtil.UTF_8);
byte x = '\n';
// Equivalent to the built-in ByteProcessor.FIND_LF
int i = buf.forEachByte(0, 10, new ByteProcessor() {
    @Override
    public boolean process(byte value) throws Exception {
        // Return false to stop the loop
        if (value == x) {
            return false;
        }
        // Return true to continue the loop
        return true;
    }
});
System.out.println("index : " + i);
Output:
index : 4
So it is easy to find where the newline is and cut the frame there.
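The "return false to stop, and get back the index where you stopped" contract is easy to mimic on a plain byte array. The following is a small sketch with hypothetical names (FindLineFeedSketch, ByteVisitor); it only imitates the shape of the callback used above and is not Netty's implementation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only sketch of "scan until the callback says stop".
final class FindLineFeedSketch {

    // Same shape as the callback used above: return false to stop the scan.
    interface ByteVisitor {
        boolean process(byte value);
    }

    // Returns the index at which the visitor stopped, or -1 if it never did.
    static int forEachByte(byte[] data, int index, int length, ByteVisitor visitor) {
        for (int i = index; i < index + length; i++) {
            if (!visitor.process(data[i])) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] data = "hell\n".getBytes(StandardCharsets.UTF_8);
        int i = forEachByte(data, 0, data.length, value -> value != '\n');
        System.out.println("index : " + i);  // prints "index : 4"
    }
}
```

A line-based decoder can then slice the bytes from the reader position up to that index as one frame and continue after the newline.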
7. IdleStateHandler – Heartbeat detection processor
Triggers an IdleStateEvent when a Channel has not performed a read, a write, or both for a while. In other words, it is event-driven: the idle state reaches your handler as a user event.
The constructor parameters are:
The first parameter (READER_IDLE): no data was received for a while.
The second parameter (WRITER_IDLE): no data was sent for a while.
The third parameter (ALL_IDLE): no data was either received or sent for a while.
The usage recommended in the official source, which is also the usual one:
public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    public void initChannel(Channel channel) {
        channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
        channel.pipeline().addLast("myHandler", new MyHandler());
    }
}
// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
            } else if (e.state() == IdleState.WRITER_IDLE) {
            }
        }
    }
}
1. How to design a reasonable heartbeat mechanism
I can no longer find the original article; the idea comes from an article by Alibaba's Dubbo team. It makes the client, rather than the server, the active party. The usual approach is to let the server send heartbeat packets and check them periodically, but that makes the logic complex and leaves too many cases to consider. Letting the client take the lead is simpler and suits client development particularly well.
The scheme is as follows. When the client has not received any message from the server for 60 s, it proactively sends a heartbeat packet to the server. When the server has neither received anything from a client nor sent anything to it for more than 120 s, it closes the connection to that client. This keeps both the server and the client very simple.
1. Server-side code
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    // Heartbeat detection: all-idle timeout of 120 s, i.e. nothing received and nothing sent
    pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 120));
    // Heartbeat handler
    pipeline.addLast("serverHeartBeatHandler", new ServerHeartBeatHandler(listener));
}
// Handler logic
public class ServerHeartBeatHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Check whether it is an IdleStateEvent
        if (evt instanceof IdleStateEvent) {
            // Heartbeat timed out: the server closes the connection
            // This leads on to handlerRemoved
            ctx.close();
        } else {
            // Otherwise do nothing special and pass the event on
            super.userEventTriggered(ctx, evt);
        }
    }
}
2. Client code
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();
    // Heartbeat detection: if nothing is received from the server for 60 s, send a heartbeat packet
    pipeline.addLast("nettyHeartBeatHandler", new IdleStateHandler(60, 0, 0));
    pipeline.addLast("heartBeatHandler", new ClientHeartBeatHandler(listener));
}
// Handler
public class ClientHeartBeatHandler extends ChannelDuplexHandler {
    private ChatBootListener listener;
    public ClientHeartBeatHandler(ChatBootListener listener) {
        this.listener = listener;
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // Send a heartbeat packet
            ctx.channel().writeAndFlush(Constants.HEART_BEAT_NPACK).addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        // Success usually means the data reached the send buffer; it does not mean the server received it
                    } else {
                        // TODO: 2019/11/16 handle failure as the requirements dictate
                    }
                }
            });
        } else {
            // Pass to the parent class
            super.userEventTriggered(ctx, evt);
        }
    }
}
2. Fundamentals
IdleStateHandler uses the EventExecutor of the ChannelHandlerContext to schedule a task: it passes in a Runnable and a delay, and the task is scheduled again after each run, so the check is executed periodically.
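The core of such a periodic check is a small piece of arithmetic: how long has it been since the last read, and how long should the task wait before it looks again. The sketch below illustrates that idea with plain JDK code and an explicit clock value so it can be tested deterministically. ReadIdleSketch and its methods are hypothetical names, and the rescheduling rule shown is my simplified reading of the approach, not a copy of Netty's code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only sketch; names are illustrative, not Netty's.
final class ReadIdleSketch {
    private final long idleTimeoutNanos;
    private long lastReadNanos;

    ReadIdleSketch(long timeout, TimeUnit unit, long nowNanos) {
        this.idleTimeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = nowNanos;
    }

    // Called whenever data is read from the connection.
    void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    // Time left until the connection counts as read-idle; zero or negative means idle now.
    long remainingNanos(long nowNanos) {
        return idleTimeoutNanos - (nowNanos - lastReadNanos);
    }

    // Delay to use when the check task schedules itself again.
    long rescheduleDelayNanos(long nowNanos) {
        long remaining = remainingNanos(nowNanos);
        // Already idle: start a full timeout again. Otherwise wait only for what is left.
        return remaining <= 0 ? idleTimeoutNanos : remaining;
    }

    public static void main(String[] args) {
        long second = TimeUnit.SECONDS.toNanos(1);
        ReadIdleSketch idle = new ReadIdleSketch(60, TimeUnit.SECONDS, 0L);
        idle.onRead(45 * second);                 // data arrives at t = 45 s
        long now = 60 * second;                   // the first check runs at t = 60 s
        System.out.println("idle at 60 s? " + (idle.remainingNanos(now) <= 0));
        System.out.println("check again in "
            + TimeUnit.NANOSECONDS.toSeconds(idle.rescheduleDelayNanos(now)) + " s");
    }
}
```

In a real handler the delay returned by rescheduleDelayNanos would be passed to the executor's schedule call, and an idle result would fire the idle event instead of printing.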