Preface
This chapter begins our reading of the Netty core source code.
- Read the official example to see how the Netty core components are used.
- Netty core components: Channel, ChannelHandler, ChannelPipeline, ChannelHandlerContext, EventLoop, EventLoopGroup, and Bootstrap. This chapter gives a brief overview; later chapters analyze each one in depth.
- How open source frameworks configure Netty: Dubbo, ShardingProxy, RocketMQ.
I. An example
First, look at how the official EchoServer and EchoClient are configured.
1, EchoServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

public final class EchoServer {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
        // Configure the server.
        // Boss thread group with 1 thread
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker thread group; the default thread count is CPU cores * 2
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            // Configure the Reactor thread model
            b.group(bossGroup, workerGroup)
             // Set the channel type of the server socket
             .channel(NioServerSocketChannel.class)
             // Set options on the server channel
             .option(ChannelOption.SO_BACKLOG, 100)
             // Configure the handler that the server channel executes
             .handler(new LoggingHandler(LogLevel.INFO))
             // Configure the handlers that each accepted client SocketChannel executes
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     p.addLast(serverHandler);
                 }
             });
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
The server startup configuration consists of the following steps:
- Thread model configuration: the server side usually uses the master/slave Reactor thread model.
- The boss thread group receives client channels on the accept event and registers each client channel with the Selector of a worker thread.
- The worker thread group handles read and write events. It may also handle encoding and decoding, depending on whether the codec is expensive enough to block the IO threads.
- Other business logic usually runs in a separate thread pool; EchoServer does not do this.
- Channel type configuration: this is the server, so it uses NioServerSocketChannel.
- Option configuration: includes option and childOption. option configures the server socket, and childOption configures each accepted client socket.
- Handler configuration: includes handler and childHandler. handler configures the handler that the server Channel executes, and childHandler configures the handlers that each client Channel executes.
The server is started by calling the bind method of ServerBootstrap. bind returns a ChannelFuture, and sync waits synchronously until the server has started. Finally, closeFuture().sync() blocks until the server socket is closed.
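The boss/worker split described above can be pictured with plain JDK executors. This is only a minimal sketch under stated assumptions: `ToyReactor`, `accept`, and the connection names are hypothetical and are not Netty classes, and a real boss thread would react to accept events on a Selector rather than receive strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of the master/slave Reactor split; none of these names are Netty classes.
final class ToyReactor {
    private final ExecutorService boss = Executors.newSingleThreadExecutor();
    private final List<ExecutorService> workers = new ArrayList<>();
    private int nextWorker; // touched only by the boss thread

    ToyReactor(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    // The boss "accepts" a connection and hands it to one worker, chosen in rotation.
    void accept(String connection, Map<String, Integer> handledBy) {
        boss.execute(() -> {
            int index = nextWorker++ % workers.size();
            // From here on, the chosen worker would do all read/write work for the connection.
            workers.get(index).execute(() -> handledBy.put(connection, index));
        });
    }

    // Drain the boss first (it feeds the workers), then drain the workers.
    void shutdown() {
        try {
            boss.shutdown();
            boss.awaitTermination(5, TimeUnit.SECONDS);
            for (ExecutorService worker : workers) {
                worker.shutdown();
                worker.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static Map<String, Integer> demo() {
        ToyReactor reactor = new ToyReactor(2);
        Map<String, Integer> handledBy = new ConcurrentHashMap<>();
        for (int i = 0; i < 4; i++) {
            reactor.accept("conn-" + i, handledBy);
        }
        reactor.shutdown();
        return new TreeMap<>(handledBy);
    }
}
```

Calling `ToyReactor.demo()` returns a map from each toy connection to the index of the worker that handled it. The point of the split is that the accepting thread never does per-connection work itself.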
2, EchoClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

public final class EchoClient {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            // Configure the event loop group
            b.group(group)
             // Set the channel type
             .channel(NioSocketChannel.class)
             // Configure options
             .option(ChannelOption.TCP_NODELAY, true)
             // Configure the client channel handler
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     p.addLast(new EchoClientHandler());
                 }
             });
            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
The client startup configuration is simpler and consists of the following steps:
- Event loop group configuration: the client needs only one event loop group to handle the events on its channel. Business logic usually runs in a separate thread pool, which EchoClient does not show.
- Channel type configuration: the client channel type is NioSocketChannel.
- Option configuration: the client only needs to configure option.
- Handler configuration: the client only needs to configure handler.
The client connects to the server through the connect method of Bootstrap, which returns a ChannelFuture; sync waits until the connection is established. After that, closeFuture().sync() blocks until the connection is closed.
II. Channel
A Channel handles the underlying communication and ties the other Netty components together.
The parent interface AttributeMap provides attribute access: attr returns the attribute for a key, and hasAttr checks whether the attribute exists.
public interface AttributeMap {
    <T> Attribute<T> attr(AttributeKey<T> key);
    <T> boolean hasAttr(AttributeKey<T> key);
}
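To make the attr/hasAttr contract concrete, here is a minimal JDK-only sketch of a typed attribute map. `ToyKey` and `ToyAttributeMap` are hypothetical stand-ins, not Netty's implementation; the sketch assumes that `attr` creates the slot on first access, so `hasAttr` only becomes true after `attr` has been called for that key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for AttributeKey<T>: the type parameter ties a key to its value type.
final class ToyKey<T> {
    final String name;

    ToyKey(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for AttributeMap; not Netty's implementation.
final class ToyAttributeMap {
    private final ConcurrentMap<ToyKey<?>, AtomicReference<?>> attrs = new ConcurrentHashMap<>();

    // Like attr(key): return the slot for the key, creating an empty one on first access.
    @SuppressWarnings("unchecked")
    <T> AtomicReference<T> attr(ToyKey<T> key) {
        return (AtomicReference<T>) attrs.computeIfAbsent(key, k -> new AtomicReference<Object>());
    }

    // Like hasAttr(key): true only if a slot already exists for the key.
    <T> boolean hasAttr(ToyKey<T> key) {
        return attrs.containsKey(key);
    }
}
```

Because the key carries the value type, a caller such as `map.attr(userKey).set("alice")` needs no cast when reading the value back.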
The parent interface ChannelOutboundInvoker is the Channel's outbound invoker and exposes the underlying communication operations. As the signatures show, Netty's operations are asynchronous: a method call returns a Future or Promise object, and that object is notified when the actual operation completes.
public interface ChannelOutboundInvoker {
    // Bind to a local address
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    // Close the channel
    ChannelFuture close();
    ChannelFuture close(ChannelPromise promise);
    // Establish a connection / disconnect
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect();
    ChannelFuture disconnect(ChannelPromise promise);
    // Deregister from the EventLoop
    ChannelFuture deregister();
    ChannelFuture deregister(ChannelPromise promise);
    // Request a read (for NIO, set the interested I/O event to OP_READ)
    ChannelOutboundInvoker read();
    // Write
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
    // Create futures and promises
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
}
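The "return a future now, notify it later" behaviour can be illustrated with the JDK's own `CompletableFuture`. This is an analogy under stated assumptions rather than Netty code: `ToyAsyncChannel` is a hypothetical class, and here `flush` plays the role of the I/O thread completing the operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical class that models an asynchronous write with CompletableFuture; not a Netty type.
final class ToyAsyncChannel {
    private final List<CompletableFuture<Void>> pendingWrites = new ArrayList<>();

    // Like write(msg): nothing is sent yet, the caller just gets a future back.
    CompletableFuture<Void> write(String msg) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        pendingWrites.add(promise);
        return promise;
    }

    // Stands in for the I/O thread finishing the work: completing the promise notifies listeners.
    void flush() {
        for (CompletableFuture<Void> promise : pendingWrites) {
            promise.complete(null);
        }
        pendingWrites.clear();
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyAsyncChannel ch = new ToyAsyncChannel();
        CompletableFuture<Void> f = ch.write("hello");
        f.thenRun(() -> log.add("listener notified"));
        log.add("write returned, done=" + f.isDone());
        ch.flush();
        log.add("after flush, done=" + f.isDone());
        return log;
    }
}
```

In `demo()`, the call to `write` returns before the operation is done, and the listener runs only once `flush` completes the promise. That is the same shape as adding a listener to a ChannelFuture.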
The Channel interface itself exposes the Channel's state and its links to the other Netty components.
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    /** Channel state **/
    // Whether the channel is open
    boolean isOpen();
    // Whether the channel is registered with an EventLoop
    boolean isRegistered();
    // Whether the channel is active (connection established)
    boolean isActive();
    // Returns a future that is notified when the channel is closed
    ChannelFuture closeFuture();
    // Returns true when the IO thread can perform a requested write immediately
    boolean isWritable();
    // How many bytes can still be written before isWritable() returns false; 0 if it is already false
    long bytesBeforeUnwritable();
    // How many bytes must be drained before isWritable() returns true; 0 if it is already true
    long bytesBeforeWritable();
    // The channel's local address
    SocketAddress localAddress();
    // The remote address the channel is connected to
    SocketAddress remoteAddress();

    /** Links to Netty components **/
    // The unique identifier of the Channel
    ChannelId id();
    // The EventLoop this Channel is registered with
    EventLoop eventLoop();
    // The parent Channel
    // A SocketChannel accepted by a ServerSocketChannel returns that ServerSocketChannel as its parent
    Channel parent();
    // Configuration
    ChannelConfig config();
    // Metadata
    ChannelMetadata metadata();
    // The Unsafe instance is the real low-level object that handles communication.
    // User code must not operate on it directly, hence the name Unsafe
    Unsafe unsafe();
    // ChannelPipeline
    ChannelPipeline pipeline();
    // ByteBuf allocator
    ByteBufAllocator alloc();
}
Channel.Unsafe is a nested interface declared inside the Channel interface. It is used by Netty itself and is not meant to be called directly by user code.
interface Unsafe {
    RecvByteBufAllocator.Handle recvBufAllocHandle();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();
    ChannelPromise voidPromise();
    ChannelOutboundBuffer outboundBuffer();
}
ServerChannel is a marker interface that extends the Channel interface. A ServerChannel creates child Channels by accepting incoming connections and is the parent of every Channel it accepts. One ServerChannel implementation is NioServerSocketChannel.
/**
 * A {@link Channel} that accepts an incoming connection attempt and creates
 * its child {@link Channel}s by accepting them. {@link ServerSocketChannel} is
 * a good example.
 */
public interface ServerChannel extends Channel {
    // This is a tag interface.
}
III. ChannelHandler and ChannelPipeline
1, ChannelHandler
The ChannelHandler interface provides hooks that fire when the handler is added to or removed from a ChannelHandlerContext.
public interface ChannelHandler {
    // Triggered when the ChannelHandler is added to a context
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    // Triggered when the ChannelHandler is removed from a context
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
}
ChannelHandler can be added and removed.
ChannelInboundHandler
ChannelInboundHandler handles inbound processing; its methods are triggered mainly when an event occurs on a Channel.
public interface ChannelInboundHandler extends ChannelHandler {
    // Triggered when the Channel is registered with an EventLoop
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    // Triggered when the Channel is deregistered from its EventLoop
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    // Triggered when the Channel becomes active (e.g. the connection is established)
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    // Triggered when the Channel becomes inactive (e.g. the connection is closed)
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    // Triggered when the Channel has read a message
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    // Triggered when the current read operation completes
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    // Triggered when a user-defined event is fired
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    // Triggered when the Channel's isWritable state changes
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    // Triggered when an exception occurs
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
As you can see, ChannelInboundHandler defines many hook methods. The most important one for business development is channelRead, which receives the data read from the channel. But a class that implements ChannelInboundHandler directly has to implement all of these methods, even the ones where it does nothing.
To solve this problem, Netty provides ChannelInboundHandlerAdapter. User code can extend ChannelInboundHandlerAdapter and override only the hook methods it cares about instead of implementing all of them. A method annotated with @Skip is not invoked unless a subclass overrides it; this is explained later.
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }
    // ... other implementations omitted
}
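The adapter idea itself is independent of Netty and can be shown in a few lines. The sketch below uses a cut-down, hypothetical interface with three hooks (`ToyInboundHandler`) and a `List<String>` log in place of a real context. Note one deliberate simplification: the toy adapter does nothing by default, whereas the Netty adapter above forwards each event to the next handler.

```java
import java.util.ArrayList;
import java.util.List;

// A cut-down, hypothetical handler interface with three hooks; not Netty's ChannelInboundHandler.
interface ToyInboundHandler {
    void channelActive(List<String> log);
    void channelRead(List<String> log, Object msg);
    void channelInactive(List<String> log);
}

// The adapter supplies a default body for every hook so that subclasses need not.
class ToyInboundHandlerAdapter implements ToyInboundHandler {
    @Override
    public void channelActive(List<String> log) { }

    @Override
    public void channelRead(List<String> log, Object msg) { }

    @Override
    public void channelInactive(List<String> log) { }
}

// User code overrides only the hook it cares about.
final class PrintingHandler extends ToyInboundHandlerAdapter {
    @Override
    public void channelRead(List<String> log, Object msg) {
        log.add("read " + msg);
    }
}
```

Driving a `PrintingHandler` through all three hooks records only the read, which is exactly the convenience the adapter is meant to give.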
ChannelOutboundHandler
ChannelOutboundHandler handles outbound processing. Its hook methods are triggered when user code performs operations such as read or write on a channel, and they correspond closely to the ChannelOutboundInvoker interface that Channel extends.
public interface ChannelOutboundHandler extends ChannelHandler {
    // Triggered by a bind operation
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    // Triggered by a connect operation
    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    // Triggered by a disconnect operation
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    // Triggered by a close operation
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    // Triggered by a deregister operation
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    // Triggered by a read operation
    void read(ChannelHandlerContext ctx) throws Exception;
    // Triggered by a write operation
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    // Triggered by a flush operation
    void flush(ChannelHandlerContext ctx) throws Exception;
}
As with ChannelInboundHandler, Netty provides a ChannelOutboundHandlerAdapter for ChannelOutboundHandler. Its code is not shown here because it is implemented the same way as ChannelInboundHandlerAdapter.
In addition, Netty provides ChannelDuplexHandler for handlers that need to process both inbound and outbound events.
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }
    // ... other ChannelOutboundHandler method implementations omitted
}
2, ChannelPipeline
ChannelHandler on its own leaves several problems open:
- A single ChannelHandler cannot handle an entire network request; usually each ChannelHandler handles only one part of it. For example, encoding/decoding and business processing are often separate handlers.
- When there are multiple ChannelHandlers, something has to orchestrate them.
- The Channel interface extends ChannelOutboundInvoker, so it can trigger the outbound ChannelOutboundHandlers, but something must also trigger the inbound handlers.
ChannelPipeline exists to solve these problems. It extends not only ChannelOutboundInvoker to trigger outbound events but also ChannelInboundInvoker to trigger inbound events. The fireXXX methods correspond one-to-one to the ChannelInboundHandler methods and need no further explanation.
public interface ChannelInboundInvoker {
    ChannelInboundInvoker fireChannelRegistered();
    ChannelInboundInvoker fireChannelUnregistered();
    ChannelInboundInvoker fireChannelActive();
    ChannelInboundInvoker fireChannelInactive();
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);
    ChannelInboundInvoker fireUserEventTriggered(Object event);
    ChannelInboundInvoker fireChannelRead(Object msg);
    ChannelInboundInvoker fireChannelReadComplete();
    ChannelInboundInvoker fireChannelWritabilityChanged();
}
ChannelPipeline also maintains a linked list of ChannelHandlers, through which inbound and outbound events propagate.
The ChannelPipeline interface provides operations to add, remove, replace, and look up entries in that list. The list nodes look like ChannelHandlers but are actually ChannelHandlerContext objects.
public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
    // Add
    ChannelPipeline addLast(ChannelHandler... handlers);
    // Add handlers whose methods run on the given executor group
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
    // Remove
    ChannelPipeline remove(ChannelHandler handler);
    // Replace
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    // Look up
    ChannelHandler get(String name);
    // ... other similar methods omitted
}
Suppose we create the following pipeline:
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
In the example above, a class whose name starts with Inbound is an inbound handler, a class whose name starts with Outbound is an outbound handler, and InboundOutboundHandlerX handles both inbound and outbound events. In principle, an inbound event is evaluated by the handlers in the order 1, 2, 3, 4, 5, and an outbound event in the order 5, 4, 3, 2, 1. On top of this, ChannelPipeline skips the evaluation of certain handlers to shorten the stack depth:
- 3 and 4 do not implement ChannelInboundHandler, so the actual evaluation order for inbound events is 1, 2, 5.
- 1 and 2 do not implement ChannelOutboundHandler, so the actual evaluation order for outbound events is 5, 4, 3.
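The ordering rules above can be checked with a toy pipeline. This is a minimal sketch under stated assumptions: `ToyInbound`, `ToyOutbound`, and `ToyOrderPipeline` are hypothetical names, and the sketch decides with `instanceof` which handlers to visit, which only imitates the observable order and not how Netty implements skipping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker interfaces standing in for ChannelInboundHandler / ChannelOutboundHandler.
interface ToyInbound { }

interface ToyOutbound { }

final class ToyOrderPipeline {
    private final List<String> names = new ArrayList<>();
    private final List<Object> handlers = new ArrayList<>();

    ToyOrderPipeline addLast(String name, Object handler) {
        names.add(name);
        handlers.add(handler);
        return this;
    }

    // Inbound events travel from head to tail and visit only inbound handlers.
    List<String> inboundOrder() {
        List<String> visited = new ArrayList<>();
        for (int i = 0; i < handlers.size(); i++) {
            if (handlers.get(i) instanceof ToyInbound) {
                visited.add(names.get(i));
            }
        }
        return visited;
    }

    // Outbound events travel from tail to head and visit only outbound handlers.
    List<String> outboundOrder() {
        List<String> visited = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            if (handlers.get(i) instanceof ToyOutbound) {
                visited.add(names.get(i));
            }
        }
        return visited;
    }

    // Rebuilds the five-handler pipeline from the text with toy handlers.
    static ToyOrderPipeline example() {
        class In implements ToyInbound { }
        class Out implements ToyOutbound { }
        class Both implements ToyInbound, ToyOutbound { }
        return new ToyOrderPipeline()
                .addLast("1", new In())
                .addLast("2", new In())
                .addLast("3", new Out())
                .addLast("4", new Out())
                .addLast("5", new Both());
    }
}
```

`ToyOrderPipeline.example().inboundOrder()` and `outboundOrder()` reproduce the two orders listed in the bullets.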
3, ChannelHandlerContext
ChannelHandlerContext is a context object that holds the important components Channel, ChannelHandler, and ChannelPipeline. It also extends ChannelInboundInvoker and ChannelOutboundInvoker, so it can trigger inbound and outbound events.
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    // The Channel bound to this context
    Channel channel();
    // The executor used to run this context's handler
    EventExecutor executor();
    // Unique name
    String name();
    // The ChannelHandler bound to this context
    ChannelHandler handler();
    // Whether the bound ChannelHandler has been removed
    boolean isRemoved();
    // The pipeline this context belongs to
    ChannelPipeline pipeline();
    // ByteBuf allocator
    ByteBufAllocator alloc();
}
ChannelHandlerContext objects form a doubly linked list; the links are held in the abstract class AbstractChannelHandlerContext.
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
}
When AbstractChannelHandlerContext fires an inbound event, it selects the next list node that needs to run. For example, fireChannelRegistered searches the list for the next handler whose channelRegistered method is not annotated with @Skip.
@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.next;
        // Decide whether to skip the next AbstractChannelHandlerContext
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    }
    // ...
}

private void invokeChannelRegistered() {
    // ...
    // Execute the channelRegistered method of the ChannelInboundHandler bound to the current context
    ((ChannelInboundHandler) handler()).channelRegistered(this);
}
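The search loop is easier to follow in a stripped-down model where each context carries a bitmask of the events its handler actually implements. The sketch below is an assumption-laden simplification: `ToyContext` and its two mask constants are hypothetical, the executor check done by the real skipContext is left out, and the walk may run off the end of the list and return null, which the toy tail in the usage prevents by handling every event.

```java
// Hypothetical, simplified model of mask-based skipping; the class and constants are not Netty's.
final class ToyContext {
    static final int MASK_CHANNEL_REGISTERED = 1;  // bit 0
    static final int MASK_CHANNEL_READ = 1 << 1;   // bit 1

    final String name;
    final int executionMask; // bits for the events this context's handler really implements
    ToyContext next;

    ToyContext(String name, int executionMask) {
        this.name = name;
        this.executionMask = executionMask;
    }

    // Walk forward until a context whose mask contains the event bit is found (or the list ends).
    ToyContext findContextInbound(int mask) {
        ToyContext ctx = this;
        do {
            ctx = ctx.next;
        } while (ctx != null && (ctx.executionMask & mask) == 0);
        return ctx;
    }

    // Link the given contexts in order and return the first one.
    static ToyContext link(ToyContext... contexts) {
        for (int i = 0; i + 1 < contexts.length; i++) {
            contexts[i].next = contexts[i + 1];
        }
        return contexts[0];
    }
}
```

With a chain head, a (read only), b (registered only), tail (both), a registered event fired from head lands on b, while a read event lands on a and then on tail. Skipped contexts cost one mask test instead of a method call.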
IV. EventLoop and EventLoopGroup
1, EventLoop
EventLoop means event loop, a concept shared by event-driven programming models such as JavaScript. Netty's EventLoop can be thought of as a special kind of EventLoopGroup (it extends that interface), and like the group it is a thread pool service.
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}
NioEventLoop is the event loop for JDK NIO. Each NioEventLoop holds a Selector and handles the SelectionKey events selected on it.
public final class NioEventLoop extends SingleThreadEventLoop {
    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;
}
Besides handling I/O events, NioEventLoop also runs queued tasks according to its scheduling policy. The simplified flow is as follows.
@Override
protected void run() {
    for (;;) {
        // Poll to see whether any event has occurred
        select();
        // Handle the events
        processSelectedKeys();
        // Run the queued tasks
        runAllTasks();
    }
}
NioEventLoop's ancestor class SingleThreadEventExecutor is bound to a thread and holds the task queue.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // Task queue
    private final Queue<Runnable> taskQueue;
    // The thread bound to this EventLoop
    private volatile Thread thread;
    // The JDK Executor that supplies the single thread
    private final Executor executor;
}
Netty's serialized (single-threaded per object) design avoids thread contention, and the core of that design is the EventLoop. Each object that would otherwise need synchronization is associated with one EventLoop thread, and that thread handles the tasks generated by the object. This is why an EventLoop executes tasks in addition to handling I/O events, and why code like the following appears so often in the Netty source:
// Execute the task in the EventLoop associated with the Channel object
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        // ...
    }
});
Or code like this:
// Check whether the current thread is the EventLoop thread associated with ctx
if (ctx.executor().inEventLoop()) {
    // If so, execute directly
    closeOutbound0(promise);
} else {
    // Otherwise put the task into the task queue of that EventLoop
    ctx.executor().execute(new Runnable() {
        @Override
        public void run() {
            closeOutbound0(promise);
        }
    });
}

// io.netty.util.concurrent.AbstractEventExecutor#inEventLoop
@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

// io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
@Override
public boolean inEventLoop(Thread thread) {
    // Check whether the thread bound to the EventLoop is the given thread
    return thread == this.thread;
}

// io.netty.util.concurrent.SingleThreadEventExecutor#execute
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // Add the task to the task queue
    addTask(task);
    // If the current thread is not the EventLoop's bound thread, try to start that thread
    if (!inEventLoop) {
        startThread();
    }
    // Wake up the EventLoop
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

// NioEventLoop#wakeup
@Override
protected void wakeup(boolean inEventLoop) {
    // If called from another thread and the loop was not already awake, wake up the selector
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
        selector.wakeup();
    }
}
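The same pattern (one bound thread, a task queue, an inEventLoop check, and lazy thread start) fits in a small JDK-only class. Treat it as a sketch of the idea under stated assumptions, not as Netty's SingleThreadEventExecutor: `ToyEventLoop` is a hypothetical name, there is no Selector, and a blocking `take()` on the queue stands in for select() plus wakeup().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy single-threaded event loop; a sketch of the idea, not Netty's implementation.
final class ToyEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;          // the thread bound to this loop
    private volatile boolean running = true;

    // True only when called from the loop's own thread.
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    // Queue the task and start the loop thread lazily on the first outside submission.
    void execute(Runnable task) {
        taskQueue.add(task);
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "toy-event-loop");
            t.setDaemon(true);
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread();
        while (running) {
            try {
                // take() blocks until a task arrives, standing in for select() + wakeup().
                taskQueue.take().run();
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Stop the loop by running a task on it that clears the flag.
    void shutdown() {
        execute(() -> running = false);
    }

    // Returns {inEventLoop as seen by the caller, inEventLoop as seen inside the task}.
    static boolean[] demo() {
        ToyEventLoop loop = new ToyEventLoop();
        boolean[] seen = new boolean[2];
        CountDownLatch done = new CountDownLatch(1);
        seen[0] = loop.inEventLoop();
        loop.execute(() -> {
            seen[1] = loop.inEventLoop();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.shutdown();
        return seen;
    }
}
```

Because every task for the loop runs on the one bound thread, state touched only from those tasks needs no locking, which is the contention-free property the text describes.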
2, EventLoopGroup
The EventLoopGroup manages multiple EventLoops. It is responsible for selecting an EventLoop and registering a Channel with it. The Javadoc makes the interface's role and each method's responsibility clear; both register methods serve the same purpose: registering a Channel with an EventLoop.
/**
 * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get
 * processed for later selection during the event loop.
 */
public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    EventLoop next();

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     */
    ChannelFuture register(Channel channel);

    /**
     * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
     * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
     */
    ChannelFuture register(ChannelPromise promise);
}
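How a group might pick a loop and register a channel with it can be sketched as simple rotation. The assumptions here are loud ones: `ToyLoopGroup` is hypothetical, loops and channels are plain strings, and round-robin selection is used as the illustration, whereas the interface itself only promises that next() returns some EventLoop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a group that hands out its member loops in rotation; not Netty code.
final class ToyLoopGroup {
    private final String[] loops;
    private final AtomicInteger index = new AtomicInteger();

    ToyLoopGroup(int size) {
        loops = new String[size];
        for (int i = 0; i < size; i++) {
            loops[i] = "loop-" + i;
        }
    }

    // Like next(): pick the following loop, wrapping around at the end.
    String next() {
        return loops[Math.floorMod(index.getAndIncrement(), loops.length)];
    }

    // Like register(channel): choose a loop and bind the channel to it.
    String register(String channel) {
        return channel + "@" + next();
    }
}
```

With a group of two loops, three successive registrations land on loop-0, loop-1, and loop-0 again. `Math.floorMod` keeps the index valid even after the counter overflows.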
V. Bootstrap and ServerBootstrap
1, AbstractBootstrap
Bootstrap means a bootstrapping helper that makes it easy for user code to configure and start Netty. AbstractBootstrap is the common parent of the client Bootstrap and the ServerBootstrap. It provides several capabilities:
Configuration
Common settings such as handler, option, attr, group, and channel.
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    // EventLoopGroup: a thread pool in which each thread holds a Selector
    volatile EventLoopGroup group;
    // Factory for Channel instances; creates different Channel instances according to the channel type
    private volatile ChannelFactory<? extends C> channelFactory;
    // Options
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    // Attributes
    private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    // Handler
    private volatile ChannelHandler handler;
}
Create and register a Channel
The ChannelFactory creates a Channel of the configured channel type. The Channel is then initialized from the configured handler/option/attr. Finally, the register method of the configured EventLoopGroup registers the Channel.
final ChannelFuture initAndRegister() {
    // 1. Create the Channel
    Channel channel = channelFactory.newChannel();
    // 2. Initialize the Channel
    init(channel);
    // 3. Register the Channel
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}
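The create, init, register sequence is a template method: the parent fixes the order of the steps and the subclass supplies only init. Below is a minimal sketch of that shape with hypothetical names (`ToyChan`, `ToyBootstrap`, `ToyClientBootstrap`); the channel is reduced to a log of what happened to it, and the self-referencing type parameter `B` mirrors how the fluent setters on AbstractBootstrap return the concrete subclass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical channel that just records what happened to it.
final class ToyChan {
    final List<String> log = new ArrayList<>();

    ToyChan() {
        log.add("created");
    }
}

// Hypothetical miniature of the create -> init -> register template; not Netty code.
abstract class ToyBootstrap<B extends ToyBootstrap<B>> {
    private Supplier<ToyChan> channelFactory;

    // Fluent setter that returns the concrete subclass type.
    @SuppressWarnings("unchecked")
    B channelFactory(Supplier<ToyChan> factory) {
        this.channelFactory = factory;
        return (B) this;
    }

    // The fixed skeleton: subclasses customise only init().
    final ToyChan initAndRegister() {
        ToyChan channel = channelFactory.get(); // 1. create
        init(channel);                          // 2. initialize
        channel.log.add("registered");          // 3. register (stands in for group.register)
        return channel;
    }

    abstract void init(ToyChan channel);
}

// The "client" variant only decides how a channel is initialized.
final class ToyClientBootstrap extends ToyBootstrap<ToyClientBootstrap> {
    @Override
    void init(ToyChan channel) {
        channel.log.add("client handler added");
    }
}
```

Calling `new ToyClientBootstrap().channelFactory(ToyChan::new).initAndRegister()` yields a channel whose log shows the three steps in order. A server variant would differ only in its `init`.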
Create a Channel and bind the port
AbstractBootstrap is the abstract parent class of both the client and the server bootstrap. Creating a Channel and binding a port is a capability that is usually used only by the server.
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Create and register a Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 2. Bind the port
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // ...
    }
}
Methods that subclasses must implement
AbstractBootstrap requires subclasses to implement several methods.
init: subclasses must provide the implementation that initializes a channel. Usually a subclass sets the channel's option/attr through the parent's helper methods and adds handlers.
abstract void init(Channel channel) throws Exception;
config: returns the configuration. AbstractBootstrapConfig is the abstract parent class of the server and client configurations.
public abstract AbstractBootstrapConfig<B, C> config();
clone: subclasses must support cloning.
public abstract B clone();
2, Bootstrap
Bootstrap is the client bootstrap class.
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    // The AbstractBootstrapConfig implementation for the client
    private final BootstrapConfig config = new BootstrapConfig(this);
}
Bootstrap's config and clone methods are simple, so they are shown as-is.
public Bootstrap clone() {
    return new Bootstrap(this);
}

public final BootstrapConfig config() {
    return config;
}
The important part is Bootstrap's init method, which adds the configured handler to the ChannelPipeline and applies the configured attr and option settings to the Channel.
void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    // Add the configured handler to the ChannelPipeline
    p.addLast(config.handler());
    // Parent class method that sets the Channel's options
    setChannelOptions(channel, newOptionsArray(), logger);
    // Parent class method that sets the Channel's attributes
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}
Besides implementing the parent's abstract methods, Bootstrap also provides a connect method for connecting to the server. The details are left for the later discussion of the startup process.
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
3, ServerBootstrap
ServerBootstrap is the server bootstrap class. Because the server side is usually built on the master/slave Reactor model, ServerBootstrap supports childGroup/childHandler/childOption/childAttr in addition to the server-side group/handler/option/attr settings. The master Reactor handles connection events, and the slave Reactor handles read and write events.
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // Client Channel configuration
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;
    // Server configuration
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
}
The config and clone methods of ServerBootstrap work the same way as those of Bootstrap.
public final ServerBootstrapConfig config() {
    return config;
}

public ServerBootstrap clone() {
    return new ServerBootstrap(this);
}
Focus on the Init method of ServerBootStrap. Set the ServerChannel option/attr/handler. Note that the child-related configuration is passed into the ServerBootstrapAcceptor Handler that handles the ServerChannel, which will be used later.
@Override
void init(Channel channel) {
    // Set ServerChannel's option and attr
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    // Set the ServerChannel handler
    ChannelPipeline p = channel.pipeline();
    // Assemble the child-related attributes
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            // Handler for ServerChannel
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // In addition to the configured Handler, Netty adds its own ServerBootstrapAcceptor to handle connection events
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // ServerBootstrapAcceptor is constructed with the child configuration passed in
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
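The hand-off of child configuration in init can be pictured with a small JDK-only model. This is a simplified sketch, not Netty's code: AcceptorSketch, ChildChannel, Acceptor and onAccept are hypothetical names, and options are plain strings. It only illustrates that the acceptor receives a snapshot of the child settings and applies it to every accepted connection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of how child configuration is captured once
// and later applied to every accepted connection. Not Netty's actual classes.
final class AcceptorSketch {

    /** Stand-in for an accepted client channel. */
    static final class ChildChannel {
        final Map<String, Object> options = new LinkedHashMap<>();
        final List<String> handlers = new ArrayList<>();
    }

    /** Stand-in for ServerBootstrapAcceptor: holds a snapshot of the child config. */
    static final class Acceptor {
        private final String childHandler;
        private final Map<String, Object> childOptions;

        Acceptor(String childHandler, Map<String, Object> childOptions) {
            this.childHandler = childHandler;
            // Copy the map so later changes to the bootstrap do not leak in.
            this.childOptions = new LinkedHashMap<>(childOptions);
        }

        /** Called once per accepted connection: configure the child channel. */
        ChildChannel onAccept() {
            ChildChannel child = new ChildChannel();
            child.handlers.add(childHandler);
            child.options.putAll(childOptions);
            return child;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> bootstrapChildOptions = new LinkedHashMap<>();
        bootstrapChildOptions.put("TCP_NODELAY", true);

        Acceptor acceptor = new Acceptor("businessHandler", bootstrapChildOptions);

        // Changing the bootstrap afterwards does not affect the snapshot.
        bootstrapChildOptions.put("SO_KEEPALIVE", true);

        ChildChannel child = acceptor.onAccept();
        System.out.println(child.handlers + " " + child.options);
        // prints [businessHandler] {TCP_NODELAY=true}
    }
}
```

In the real init method the snapshot is taken with toArray under a synchronized block; the defensive copy in the constructor plays that role here.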
How open source frameworks configure Netty
1, Dubbo
Dubbo uses Netty as its underlying communication framework by default. The server configuration entry point is NettyServer#doOpen.
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    // Default IO threads: Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            "NettyServerWorker");
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // ...
                }
            });
}
- BossGroup: the number of threads is 1.
- WorkerGroup: the value defined in the URL takes precedence. Otherwise the number of threads is the number of cores + 1, capped at 32, that is, Math.min(Runtime.getRuntime().availableProcessors() + 1, 32).
- option:
  - SO_REUSEADDR: true. Socket parameter for address reuse; the default value is false. It is useful in four situations:
    - (1) A socket1 in TIME_WAIT state holds a local address and port, and the program being started needs socket2 to occupy the same address and port, for example when restarting a service on the same port.
    - (2) A machine with multiple network adapters, or one using IP alias technology, starts multiple processes on the same port, as long as the local IP address bound by each process is different.
    - (3) A single process binds the same port on multiple sockets, as long as the IP address bound by each socket is different.
    - (4) Repeated binding of an identical address and port. This applies only to UDP multicast, not TCP.
- childOption:
  - TCP_NODELAY: true. TCP parameter that sends data immediately by disabling the Nagle algorithm. The Nagle algorithm (active when this option is false) combines small pieces of data into larger packets to reduce the number of packets sent; it should be disabled when small packets must be sent promptly. Netty defaults this option to true, whereas operating systems default to false, so Netty disables the algorithm by default to minimize transmission delay.
  - ALLOCATOR: PooledByteBufAllocator.DEFAULT. Specifies the allocator for ByteBuf.
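The worker thread rule from the list above can be written down as a tiny helper. This is a sketch under assumptions: IoThreadsSketch, defaultIoThreads and resolveIoThreads are hypothetical names and not part of Dubbo; only the formula and the "positive configured value wins" rule come from the text.

```java
// Hypothetical helper mirroring the rule described above; the names are
// illustrative and are not Dubbo's own API.
final class IoThreadsSketch {

    /** Default: min(cores + 1, 32). */
    static int defaultIoThreads(int cores) {
        return Math.min(cores + 1, 32);
    }

    /** A positive configured value wins; otherwise fall back to the default. */
    static int resolveIoThreads(int configured, int cores) {
        return configured > 0 ? configured : defaultIoThreads(cores);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("default io threads = " + resolveIoThreads(0, cores));
        System.out.println("configured io threads = " + resolveIoThreads(4, cores));
    }
}
```

On an 8-core machine the default resolves to 9 threads; on a 64-core machine the cap of 32 applies.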
The client configuration entry point is NettyClient#doOpen.
// Default IO threads: Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)
private static final EventLoopGroup NIO_EVENT_LOOP_GROUP = eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(NIO_EVENT_LOOP_GROUP)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(socketChannelClass());
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // ...
        }
    });
}
- Number of I/O threads: number of cores + 1, at most 32, that is, Math.min(Runtime.getRuntime().availableProcessors() + 1, 32).
- option:
  - SO_KEEPALIVE: true. Socket parameter for connection keepalive; the default value is false. When enabled, TCP actively probes idle connections to check that they are still valid, so it can be regarded as TCP's heartbeat mechanism. Note that the default heartbeat interval is 7200 s, that is, 2 hours. Netty disables this function by default; Dubbo enables it, which fits its default of a single long-lived connection.
  - TCP_NODELAY: true. TCP parameter that sends data immediately by disabling the Nagle algorithm.
  - ALLOCATOR: PooledByteBufAllocator.DEFAULT.
  - CONNECT_TIMEOUT_MILLIS: the connection timeout. The value is the configured timeout, with a minimum of 3000 milliseconds.
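To make the client options above more concrete, here is a sketch that uses the plain JDK socket API instead of Netty. It assumes that SO_KEEPALIVE and TCP_NODELAY roughly correspond to Socket#setKeepAlive and Socket#setTcpNoDelay; ClientOptionsSketch and its method names are hypothetical.

```java
import java.io.IOException;
import java.net.Socket;

// Sketch using the plain JDK socket API rather than Netty, to show what the
// client options correspond to at the socket level.
final class ClientOptionsSketch {

    /** The connect timeout never drops below 3000 ms. */
    static int effectiveConnectTimeout(int configuredMillis) {
        return Math.max(3000, configuredMillis);
    }

    /** Creates an unconnected socket with keepalive on and Nagle's algorithm off. */
    static Socket newConfiguredSocket() throws IOException {
        Socket socket = new Socket();
        socket.setKeepAlive(true);   // counterpart of SO_KEEPALIVE = true
        socket.setTcpNoDelay(true);  // counterpart of TCP_NODELAY = true
        return socket;
    }

    public static void main(String[] args) throws IOException {
        try (Socket socket = newConfiguredSocket()) {
            System.out.println("keepAlive=" + socket.getKeepAlive()
                    + ", tcpNoDelay=" + socket.getTcpNoDelay());
        }
        System.out.println("timeout=" + effectiveConnectTimeout(1000) + " ms");
    }
}
```

With a real peer, the timeout would be passed to socket.connect(address, timeoutMillis); a configured value of 1000 ms is raised to 3000 ms by the Math.max rule.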
2, ShardingProxy
The ShardingProxy server uses Netty. The entry point is ShardingProxy#start.
public void start(final int port) {
    ServerBootstrap bootstrap = new ServerBootstrap();
    // bossGroup: 1 thread
    bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
    // workerGroup: number of threads = cores * 2
    groupsNio(bootstrap);
    // ...
}
private void groupsNio(final ServerBootstrap bootstrap) {
    // Use the NioEventLoopGroup default: number of threads = number of cores * 2
    workerGroup = new NioEventLoopGroup();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ServerHandlerInitializer());
}
- BossGroup: the number of threads is 1.
- WorkerGroup: uses the NioEventLoopGroup default number of threads, which is the number of cores * 2.
- option:
  - SO_BACKLOG: 128. Socket parameter that sets the length of the queue in which the server holds pending connections. If the queue is full, new client connections are rejected. The default value is 200 on Windows and 128 elsewhere.
  - WRITE_BUFFER_WATER_MARK: 8 MB to 16 MB. Netty's write buffer watermark configuration; the default is 32 KB to 64 KB. WriteBufferWaterMark sets the low and high watermarks of the write buffer. If the number of bytes queued in the write buffer exceeds the high watermark, Channel.isWritable() starts returning false. Once that has happened and the number of queued bytes falls below the low watermark, Channel.isWritable() starts returning true again.
  - ALLOCATOR: PooledByteBufAllocator.DEFAULT.
- childOption:
  - ALLOCATOR: PooledByteBufAllocator.DEFAULT.
  - TCP_NODELAY: true. Disables the Nagle algorithm.
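The watermark behaviour described for WRITE_BUFFER_WATER_MARK can be modelled in a few lines. This is a simplified sketch of the writability rule only, not Netty's ChannelOutboundBuffer; WaterMarkSketch, queued and flushed are hypothetical names, and the tiny limits (8 and 16 bytes) are chosen just to keep the numbers readable.

```java
// Simplified model of the high/low watermark rule; it is not Netty's
// ChannelOutboundBuffer, only the writability logic described above.
final class WaterMarkSketch {
    private final long low;
    private final long high;
    private long pendingBytes;
    private boolean writable = true;

    WaterMarkSketch(long low, long high) {
        if (low < 0 || high < low) {
            throw new IllegalArgumentException("require 0 <= low <= high");
        }
        this.low = low;
        this.high = high;
    }

    /** Bytes were queued by a write call. */
    void queued(long bytes) {
        pendingBytes += bytes;
        if (pendingBytes > high) {
            writable = false; // above the high watermark: stop writing
        }
    }

    /** Bytes were flushed to the socket. */
    void flushed(long bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (pendingBytes < low) {
            writable = true; // below the low watermark: writing may resume
        }
    }

    boolean isWritable() {
        return writable;
    }

    public static void main(String[] args) {
        WaterMarkSketch buffer = new WaterMarkSketch(8, 16);
        buffer.queued(20);
        System.out.println(buffer.isWritable()); // false: 20 > 16
        buffer.flushed(8);
        System.out.println(buffer.isWritable()); // false: 12 is not below 8
        buffer.flushed(8);
        System.out.println(buffer.isWritable()); // true: 4 < 8
    }
}
```

The gap between the two marks gives hysteresis: writability does not flip back and forth while the queue hovers around a single threshold.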
3, RocketMQ
RocketMQ’s NameServer and Broker both use Netty as the underlying communication framework.
The server-side configuration entry point for NameServer and Broker is NettyRemotingServer#start.
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
    this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
        // ...
    });
    this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
        // ...
    });
}
public void start() {
    // Business thread pool, 8 threads by default
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                // ...
            });
    ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                    .addLast(defaultEventExecutorGroup,
                                            encoder,
                                            new NettyDecoder(),
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                            connectionManageHandler,
                                            serverHandler
                                    );
                        }
                    });
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
}
- BossGroup: the number of threads is 1.
- WorkerGroup: uses the configured number of threads (getServerSelectorThreads()). The default value is 3.
- Business thread pool: the number of threads is configured (getServerWorkerThreads()) and defaults to 8. The ChannelInitializer implementation is shown in full here because it makes the pattern explicit: the boss and worker groups usually perform only I/O operations, while time-consuming business operations run in another thread pool (defaultEventExecutorGroup). Other frameworks do the same, except that the thread pool is not passed to Netty's configuration. Instead, a business handler wraps business processing as tasks and submits them to a custom business thread pool.
- option:
  - SO_BACKLOG: 1024. The length of the queue in which the server holds pending connections.
  - SO_REUSEADDR: true. Address reuse.
  - SO_KEEPALIVE: false. TCP keepalive is disabled.
- childOption:
  - TCP_NODELAY: true. Disables the Nagle algorithm.
  - SO_SNDBUF: configured value. Socket parameter: the size of the TCP send buffer, which relates to the TCP send sliding window.
  - SO_RCVBUF: configured value. Socket parameter: the size of the TCP receive buffer, which relates to the TCP receive sliding window.
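The split between I/O threads and a business thread pool described above can be sketched with JDK executors alone. This is an illustration of the threading idea, not RocketMQ's or Netty's code: OffloadSketch, handleRequest and the thread names "io-1" and "biz" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch of the threading idea: the I/O thread only hands work over,
// and a separate pool runs the slow business logic. Thread names are made up.
final class OffloadSketch {

    /** Returns the name of the thread that ran the business logic. */
    static String handleRequest() {
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        ExecutorService businessPool = Executors.newFixedThreadPool(8, r -> new Thread(r, "biz"));
        try {
            return CompletableFuture
                    // The I/O thread decodes the request (kept trivial here).
                    .supplyAsync(() -> "request", ioThread)
                    // The business pool does the time-consuming part.
                    .thenApplyAsync(req -> Thread.currentThread().getName(), businessPool)
                    .join();
        } finally {
            ioThread.shutdown();
            businessPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("business logic ran on: " + handleRequest());
        // prints business logic ran on: biz
    }
}
```

Keeping slow work off the I/O thread means one blocked request cannot stall reads and writes for every other channel served by that thread.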
conclusion
- How to use Netty: configure ServerBootstrap on the server and Bootstrap on the client. In the initChannel method of ChannelInitializer, add the ChannelHandlers that implement the business logic.
- Netty core components:
  - Channel: the channel. It handles the underlying communication and ties the other Netty components together.
  - ChannelHandler: ChannelInboundHandler handles inbound events and is triggered when an event occurs on a Channel. ChannelOutboundHandler handles outbound operations and is triggered when the user reads or writes through a Channel.
  - ChannelPipeline: extends the ChannelOutboundInvoker/ChannelInboundInvoker interfaces and is responsible for triggering inbound and outbound events. It also arranges the ChannelHandlers, propagating inbound and outbound events through the prev and next pointers of each ChannelHandlerContext.
  - ChannelHandlerContext: the context object. It holds the important components Channel, ChannelHandler, and ChannelPipeline. It also extends ChannelInboundInvoker/ChannelOutboundInvoker, so it can trigger inbound and outbound events. In addition, each ChannelHandlerContext has prev and next pointers to its neighboring ChannelHandlerContexts, which the ChannelPipeline manages for event propagation.
  - EventLoop: the event loop. For NioEventLoop, each event loop has one thread, which holds a Selector. The loop has three steps: polling events, processing events, and executing tasks. Netty's serial design avoids thread contention. The core of the design is EventLoop: each object that would otherwise need synchronization is associated with one EventLoop thread, which handles all tasks generated by that object.
  - EventLoopGroup: the event loop group. It manages multiple EventLoops. When a Channel is registered, the group selects an EventLoop and registers the Channel with it (which in turn registers it with the JDK Selector).
  - Bootstrap: the bootstrap class. It helps users configure and start Netty.
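The prev and next pointers mentioned for ChannelPipeline and ChannelHandlerContext can be illustrated with a toy doubly linked list. This is a minimal sketch, assuming every handler sees every event; PipelineSketch, Context, fireInbound and fireOutbound are hypothetical names and far simpler than Netty's real types.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a pipeline as a doubly linked list of contexts. The class and
// method names are illustrative and much simpler than Netty's real types.
final class PipelineSketch {

    static final class Context {
        final String name;
        Context prev;
        Context next;

        Context(String name) {
            this.name = name;
        }
    }

    private final Context head = new Context("head");
    private final Context tail = new Context("tail");
    final List<String> trace = new ArrayList<>();

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a new context just before the tail. */
    PipelineSketch addLast(String name) {
        Context ctx = new Context(name);
        ctx.prev = tail.prev;
        ctx.next = tail;
        tail.prev.next = ctx;
        tail.prev = ctx;
        return this;
    }

    /** Inbound events travel from head to tail via the next pointers. */
    void fireInbound() {
        for (Context c = head.next; c != tail; c = c.next) {
            trace.add("in:" + c.name);
        }
    }

    /** Outbound operations travel from tail to head via the prev pointers. */
    void fireOutbound() {
        for (Context c = tail.prev; c != head; c = c.prev) {
            trace.add("out:" + c.name);
        }
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch().addLast("decoder").addLast("business");
        p.fireInbound();
        p.fireOutbound();
        System.out.println(p.trace);
        // prints [in:decoder, in:business, out:business, out:decoder]
    }
}
```

The opposite traversal directions explain why the order of addLast calls matters: handlers added first see inbound data first and outbound data last.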