Analysis of how Vert.x creates a NetServer
Background knowledge
Read these articles first:
1. How Netty starts a server
2. Netty threading model
Dependency
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.0.0</version>
</dependency>
Source code walkthrough
Vert.x utility classes and methods explained
vertx.getOrCreateContext
public ContextInternal getOrCreateContext() {
    AbstractContext ctx = getContext();
    if (ctx == null) {
        // We are running embedded - Create a context
        ctx = createEventLoopContext();
        stickyContext.set(new WeakReference<>(ctx));
    }
    return ctx;
}

public AbstractContext getContext() {
    AbstractContext context = (AbstractContext) ContextInternal.current();
    if (context != null && context.owner() == this) {
        return context;
    } else {
        WeakReference<AbstractContext> ref = stickyContext.get();
        return ref != null ? ref.get() : null;
    }
}

// This comes from the io.vertx.core.impl.ContextInternal class
static ContextInternal current() {
    Thread current = Thread.currentThread();
    if (current instanceof VertxThread) {
        return ((VertxThread) current).context();
    }
    return null;
}
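The core logic is to get the Context bound to the current thread. If the current thread is a VertxThread, the Context it carries is returned; otherwise the code falls back to stickyContext, which is used much like a ThreadLocal, and creates an event-loop Context on first use. From the Context you can get the EventLoop it is bound to. This is one of the core designs of Vert.x: non-worker threads are bound to an EventLoop.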
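To make the lookup order concrete, here is a minimal sketch of the same idea in plain Java. It is not Vert.x code: SketchContext, SketchVertxThread and ContextLookupSketch are hypothetical names invented for illustration, and the ThreadLocal fallback is an assumption based on how stickyContext is used in the listing above.

```java
import java.lang.ref.WeakReference;

// Hypothetical stand-in for a Vert.x context; it only carries a name.
final class SketchContext {
    final String name;
    SketchContext(String name) { this.name = name; }
}

// Hypothetical stand-in for VertxThread: a thread that carries its own context.
class SketchVertxThread extends Thread {
    private final SketchContext context;
    SketchVertxThread(Runnable task, SketchContext context) {
        super(task);
        this.context = context;
    }
    SketchContext context() { return context; }
}

final class ContextLookupSketch {
    // Per-thread fallback for threads that are not SketchVertxThread instances (assumption).
    private static final ThreadLocal<WeakReference<SketchContext>> STICKY = new ThreadLocal<>();

    // Same order as getContext(): thread-bound context first, sticky context second.
    static SketchContext getContext() {
        Thread current = Thread.currentThread();
        if (current instanceof SketchVertxThread) {
            return ((SketchVertxThread) current).context();
        }
        WeakReference<SketchContext> ref = STICKY.get();
        return ref != null ? ref.get() : null;
    }

    // Same shape as getOrCreateContext(): create and remember a context when none exists.
    static SketchContext getOrCreateContext() {
        SketchContext ctx = getContext();
        if (ctx == null) {
            ctx = new SketchContext("embedded-" + Thread.currentThread().getName());
            STICKY.set(new WeakReference<>(ctx));
        }
        return ctx;
    }

    // Helper: runs getOrCreateContext() on a SketchVertxThread and returns what it saw.
    static SketchContext lookupOnBoundThread(SketchContext bound) {
        SketchContext[] seen = new SketchContext[1];
        Thread t = new SketchVertxThread(() -> seen[0] = getOrCreateContext(), bound);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

Calling ContextLookupSketch.getOrCreateContext() twice from an ordinary thread returns the same object as long as the caller keeps a reference to it, while lookupOnBoundThread(ctx) returns exactly the context that was handed to the thread.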
Vert.x layer entry point
vertx.createNetServer();
This call ends up in the following implementation:
public NetServer createNetServer(NetServerOptions options) {
    return new NetServerImpl(this, options);
}
The core method of this class is listen, which listens on a port.
Tracing the calls shows that io.vertx.core.net.impl.NetServerImpl#listen(SocketAddress) is the overload that performs the actual listen:
@Override
public synchronized Future<NetServer> listen(SocketAddress localAddress) {
    // omit part
    ContextInternal listenContext = vertx.getOrCreateContext();
    registeredHandler = handler;
    // This is where we switch from the Vertx layer to the Netty layer, and this is where we need to watch carefully
    io.netty.util.concurrent.Future<Channel> bindFuture = listen(localAddress, listenContext, new NetServerWorker(listenContext, handler, exceptionHandler));
    // omit unnecessary parts
}
Netty layer
The following code comes from the io.vertx.core.net.impl.TCPServerBase#listen(SocketAddress, ContextInternal, Handler<Channel>) method.
Prerequisites and conclusions
How a plain Netty server is constructed:
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentEventLoopGroup, childEventLoopGroup);
The core is to supply two EventLoopGroups: one handles accept events and the other handles everything else.
Vert.x itself is built on Netty EventLoops, so we can reuse its threads as long as we get hold of the EventLoops inside Vert.x.
Normally the same port cannot be listened on twice. When several servers listen on the same port, Vert.x handles this by distributing connections among them in round-robin fashion, which is also Netty's default strategy, and the implementation is simple. Start with the concept of the main server: it is the first TCPServerBase instance to listen on the port, and every later listener simply adds its EventLoop to the main server's ChildEventLoopGroup.
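The main-server idea can be modelled without any networking. The sketch below is a simplified model, not the Vert.x implementation: SharedPortRegistry and its methods are hypothetical, workers are plain strings standing in for EventLoops, and the per-port cursor is only my reading of what round-robin means here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: one "main" entry per port, later listeners join it.
final class SharedPortRegistry {
    // Workers registered per port; the first one belongs to the main server.
    private final Map<Integer, List<String>> workersByPort = new HashMap<>();
    // Round-robin cursor per port.
    private final Map<Integer, Integer> cursor = new HashMap<>();

    // Returns true when the caller became the main server (the one that would really bind).
    synchronized boolean listen(int port, String worker) {
        List<String> workers = workersByPort.get(port);
        boolean isMain = workers == null;
        if (isMain) {
            workers = new ArrayList<>();
            workersByPort.put(port, workers);
            cursor.put(port, 0);
        }
        workers.add(worker);
        return isMain;
    }

    // Picks the worker for the next accepted connection in round-robin order.
    synchronized String nextWorker(int port) {
        List<String> workers = workersByPort.get(port);
        if (workers == null || workers.isEmpty()) {
            return null;
        }
        int i = cursor.get(port);
        cursor.put(port, (i + 1) % workers.size());
        return workers.get(i);
    }
}
```

Only the first listen call for a port returns true; every later call for that port just adds one more worker to the rotation.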
Field information and initialization
Let’s look at its internal fields first
// Per server
private EventLoop eventLoop;
private Handler<Channel> worker;
private volatile boolean listening;
private ContextInternal listenContext;
private ServerID id;
private TCPServerBase actualServer;
// Main
private ServerChannelLoadBalancer channelBalancer;
private io.netty.util.concurrent.Future<Channel> bindFuture;
private Set<TCPServerBase> servers;
private TCPMetrics<?> metrics;
private volatile int actualPort;
That is, each server holds a reference to the main server (actualServer) as well as its own context.
// Initialize information
this.listenContext = context;
this.listening = true;
this.eventLoop = context.nettyEventLoop();
this.worker = worker;
Map<ServerID, TCPServerBase> sharedNetServers = vertx.sharedTCPServers((Class<TCPServerBase>) getClass());
synchronized (sharedNetServers) {
    actualPort = localAddress.port();
    String hostOrPath = localAddress.isInetSocket() ? localAddress.host() : localAddress.path();
    // Look up whether a main server already exists for the port being listened on
    TCPServerBase main;
    boolean shared;
    if (actualPort != 0) {
        id = new ServerID(actualPort, hostOrPath);
        main = sharedNetServers.get(id);
        shared = true;
    } else {
        if (creatingContext != null && creatingContext.deploymentID() != null) {
            id = new ServerID(actualPort, hostOrPath + "/" + creatingContext.deploymentID());
            main = sharedNetServers.get(id);
            shared = true;
        } else {
            id = new ServerID(actualPort, hostOrPath);
            main = null;
            shared = false;
        }
    }
    // The synchronized block continues in the snippets that follow
There is no need to study this closely: it is just initialization plus a check for whether a main server already exists.
When there is no main server
if (main == null) {
    // Create the set of servers that logically listen on the same port
    servers = new HashSet<>();
    servers.add(this);
    // Covered in detail later; for now, the conclusion: the balancer stores the worker EventLoops
    // and records connection information, so it can be regarded as the ChildEventLoopGroup
    channelBalancer = new ServerChannelLoadBalancer(vertx.getAcceptorEventLoopGroup().next());
    channelBalancer.addWorker(eventLoop, worker);
    // Start a server with Netty
    ServerBootstrap bootstrap = new ServerBootstrap();
    // Use the existing acceptor group and the channelBalancer's workers as the ChildEventLoopGroup
    bootstrap.group(vertx.getAcceptorEventLoopGroup(), channelBalancer.workers());
    bootstrap.childHandler(channelBalancer);
    applyConnectionOptions(localAddress.isDomainSocket(), bootstrap);
    try {
        sslHelper.validate(vertx);
        bindFuture = AsyncResolveConnectHelper.doBind(vertx, localAddress, bootstrap);
        bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) res -> {
            if (res.isSuccess()) {
                Channel ch = res.getNow();
                log.trace("Net server listening on " + hostOrPath + ":" + ch.localAddress());
                // Update port to actual port when it is not a domain socket as wildcard port 0 might have been used
                if (actualPort != -1) {
                    actualPort = ((InetSocketAddress)ch.localAddress()).getPort();
                }
                id = new ServerID(TCPServerBase.this.actualPort, id.host);
                // Add close callback
                listenContext.addCloseHook(this);
                metrics = createMetrics(localAddress);
            } else {
                if (shared) {
                    synchronized (sharedNetServers) {
                        sharedNetServers.remove(id);
                    }
                }
                listening = false;
            }
        });
    } catch (Throwable t) {
        listening = false;
        return vertx.getAcceptorEventLoopGroup().next().newFailedFuture(t);
    }
    // Register this server in vertx's sharedNetServers so that the next server listening on
    // the same port finds the main server that actually holds the port
    if (shared) {
        sharedNetServers.put(id, this);
    }
    actualServer = this;
When the main server exists
} else {
    // Reuse the main server's metadata directly and
    // add this server's EventLoop to the main server's ChildEventLoopGroup
    actualServer = main;
    actualServer.servers.add(this);
    actualServer.channelBalancer.addWorker(eventLoop, worker);
    metrics = main.metrics;
    listenContext.addCloseHook(this);
}
What is a ServerChannelLoadBalancer?
Core internal fields
private final VertxEventLoopGroup workers;
private final ConcurrentMap<EventLoop, WorkerList> workerMap = new ConcurrentHashMap<>();
private final ChannelGroup channelGroup;
// VertxEventLoopGroup class declaration:
// class VertxEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup
Let’s first look at how joining the ChildEventLoopGroup works
public synchronized void addWorker(EventLoop eventLoop, Handler<Channel> handler) {
    workers.addWorker(eventLoop);
    // omitted
}
It’s that simple
Notice that it also extends Netty's ChannelInitializer. What does its initChannel implementation do?
@Override
protected void initChannel(Channel ch) {
    Handler<Channel> handler = chooseInitializer(ch.eventLoop());
    if (handler == null) {
        ch.close();
    } else {
        channelGroup.add(ch);
        handler.handle(ch);
    }
}
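It looks up the handler for the Channel's EventLoop and closes the Channel if there is none. Otherwise it saves the newly accepted Channel in channelGroup, so that the connection can be dropped when the server is closed, and passes it to the handler.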
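The track-then-dispatch pattern of initChannel can be reduced to a few lines of plain Java. This is a sketch under stated assumptions, not the ServerChannelLoadBalancer itself: SketchConnection and ConnectionTrackerSketch are hypothetical, event loops are represented by their names, and each loop has at most one handler.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical connection type; "loop" names the event loop that owns it.
final class SketchConnection {
    final String loop;
    boolean open = true;
    SketchConnection(String loop) { this.loop = loop; }
    void close() { open = false; }
}

final class ConnectionTrackerSketch {
    private final Map<String, Consumer<SketchConnection>> handlerByLoop = new HashMap<>();
    private final Set<SketchConnection> tracked = new LinkedHashSet<>();

    synchronized void addWorker(String loop, Consumer<SketchConnection> handler) {
        handlerByLoop.put(loop, handler);
    }

    // Same shape as initChannel: reject when no handler exists, otherwise track and dispatch.
    synchronized void initConnection(SketchConnection c) {
        Consumer<SketchConnection> handler = handlerByLoop.get(c.loop);
        if (handler == null) {
            c.close();
        } else {
            tracked.add(c);
            handler.accept(c);
        }
    }

    // Closes every tracked connection, as a server shutdown would.
    synchronized void closeAll() {
        for (SketchConnection c : tracked) {
            c.close();
        }
        tracked.clear();
    }

    synchronized int trackedCount() { return tracked.size(); }
}
```

Keeping the set of accepted connections next to the handler lookup is what lets a single closeAll call disconnect every client of the port, regardless of which worker handled it.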
What happens when the server is closed, either directly or through the close hook bound to its context?
TCPServerBase implements the Closeable interface, so looking at its close implementation shows what happens:
public synchronized void close(Promise<Void> completion) {
    if (!listening) {
        completion.complete();
        return;
    }
    listening = false;
    listenContext.removeCloseHook(this);
    Map<ServerID, TCPServerBase> servers = vertx.sharedTCPServers((Class<TCPServerBase>) getClass());
    synchronized (servers) {
        ServerChannelLoadBalancer balancer = actualServer.channelBalancer;
        // Remove the current thread's EventLoop from ChildEventLoop
        balancer.removeWorker(eventLoop, worker);
        if (balancer.hasHandlers()) {
            // The actual server still has handlers so we don't actually close it
            completion.complete();
        } else {
            // No worker left so close the actual server
            // The done handler needs to be executed on the context that calls close, NOT the context
            // of the actual server
            servers.remove(id);
            // Close the actual listening server
            actualServer.actualClose(completion);
        }
    }
}

private void actualClose(Promise<Void> done) {
    // Close all channels connected to this port
    channelBalancer.close();
    // Close serverChannel, which is equivalent to close ServerBootStrap and release resources
    bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) fut -> {
        if (fut.isSuccess()) {
            Channel channel = fut.getNow();
            ChannelFuture a = channel.close();
            if (metrics != null) {
                a.addListener(cg -> metrics.close());
            }
            a.addListener((PromiseInternal<Void>)done);
        } else {
            done.complete();
        }
    });
}
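The close path boils down to "the last worker out closes the socket". A minimal sketch of that rule follows, assuming a hypothetical SharedListenerSketch class in which workers are strings and the listening socket is a boolean flag; it models only the decision logic, not Netty's asynchronous close.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a shared listener that closes only when the last worker leaves.
final class SharedListenerSketch {
    private final Set<String> workers = new HashSet<>();
    private boolean socketOpen = true;

    synchronized void addWorker(String worker) {
        workers.add(worker);
    }

    // Returns true if this call actually closed the listening socket.
    synchronized boolean removeWorker(String worker) {
        workers.remove(worker);
        if (!workers.isEmpty()) {
            // Other servers still share the port, so only this worker detaches
            return false;
        }
        if (!socketOpen) {
            // Already closed by an earlier call
            return false;
        }
        socketOpen = false;
        return true;
    }

    synchronized boolean isSocketOpen() {
        return socketOpen;
    }
}
```

With two workers registered, removing the first leaves the socket open and removing the second closes it, which matches the hasHandlers() branch in the close method above.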
Writing your own
A few small tips
Vertx.vertx() returns a VertxImpl, which implements the VertxInternal interface, so it can be cast and used directly; after all, the Vert.x source code does the same.
public class DeprecatedDeviceServerImp implements DeviceServer, Closeable {
    private int port;
    private boolean listening;
    private VertxInternal vertxInternal;
    // all servers, keyed by port; created eagerly so listen() never sees a null map
    private static final HashMap<Integer, DeprecatedDeviceServerImp> servers = new HashMap<>();
    // main
    private ChannelInit channelInit;
    private DeprecatedDeviceServerImp actualServer;
    private io.netty.util.concurrent.Future<Channel> bindFuture;
    private boolean isClose;

    public DeprecatedDeviceServerImp(VertxInternal vertxInternal) {
        this.vertxInternal = vertxInternal;
    }

    public Future<DeviceServer> listen(int port) {
        // getOrCreateContext() instead of getContext(), which can return null outside a Vert.x thread
        ContextInternal context = vertxInternal.getOrCreateContext();
        this.port = port;
        this.listening = true;
        Promise<DeviceServer> promise = Promise.<DeviceServer>promise();
        synchronized (servers) {
            // Look up the main server while holding the lock
            DeprecatedDeviceServerImp main = servers.get(port);
            if (main == null) {
                // This instance becomes the main server and performs the real bind
                servers.put(port, this);
                actualServer = main = this;
                actualServer.isClose = false;
                this.channelInit = new ChannelInit(new VertxEventLoopGroup());
                this.channelInit.addWorker(context.nettyEventLoop());
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class)
                    .group(vertxInternal.getAcceptorEventLoopGroup(), channelInit.getVertxEventLoopGroup())
                    .childHandler(channelInit);
                bindFuture = AsyncResolveConnectHelper.doBind(vertxInternal, new SocketAddressImpl(new InetSocketAddress(port)), serverBootstrap);
                bindFuture.addListener(future -> {
                    if (future.isSuccess()) {
                        promise.complete(this);
                        context.addCloseHook(this);
                    } else {
                        listening = false;
                        promise.fail(future.cause());
                    }
                });
            } else {
                // Join the existing main server by adding this context's EventLoop to it
                main.channelInit.addWorker(context.nettyEventLoop());
                context.addCloseHook(this);
                actualServer = main;
                // Complete the promise once the main server's bind has finished
                main.bindFuture.addListener(future -> {
                    if (future.isSuccess()) {
                        promise.complete(this);
                    } else {
                        listening = false;
                        promise.fail(future.cause());
                    }
                });
            }
        }
        return promise.future();
    }
    // The close(Promise<Void>) method required by Closeable is not shown here
}