Recently I have had free weekends, so I am learning Netty and writing some blog posts to record what I learn.
This article covers three topics: an introduction to Netty's core components, creating a Netty server, and a source code analysis of the Netty startup process.
If you already know a little about Netty, it should be a pleasant read.
Netty core components
ByteBuf
ByteBuf is Netty's buffer type, an enhancement of ByteBuffer from the JDK NIO library.
A buffer sits at both ends of a channel: data sent through the channel must first be converted to a ByteBuf, and data read from the channel also arrives as a ByteBuf.
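To see what ByteBuf improves on, it helps to look at the JDK class first. A JDK ByteBuffer tracks a single position for both writing and reading, so it has to be flipped between the two, while Netty's ByteBuf keeps a separate readerIndex and writerIndex and needs no flip. The sketch below uses only JDK APIs; ByteBufferFlipDemo and its method names are made up for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; only JDK APIs are used, no Netty.
final class ByteBufferFlipDemo {

    // Writes text into a JDK ByteBuffer and reads it back.
    static String roundTrip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);   // position advances past the written bytes
        buffer.flip();       // limit = position, position = 0: switch to reading
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    // Number of readable bytes when flip() is forgotten after writing.
    static int remainingWithoutFlip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        return buffer.remaining(); // position == limit, so nothing is readable
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("today"));
        System.out.println(remainingWithoutFlip("today"));
    }
}
```

Forgetting flip() is a classic ByteBuffer mistake, and removing that step is one of the conveniences ByteBuf offers.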
Channel and Unsafe
Channel aggregates a set of network I/O operations: reading, writing, initiating a client connection, and closing a connection.
The Unsafe interface assists Channel in implementing I/O operations (it should not be called directly by user code).
ChannelPipeline and ChannelHandler
ChannelHandler: handles I/O events. Each ChannelHandler implements its own processing logic for the I/O events it cares about. For example, a decoding handler only deals with decoding.
ChannelPipeline: a ChannelPipeline consists of several ChannelHandlers arranged in a certain order. I/O events flow through the pipeline (inbound events from head to tail, outbound events from tail to head), and each handler processes the events it is interested in.
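The ordering rule is easier to remember with a toy model. The sketch below is not Netty code: ToyPipeline and the handler names are hypothetical, and it only models which handlers an event visits and in what order, assuming inbound events reach inbound handlers from head to tail and outbound events reach outbound handlers from tail to head.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of handler ordering. These types are hypothetical, not Netty classes.
final class ToyPipeline {
    private static final class Entry {
        final String name;
        final boolean inbound;

        Entry(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final List<Entry> handlers = new ArrayList<>();

    // Appends a handler at the tail of the pipeline.
    ToyPipeline addLast(String name, boolean inbound) {
        handlers.add(new Entry(name, inbound));
        return this;
    }

    // Inbound events visit inbound handlers from head to tail.
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Entry e : handlers) {
            if (e.inbound) {
                path.add(e.name);
            }
        }
        return path;
    }

    // Outbound events visit outbound handlers from tail to head.
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            Entry e = handlers.get(i);
            if (!e.inbound) {
                path.add(e.name);
            }
        }
        return path;
    }

    // A sample pipeline with two inbound and two outbound handlers.
    static ToyPipeline sample() {
        return new ToyPipeline()
                .addLast("decoderA", true)
                .addLast("encoderA", false)
                .addLast("decoderB", true)
                .addLast("encoderB", false);
    }

    public static void main(String[] args) {
        ToyPipeline p = sample();
        System.out.println("inbound:  " + p.inboundPath());
        System.out.println("outbound: " + p.outboundPath());
    }
}
```

In this model the handlers added last see outbound data first, which is why the position of an encoder in the pipeline matters.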
NioEventLoop and NioEventLoopGroup
NioEventLoop: An event loop (Reactor thread) that listens for the ready status of multiple channels and generates incoming events when the channels are ready
NioEventLoopGroup: A Reactor thread pool. When a new channel is created, the NioEventLoopGroup assigns it an event loop through which all subsequent I/O operations are performed.
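The idea of "one loop per channel for its whole lifetime" can be sketched with plain JDK executors. This is a toy model under my own assumptions, not Netty's implementation: ToyEventLoopGroup is a hypothetical class, each "event loop" is just a single-threaded executor, and loops are handed out in round-robin order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only: each "event loop" is a single-threaded JDK executor.
final class ToyEventLoopGroup {
    private final List<ExecutorService> loops = new ArrayList<>();
    private int next; // index of the loop handed to the next new channel

    ToyEventLoopGroup(int nThreads) {
        for (int i = 0; i < nThreads; i++) {
            loops.add(Executors.newSingleThreadExecutor());
        }
    }

    // Picks a loop for a new channel in round-robin order and returns its index.
    synchronized int register() {
        int index = next;
        next = (next + 1) % loops.size();
        return index;
    }

    // Runs a task on the given loop and reports which thread executed it.
    String threadNameOn(int loopIndex) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return loops.get(loopIndex).submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        loops.forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        ToyEventLoopGroup group = new ToyEventLoopGroup(4);
        for (int channel = 0; channel < 6; channel++) {
            System.out.println("channel " + channel + " -> loop " + group.register());
        }
        group.shutdown();
    }
}
```

Because every task for a channel runs on the same thread, handler code for that channel does not need its own locking, which is the main payoff of this design.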
Future and Promise
These two classes are Netty’s support for asynchrony. Promise is used to set the results of asynchronous operations (write) and Future is used to get the results of asynchronous operations (read).
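The write side and read side can be illustrated with the JDK's CompletableFuture, which plays both roles in one class. This is only an analogy: Netty's Future and Promise are separate types with their own methods, and PromiseDemo is a hypothetical class name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Analogy using the JDK's CompletableFuture; not Netty's Future/Promise API.
final class PromiseDemo {

    // Registers a listener first, then completes the operation, and returns what the listener saw.
    static String completeAndObserve(String result) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();
        // Read side: register a callback for the eventual result.
        promise.thenAccept(seen::set);
        // Write side: the party doing the work publishes the result.
        promise.complete(result);
        return seen.get();
    }

    // A failed operation is also published through the write side.
    static boolean failureIsVisible() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.completeExceptionally(new IllegalStateException("registration failed"));
        return promise.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(completeAndObserve("bound"));
        System.out.println(failureIsVisible());
    }
}
```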
Creating a Netty server
Let's start by building a simple server application.
The following server program returns the current date and time: it returns the current date, for example "2020-12-11", when the client sends the command "today", and the current time, for example "03:11:11", when the client sends the command "time".
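import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

// TimeServer is a placeholder class name for the enclosing class
public class TimeServer {
    public static void main(String[] args) {
        //1. Thread pool configuration
        ServerBootstrap bootstrap = new ServerBootstrap();
        NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup childGroup = new NioEventLoopGroup(4);
        bootstrap.group(parentGroup, childGroup);
        //2. Configure the server Channel
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        //3. Configure subchannels
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                // decoder
                channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                channel.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
                // business handler
                channel.pipeline().addLast(new BusinessHandler());
                // encoder
                channel.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
            }
        });
        try {
            ChannelFuture future = bootstrap.bind(7001).syncUninterruptibly();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            //todo
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }

    static class BusinessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String s = (String) msg;
            String ret = "";
            if ("today".equals(s)) {
                ret = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
            } else if ("time".equals(s)) {
                // HH: 24-hour clock
                ret = new SimpleDateFormat("HH:mm:ss").format(new Date());
            }
            ret += "\r\n";
            ctx.channel().writeAndFlush(ret);
        }
    }
}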
The whole application is simple to build and can be summed up in four steps:
1. Configure the Reactor thread pools
2. Configure the server Channel
3. Configure the sub-channels (sub-channels are created by the server Channel)
4. Bind the local port and start the service
Reactor thread pool configuration
We created two Reactor thread pools, parentGroup and childGroup.
parentGroup is used by the server channel to accept new client connections.
childGroup handles the network I/O of all sub-channels created by the server channel.
ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
NioEventLoopGroup childGroup = new NioEventLoopGroup(4);
bootstrap.group(parentGroup, childGroup);
Server Channel configuration
Server Channel configuration involves the Channel type, ChannelOption, and AttributeKey (setting a handler is optional).
- Channel type configuration
The Channel type is NioServerSocketChannel, which wraps the JDK NIO ServerSocketChannel.
bootstrap.channel(NioServerSocketChannel.class);
- Set ChannelOption and AttributeKey
ChannelOption: TCP options, such as the receive buffer size (SO_RCVBUF), the send buffer size (SO_SNDBUF), and the size of the kernel's TCP connection queue (SO_BACKLOG)
AttributeKey: identifies an object attached to a Channel, which can be used to share data between multiple ChannelHandlers
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.attr(AttributeKey.newInstance("TEST"), new Object());
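Since NioServerSocketChannel sits on top of the JDK NIO ServerSocketChannel, it can help to see what these options look like at the JDK level. The sketch below uses only JDK APIs; JdkServerOptionsDemo is a hypothetical class name, and the mapping to Netty's options is my understanding rather than something shown in this article: SO_RCVBUF is set as a socket option, while the backlog is passed as an argument to bind.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

// JDK-only sketch; JdkServerOptionsDemo is a hypothetical name.
final class JdkServerOptionsDemo {

    // Opens a server channel on an ephemeral loopback port, returns the port, then closes it.
    static int openAndClose(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Socket-level options are set before binding.
            server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            server.setOption(StandardSocketOptions.SO_RCVBUF, 64 * 1024);
            // The backlog is a hint for the size of the pending connection queue.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + openAndClose(1024));
    }
}
```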
Note: a ChannelHandler processes I/O events, and every channel needs one. Because Netty already provides a handler (ServerBootstrapAcceptor) that initializes accepted client connections, we do not need to set a handler on the server Channel ourselves.
Sub-channel configuration
Sub-channel configuration involves ChannelOption, AttributeKey, and ChannelHandler.
- Set ChannelOption and AttributeKey
ChannelOption and AttributeKey can be configured for each sub-channel, just as for the server Channel (through childOption and childAttr).
- ChannelHandler configuration
For the server Channel, the Netty framework provides a handler for accepting connections, so we can skip setting one. For each sub-channel created by the server Channel, however, we need to configure handlers to process I/O events.
First: a decoder is a must. Business logic usually works with objects, and the decoder converts bytes into Java objects (the decoder also has to deal with TCP splitting one message across several reads, and with several messages arriving stuck together in one read).
Then: custom business handlers process the specific business logic, as BusinessHandler does above.
Finally: when a result needs to be returned, an encoder must be configured to encode the output object into a ByteBuf that the channel can transport.
For this example:
LineBasedFrameDecoder and StringDecoder are decoders: decode a line of data into a String in Java
BusinessHandler is a business processor: handles concrete business logic (gets the current date or time)
StringEncoder is an encoder: a String object is encoded as a ByteBuf object for channel transport.
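The job of the line decoder can be sketched without Netty. The toy class below is not LineBasedFrameDecoder's real implementation (which, among other things, also enforces the maximum frame length passed to its constructor); ToyLineFramer is a hypothetical name, and the sketch only shows the idea of buffering partial reads until a full line is available.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy line-based framer; a hypothetical class, not Netty's decoder.
final class ToyLineFramer {
    // Bytes received so far that do not yet form a complete line.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feeds the bytes of one network read and returns every line completed by it.
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] frame = pending.toByteArray();
                pending.reset();
                int length = frame.length;
                if (length > 0 && frame[length - 1] == '\r') {
                    length--; // drop the '\r' of a "\r\n" line ending
                }
                lines.add(new String(frame, 0, length, StandardCharsets.UTF_8));
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    // Convenience overload for text input.
    List<String> feed(String chunk) {
        return feed(chunk.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ToyLineFramer framer = new ToyLineFramer();
        System.out.println(framer.feed("to"));            // half a message
        System.out.println(framer.feed("day\r\ntime\n")); // the rest, plus a second message
    }
}
```

The first read yields no line at all, and the second read yields two, which are exactly the two cases the decoder has to handle before the business handler sees a String.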
Bind the local port and start the service
After the configuration is complete, bind the local port to start the service.
ChannelFuture future = bootstrap.bind(7001).syncUninterruptibly();
That completes the server application built with Netty. Next, let's look at the startup process from the source code side.
Source code analysis of the Netty server startup process
The source code is based on the 4.1 branch and has been simplified to retain only the core logic.
Start with the bind method
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    this.validate();
    return this.doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    //1. Initialize NioServerSocketChannel and register with EventLoopGroup
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    //2.1. Registration failed
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        //2.2. If the registration is successful, bind the local port directly
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        //3. If the registration is unknown (because it is an asynchronous operation), add a listener to the regFuture object for callback processing after the registration is complete
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
The bind method as a whole is simple. The core logic is in doBind, which has three main steps:
- initAndRegister: instantiates the ServerSocketChannel (in this case NioServerSocketChannel) and registers it with the EventLoopGroup
- If the first step fails, return directly. If the registration has already succeeded, call doBind0 to bind the local port and start the server
- If the result of the registration is not yet known (registration is an asynchronous operation), add a ChannelFutureListener to regFuture so that the bind happens in a callback once the registration completes
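The "bind now or bind from a listener" decision can be sketched with the JDK's CompletableFuture. This is a simplified model, not Netty's code: ToyBindFlow is a hypothetical class, the registration future is handed in by the caller, and the actual bind is replaced by a log entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified model of the three branches in doBind; hypothetical class, JDK types only.
final class ToyBindFlow {
    final List<String> log = new ArrayList<>();

    CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture) {
        // Branch 1: registration already failed, hand the failed future back.
        if (regFuture.isCompletedExceptionally()) {
            log.add("registration failed");
            return regFuture;
        }
        CompletableFuture<Void> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Branch 2: registration already succeeded, bind right away.
            log.add("bind immediately");
            promise.complete(null);
        } else {
            // Branch 3: result unknown, bind from a callback once registration finishes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    log.add("registration failed later");
                    promise.completeExceptionally(cause);
                } else {
                    log.add("bind from listener");
                    promise.complete(null);
                }
            });
        }
        return promise;
    }

    static List<String> whenAlreadyRegistered() {
        ToyBindFlow flow = new ToyBindFlow();
        flow.doBind(CompletableFuture.completedFuture(null));
        return flow.log;
    }

    static List<String> whenRegisteredLater() {
        ToyBindFlow flow = new ToyBindFlow();
        CompletableFuture<Void> regFuture = new CompletableFuture<>();
        CompletableFuture<Void> promise = flow.doBind(regFuture);
        flow.log.add("done before registration: " + promise.isDone());
        regFuture.complete(null);
        flow.log.add("done after registration: " + promise.isDone());
        return flow.log;
    }

    static List<String> whenRegistrationFailed() {
        ToyBindFlow flow = new ToyBindFlow();
        CompletableFuture<Void> regFuture = new CompletableFuture<>();
        regFuture.completeExceptionally(new IllegalStateException("no event loop"));
        flow.doBind(regFuture);
        return flow.log;
    }

    public static void main(String[] args) {
        System.out.println(whenAlreadyRegistered());
        System.out.println(whenRegisteredLater());
        System.out.println(whenRegistrationFailed());
    }
}
```

In every branch the caller gets a future back immediately, so bind itself never blocks; the caller decides whether to wait on the result.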
The second and third steps are relatively simple, and we mainly need to look at the first step, initAndRegister
initAndRegister (initializes NioServerSocketChannel and registers it with the EventLoopGroup)
initAndRegister is a template method that can be analyzed in three steps:
- Instantiation (here through a reflection-based factory)
- Initialization (implemented by subclasses)
- Registration with the EventLoopGroup
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //1. Instantiate through the reflection-based factory
        channel = channelFactory.newChannel();
        //2. Initialize (implemented by subclasses)
        init(channel);
    } catch (Throwable t) {
        // failure handling omitted
    }
    //3. Register with the EventLoopGroup
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // failure handling omitted
    }
    return regFuture;
}
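The reflection-based factory in the first step is the reason bootstrap.channel(...) takes a Class object: the class is stored, and an instance is created from its no-argument constructor when the server starts. A minimal sketch of that idea with plain JDK reflection follows; ToyBootstrap, ToyChannel and ToyServerChannel are hypothetical types, not Netty's.

```java
// Sketch of a reflection-based channel factory; all types here are hypothetical.
final class ToyBootstrap {
    interface ToyChannel {
        String name();
    }

    static final class ToyServerChannel implements ToyChannel {
        @Override
        public String name() {
            return "server";
        }
    }

    // Creates a channel from its class through the no-argument constructor.
    static <T extends ToyChannel> T newChannel(Class<T> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create channel from " + type, e);
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = newChannel(ToyServerChannel.class);
        System.out.println(channel.name());
    }
}
```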
We will not expand on steps 1 and 3 here. Let's look at step 2: what does init do?
init (initializes the channel)
Here is the source code for the init method in ServerBootstrap
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
It mainly populates the NioServerSocketChannel instance with the parameters we configured on the ServerBootstrap class.
It also adds a ServerBootstrapAcceptor handler to the server channel's pipeline. ServerBootstrapAcceptor is used to initialize the sub-channels accepted by the server.
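What "initializing a sub-channel" means can be modelled in a few lines. The sketch below is a toy, not Netty code: ToyAcceptor and ToyChild are hypothetical types, and it assumes the acceptor does three things for each accepted connection: add the configured child handler to the child's pipeline, apply the child options, and hand the child to one of the child group's event loops.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the per-connection work of an acceptor; not Netty code.
final class ToyAcceptor {
    static final class ToyChild {
        final List<String> pipeline = new ArrayList<>();
        final Map<String, Object> options = new LinkedHashMap<>();
        int eventLoopIndex = -1;
    }

    private final String childHandler;
    private final Map<String, Object> childOptions;
    private final int childLoopCount;
    private int nextLoop;

    ToyAcceptor(String childHandler, Map<String, Object> childOptions, int childLoopCount) {
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childLoopCount = childLoopCount;
    }

    // Called once per accepted connection.
    ToyChild onAccepted() {
        ToyChild child = new ToyChild();
        child.pipeline.add(childHandler);      // install the configured child handler
        child.options.putAll(childOptions);    // apply the configured child options
        child.eventLoopIndex = nextLoop;       // register with a loop of the child group
        nextLoop = (nextLoop + 1) % childLoopCount;
        return child;
    }

    // A sample acceptor with one child option and four child loops.
    static ToyAcceptor sample() {
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("SO_KEEPALIVE", true);
        return new ToyAcceptor("childInitializer", options, 4);
    }

    public static void main(String[] args) {
        ToyAcceptor acceptor = sample();
        ToyChild first = acceptor.onAccepted();
        ToyChild second = acceptor.onAccepted();
        System.out.println(first.pipeline + " " + first.options + " loop " + first.eventLoopIndex);
        System.out.println(second.pipeline + " " + second.options + " loop " + second.eventLoopIndex);
    }
}
```

This is where the split between the two thread pools shows up: the acceptor runs on the parent group, and everything after the hand-off runs on the child group.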
Conclusion
By analyzing the bind, doBind, initAndRegister, and init methods, we get a general picture of the whole Netty startup process:
1. Instantiate and initialize NioServerSocketChannel
2. Register the initialized NioServerSocketChannel with the EventLoopGroup (parentGroup)
3. After the registration succeeds, bind the local port to complete the startup process
Of course, you need some understanding of the pipeline, handlers, and event loops to understand how Netty works.
Final words
TO ME: First blog of 2021, come on! It feels great to have typed all of this out myself, word by word!
TO YOU: If you find it helpful, please like it or recommend it.