Preface
This series is the WebFlux part of my Java Programming Methodology series on reactive programming, which I am now sharing. The prerequisite material, walkthroughs of RxJava 2 and Reactor, has been recorded and posted on Bilibili at the following addresses:
RxJava source code walkthrough: www.bilibili.com/video/av345…
Reactor source code walkthrough: www.bilibili.com/video/av353…
NIO source code walkthrough: www.bilibili.com/video/av432…
Articles accompanying the NIO source code videos:
From BIO to NIO source code: BIO
From BIO to NIO source code: NIO (Part 1)
From BIO to NIO source code: NIO (Part 2)
From BIO to NIO source code: NIO (Part 3), Selector
From BIO to NIO source code: NIO Buffer explained (Part 1)
From BIO to NIO source code: NIO Buffer explained (Part 2)
Java Programming Methodology, Spring WebFlux 01: Why Spring WebFlux
Java Programming Methodology, Spring WebFlux 01
Of these, the RxJava and Reactor material is part of my book and will not be published as articles. If you are interested, you can spend some time watching the videos, in which I go through both libraries in detail, including their design concepts and the related methodology.
HttpServer encapsulation
This book focuses on Netty servers, so readers should have basic knowledge of Netty and some experience using it. Next, we will explore the details of reactor-netty from design to implementation, so that you can learn the ideas behind a well-designed encapsulation. This book is based on reactor-netty 0.7.8.RELEASE. A 0.8 version is now available, and the source code differs in some minor details between 0.7 and 0.8, so please take note. I will cover version 0.8 separately.
Introducing HttpServer
We learned in the previous chapter that Tomcat uses a Connector to accept connection requests and respond to them. To use Netty as a web server, let's first look at a typical way of using Netty (the example is the DiscardServer demo):
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128) // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind the port to start receiving incoming connections
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait for the server socket to close.
            // In this case, this doesn't happen, but you can gracefully shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
(1) NioEventLoopGroup is a multithreaded event loop that handles I/O operations. Netty provides many different EventLoopGroup implementations for different kinds of transport. In this example we are implementing a server-side application, so two NioEventLoopGroup instances are used. The first, often called the BossGroup, accepts incoming connections. The second, often called the WorkerGroup, handles the accepted connections: once the BossGroup accepts a connection, it registers that connection with the WorkerGroup. How many threads are used and how they are mapped to the created Channels depends on the EventLoopGroup implementation, and the relationship can be configured through its constructors.
(2) ServerBootstrap is a helper class for starting an NIO server. You could set up the server using a Channel directly, but that is a tedious process, and in most cases you do not need to do it.
(3) Here we specify the NioServerSocketChannel class, which is used to instantiate a new Channel that accepts incoming connections.
(4) The handler specified here is always evaluated for a newly accepted Channel. ChannelInitializer is a special handler that helps the user configure a new Channel. You use the Channel's ChannelPipeline to add your service logic (here, DiscardServerHandler). As your program gets more complex, you will probably add more handlers to the pipeline and eventually extract this anonymous class into a top-level class. (We can think of the anonymous ChannelInitializer as an instance of the proxy pattern, similar to the design of Subscriber in Reactor: wrapper upon wrapper, until we end up with the Subscriber we need, one that processes events layer by layer.)
(5) You can also set parameters that are specific to the Channel implementation. Since we are writing a TCP/IP server, we can set socket options such as tcpNoDelay and keepAlive. Refer to the API documentation of ChannelOption and of the specific ChannelConfig implementations for an overview of the supported ChannelOptions.
(6) Note the difference between option() and childOption(): option() applies to the NioServerSocketChannel that accepts incoming connections, while childOption() applies to the Channels accepted by the parent ServerChannel, which in this case are NioSocketChannel instances.
(7) All that is left is to bind the port and start the server. Here we bind to port 8080 on the server. You can call the bind() method as many times as you like (with different bind addresses).
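The boss/worker hand-off described in (1) can be sketched without Netty. This is a minimal illustration that assumes plain java.util.concurrent executors as stand-ins for the two event loop groups. BossWorkerSketch and all of its members are hypothetical names, no real sockets are involved, and the round-robin assignment is a simplification chosen for the sketch rather than a description of how Netty picks a worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BossWorkerSketch {
    // Stand-in for the worker group: each worker is one single-threaded loop.
    private final List<ExecutorService> workers = new ArrayList<>();
    // Stand-in for the boss group: one thread that only accepts connections.
    private final ExecutorService boss = Executors.newSingleThreadExecutor();
    // Records which worker thread handled each (simulated) connection id.
    final Map<Integer, String> handledBy = new ConcurrentHashMap<>();

    BossWorkerSketch(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            final String name = "worker-" + i;
            workers.add(Executors.newSingleThreadExecutor(r -> new Thread(r, name)));
        }
    }

    // The boss "accepts" a connection and registers it with one worker.
    void accept(int connectionId) {
        boss.execute(() -> {
            ExecutorService worker = workers.get(connectionId % workers.size());
            worker.execute(() ->
                    handledBy.put(connectionId, Thread.currentThread().getName()));
        });
    }

    // Drain the boss first so every hand-off is submitted, then drain the workers.
    void shutdown() throws InterruptedException {
        boss.shutdown();
        boss.awaitTermination(5, TimeUnit.SECONDS);
        for (ExecutorService worker : workers) {
            worker.shutdown();
            worker.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BossWorkerSketch server = new BossWorkerSketch(2);
        for (int id = 0; id < 4; id++) {
            server.accept(id);
        }
        server.shutdown();
        System.out.println(server.handledBy);
    }
}
```

In this sketch connections 0 and 2 end up on worker-0 and connections 1 and 3 on worker-1; the boss thread never runs any per-connection work itself.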
Encapsulating the Bootstrap options
Having seen the typical way to create a server with Netty, let's look at the HTTP server wrapper that Reactor Netty provides: reactor.ipc.netty.http.server.HttpServer. As the DiscardServer demo above shows, we first define a server, configure it by setting some conditions, and then call its run method to start it. For better configurability, HttpServer uses the builder pattern, so that we can either customize the configuration or use the defaults directly (some settings are mandatory, and an exception is thrown when one is missing; that check is part of what the builder below sets up):
//reactor.ipc.netty.http.server.HttpServer.Builder
public static final class Builder {
    private String bindAddress = null;
    private int port = 8080;
    private Supplier<InetSocketAddress> listenAddress = () -> new InetSocketAddress(NetUtil.LOCALHOST, port);
    private Consumer<? super HttpServerOptions.Builder> options;

    private Builder() {
    }
    ...
    public final Builder port(int port) {
        this.port = port;
        return this;
    }

    /**
     * The options for the server, including bind address and port.
     *
     * @param options the options for the server, including bind address and port.
     * @return {@code this}
     */
    public final Builder options(Consumer<? super HttpServerOptions.Builder> options) {
        this.options = Objects.requireNonNull(options, "options");
        return this;
    }

    public HttpServer build() {
        return new HttpServer(this);
    }
}
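The shape of this builder, a few direct setters plus a Consumer that receives an options builder, can be sketched in plain Java. This is a simplified model and not the reactor-netty API: BuilderSketch, Server and OptionsBuilder are hypothetical names, and the defaults shown are assumptions made for the sketch.

```java
import java.util.Objects;
import java.util.function.Consumer;

public class BuilderSketch {
    // Hypothetical, simplified stand-in for HttpServerOptions.Builder.
    static final class OptionsBuilder {
        String host = "localhost";
        int port = 8080;

        OptionsBuilder host(String host) { this.host = host; return this; }
        OptionsBuilder port(int port) { this.port = port; return this; }
    }

    // Hypothetical, simplified stand-in for HttpServer and its Builder.
    static final class Server {
        final String host;
        final int port;

        private Server(Builder builder) {
            OptionsBuilder options = new OptionsBuilder();
            if (builder.options == null) {
                // No Consumer supplied: fall back to the builder's own port.
                options.port(builder.port);
            } else {
                // A Consumer was supplied: let the caller customize the options.
                builder.options.accept(options);
            }
            this.host = options.host;
            this.port = options.port;
        }

        static Builder builder() { return new Builder(); }

        static final class Builder {
            private int port = 8080;
            private Consumer<? super OptionsBuilder> options;

            Builder port(int port) { this.port = port; return this; }

            Builder options(Consumer<? super OptionsBuilder> options) {
                // Passing null is rejected immediately, as in the original builder.
                this.options = Objects.requireNonNull(options, "options");
                return this;
            }

            Server build() { return new Server(this); }
        }
    }

    public static void main(String[] args) {
        Server byPort = Server.builder().port(9090).build();
        Server byOptions = Server.builder()
                .options(o -> o.host("0.0.0.0").port(7070))
                .build();
        System.out.println(byPort.host + ":" + byPort.port);       // prints "localhost:9090"
        System.out.println(byOptions.host + ":" + byOptions.port); // prints "0.0.0.0:7070"
    }
}
```

Taking a Consumer instead of a finished options object lets the server own the construction of the options builder, so it can apply its defaults before or after the caller's customization.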
As you can see, HttpServer.Builder#options takes a functional Consumer whose parameter is an HttpServerOptions.Builder. Through HttpServerOptions.Builder we can apply default or custom configuration to the bootstrap we saw in the DiscardServer demo. Our custom option settings mainly target ServerBootstrap#childOption, because reactor.ipc.netty.options.ServerOptions.Builder#option overrides the corresponding method of its superclass, reactor.ipc.netty.options.NettyOptions.Builder#option:
//reactor.ipc.netty.options.ServerOptions.Builder
public static class Builder<BUILDER extends Builder<BUILDER>>
        extends NettyOptions.Builder<ServerBootstrap, ServerOptions, BUILDER> {
    ...
}

//reactor.ipc.netty.options.ServerOptions.Builder#option
/**
 * Set a {@link ChannelOption} value for low level connection settings like
 * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
 * peer.
 *
 * @param key the option key
 * @param <T> the option type
 * @return {@code this}
 * @see ServerBootstrap#childOption(ChannelOption, Object)
 */
@Override
public final <T> BUILDER option(ChannelOption<T> key, T value) {
    this.bootstrapTemplate.childOption(key, value);
    return get();
}

//reactor.ipc.netty.options.NettyOptions.Builder#option
/**
 * Set a {@link ChannelOption} value for low level connection settings like
 * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote
 * peer.
 *
 * @param key the option key
 * @param value the option value
 * @param <T> the option type
 * @return {@code this}
 * @see Bootstrap#option(ChannelOption, Object)
 */
public <T> BUILDER option(ChannelOption<T> key, T value) {
    this.bootstrapTemplate.option(key, value);
    return get();
}
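The effect of this override, where the same option call lands in a different place depending on the builder type, can be modeled in a few lines of plain Java. This is a sketch under stated assumptions: FakeBootstrap, BaseBuilder and ServerBuilder are hypothetical stand-ins for ServerBootstrap, NettyOptions.Builder and ServerOptions.Builder, string keys replace ChannelOption, and parentOption is a helper invented for the sketch to show the contrast.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class OptionRoutingSketch {
    // Hypothetical stand-in for ServerBootstrap: parent options vs. child options.
    static final class FakeBootstrap {
        final Map<String, Object> options = new LinkedHashMap<>();
        final Map<String, Object> childOptions = new LinkedHashMap<>();
    }

    // Stand-in for NettyOptions.Builder: option() writes to the bootstrap itself.
    // The self-referencing type parameter lets setters return the concrete builder.
    static abstract class BaseBuilder<B extends BaseBuilder<B>> implements Supplier<B> {
        final FakeBootstrap bootstrapTemplate = new FakeBootstrap();

        public B option(String key, Object value) {
            bootstrapTemplate.options.put(key, value);
            return get();
        }
    }

    // Stand-in for ServerOptions.Builder: option() is redirected to the child options.
    static class ServerBuilder extends BaseBuilder<ServerBuilder> {
        @Override
        public ServerBuilder option(String key, Object value) {
            bootstrapTemplate.childOptions.put(key, value);
            return get();
        }

        // Hypothetical helper for settings meant for the listening (parent) channel.
        public ServerBuilder parentOption(String key, Object value) {
            bootstrapTemplate.options.put(key, value);
            return get();
        }

        @Override
        public ServerBuilder get() {
            return this;
        }
    }

    public static void main(String[] args) {
        ServerBuilder builder = new ServerBuilder()
                .option("SO_KEEPALIVE", true)
                .parentOption("SO_BACKLOG", 128);
        System.out.println("parent: " + builder.bootstrapTemplate.options);      // prints "parent: {SO_BACKLOG=128}"
        System.out.println("child: " + builder.bootstrapTemplate.childOptions);  // prints "child: {SO_KEEPALIVE=true}"
    }
}
```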
Keep this redirection in mind: on the server builder, option is applied through childOption. Now let's go back to reactor.ipc.netty.http.server.HttpServer.Builder. Its build method returns an HttpServer instance. The HttpServer constructor first checks the options of the HttpServer.Builder passed in, and then checks the event loop group of the bootstrap: configuring through the builder ultimately produces a ServerBootstrap, so the constructor has to determine whether an EventLoopGroup is available. We can set this ourselves. Note that when we set it once, both the bossGroup and the workerGroup may use that same one (the source comment on loopResources states this clearly):
//reactor.ipc.netty.http.server.HttpServer.Builder#build
public HttpServer build() {
    return new HttpServer(this);
}

//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
    HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
    if (Objects.isNull(builder.options)) {
        if (Objects.isNull(builder.bindAddress)) {
            serverOptionsBuilder.listenAddress(builder.listenAddress.get());
        }
        else {
            serverOptionsBuilder.host(builder.bindAddress).port(builder.port);
        }
    }
    else {
        builder.options.accept(serverOptionsBuilder);
    }
    if (!serverOptionsBuilder.isLoopAvailable()) {
        serverOptionsBuilder.loopResources(HttpResources.get());
    }
    this.options = serverOptionsBuilder.build();
    this.server = new TcpBridgeServer(this.options);
}

//reactor.ipc.netty.options.NettyOptions.Builder
public static abstract class Builder<BOOTSTRAP extends AbstractBootstrap<BOOTSTRAP, ?>,
        SO extends NettyOptions<BOOTSTRAP, SO>, BUILDER extends Builder<BOOTSTRAP, SO, BUILDER>>
        implements Supplier<BUILDER> {
    ...
    /**
     * Provide a shared {@link EventLoopGroup} each Connector handler.
     *
     * @param eventLoopGroup an eventLoopGroup to share
     * @return {@code this}
     */
    public final BUILDER eventLoopGroup(EventLoopGroup eventLoopGroup) {
        Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
        return loopResources(preferNative -> eventLoopGroup);
    }

    /**
     * Provide an {@link EventLoopGroup} supplier.
     * Note that server might call it twice for both their selection and io loops.
     *
     * @param channelResources a selector accepting native runtime expectation and
     * returning an eventLoopGroup
     * @return {@code this}
     */
    public final BUILDER loopResources(LoopResources channelResources) {
        this.loopResources = Objects.requireNonNull(channelResources, "loopResources");
        return get();
    }

    public final boolean isLoopAvailable() {
        return this.loopResources != null;
    }
    ...
}
As you can see, this builder class implements Supplier, a functional object that hands out an instance on request and is well suited to lazy-loading scenarios. LoopResources is also a functional interface (@FunctionalInterface), designed to serve as a factory for io.netty.channel.Channel:
//reactor.ipc.netty.resources.LoopResources
@FunctionalInterface
public interface LoopResources extends Disposable {

    /**
     * Default worker thread count, fallback to available processor
     */
    int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
            "reactor.ipc.netty.workerCount",
            "" + Math.max(Runtime.getRuntime()
                    .availableProcessors(), 4)));

    /**
     * Default selector thread count, fallback to -1 (no selector thread)
     */
    int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
            "reactor.ipc.netty.selectCount",
            "" + -1));

    /**
     * Create a simple {@link LoopResources} to provide automatically for {@link
     * EventLoopGroup} and {@link Channel} factories
     *
     * @param prefix the event loop thread name prefix
     *
     * @return a new {@link LoopResources} to provide automatically for {@link
     * EventLoopGroup} and {@link Channel} factories
     */
    static LoopResources create(String prefix) {
        return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT,
                DEFAULT_IO_WORKER_COUNT,
                true);
    }

    static LoopResources create(String prefix,
            int selectCount,
            int workerCount,
            boolean daemon) {
        ...
        return new DefaultLoopResources(prefix, selectCount, workerCount, daemon);
    }
    ...
    /**
     * Callback for server {@link EventLoopGroup} creation.
     *
     * @param useNative should use native group if current {@link #preferNative()} is also
     * true
     *
     * @return a new {@link EventLoopGroup}
     */
    EventLoopGroup onServer(boolean useNative);
    ...
}
The static create methods of this interface let us quickly create a LoopResources instance with custom settings. In addition, because LoopResources is a functional interface, it supports lazy loading (the work we want done is hidden inside a method): the required object is created only when it is actually used. For example, when calling reactor.ipc.netty.options.NettyOptions.Builder#loopResources(LoopResources channelResources), we can write loopResources(useNative -> new NioEventLoopGroup()), and the NioEventLoopGroup is created only when the lambda is invoked. This avoids allocating event loop resources before they are needed.
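The lazy creation described above, together with the "server might call it twice" note from the loopResources Javadoc, can be sketched with a hand-written functional interface. This is a minimal illustration in plain Java: LoopSource, newLoop and the created counter are hypothetical names, and an ExecutorService stands in for an EventLoopGroup.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyLoopSketch {
    // Hypothetical stand-in for LoopResources: a single abstract method,
    // so a lambda can implement it.
    @FunctionalInterface
    interface LoopSource {
        ExecutorService onServer(boolean useNative);
    }

    // Counts how many loop groups have actually been created.
    static final AtomicInteger created = new AtomicInteger();

    static ExecutorService newLoop() {
        created.incrementAndGet();
        return Executors.newSingleThreadExecutor();
    }

    public static void main(String[] args) {
        // Registering the lambda creates nothing yet.
        LoopSource lazy = useNative -> newLoop();
        System.out.println("after registration: " + created.get()); // prints "after registration: 0"

        // A server may ask twice: once for the selector loop, once for the I/O loop.
        ExecutorService select = lazy.onServer(false);
        ExecutorService io = lazy.onServer(false);
        System.out.println("after two calls: " + created.get());    // prints "after two calls: 2"

        // Sharing one pre-built group: the lambda returns the same instance every time,
        // which mirrors loopResources(preferNative -> eventLoopGroup) in the code above.
        ExecutorService sharedGroup = newLoop();
        LoopSource shared = useNative -> sharedGroup;
        System.out.println("same instance: "
                + (shared.onServer(false) == shared.onServer(false))); // prints "same instance: true"

        select.shutdown();
        io.shutdown();
        sharedGroup.shutdown();
    }
}
```

The two lambdas make the trade-off visible: one that constructs inside the lambda yields a fresh group per call, while one that captures an existing group makes the selector and I/O loops share it.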
Summary
In this chapter we went from the typical use of Netty to the way HttpServer encapsulates it. The purpose was to show where this encapsulation comes from and what it is for. Next, we will look step by step at the concepts behind reactor-netty and how it works with Spring WebFlux.