How Vertx creates a NetServer: a source code walkthrough

Background knowledge

Read these articles first:

1. How does Netty start a server

2. Netty threading model

Dependencies

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>4.0.0</version>
</dependency>

Source code walkthrough

A few Vertx utility classes and methods explained

vertx.getOrCreateContext
public ContextInternal getOrCreateContext() {
  AbstractContext ctx = getContext();
  if (ctx == null) {
    // We are running embedded - Create a context
    ctx = createEventLoopContext();
    stickyContext.set(new WeakReference<>(ctx));
  }
  return ctx;
}

public AbstractContext getContext() {
  AbstractContext context = (AbstractContext) ContextInternal.current();
  if (context != null && context.owner() == this) {
    return context;
  } else {
    WeakReference<AbstractContext> ref = stickyContext.get();
    return ref != null ? ref.get() : null;
  }
}

// This one comes from the io.vertx.core.impl.ContextInternal class
static ContextInternal current() {
  Thread current = Thread.currentThread();
  if (current instanceof VertxThread) {
    return ((VertxThread) current).context();
  }
  return null;
}


The core logic is to fetch the Context bound to the current thread. A VertxThread carries its Context directly, much like a ThreadLocal would, and any other thread falls back to the stickyContext. The EventLoop bound to a Context can be obtained from the Context itself. This is one of the core designs of Vertx: every non-worker thread is bound to an EventLoop.
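
The lookup order described above can be modelled with the JDK alone. The following is only a minimal sketch: SketchContext, SketchThread and ContextLookupSketch are hypothetical stand-ins invented for illustration, not Vertx classes, and the real implementation also checks the context owner.

```java
import java.lang.ref.WeakReference;

// Hypothetical stand-in for a Vertx context; not a real Vertx class.
final class SketchContext {
    final String name;
    SketchContext(String name) { this.name = name; }
}

// Plays the role of VertxThread: the context is a plain field on the thread.
final class SketchThread extends Thread {
    final SketchContext context;
    SketchThread(Runnable task, SketchContext context) {
        super(task);
        this.context = context;
    }
}

final class ContextLookupSketch {
    // Fallback for foreign threads, mirroring the stickyContext field.
    private final ThreadLocal<WeakReference<SketchContext>> sticky = new ThreadLocal<>();

    // Thread-bound context first, sticky context second, otherwise null.
    SketchContext getContext() {
        Thread current = Thread.currentThread();
        if (current instanceof SketchThread) {
            return ((SketchThread) current).context;
        }
        WeakReference<SketchContext> ref = sticky.get();
        return ref != null ? ref.get() : null;
    }

    // Creates and remembers a context when the calling thread has none.
    SketchContext getOrCreateContext() {
        SketchContext ctx = getContext();
        if (ctx == null) {
            ctx = new SketchContext("embedded-" + Thread.currentThread().getName());
            sticky.set(new WeakReference<>(ctx));
        }
        return ctx;
    }

    // Runs a lookup on a context-bound thread and returns what that thread saw.
    SketchContext lookupOnBoundThread(SketchContext bound) {
        SketchContext[] seen = new SketchContext[1];
        SketchThread thread = new SketchThread(() -> seen[0] = getOrCreateContext(), bound);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

A plain thread that calls getOrCreateContext() twice gets the same sticky context back as long as the caller still holds a reference to it, while a SketchThread always sees the context it was constructed with.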

Entry point at the Vertx layer

vertx.createNetServer();

This call ultimately resolves to the following implementation:

public NetServer createNetServer(NetServerOptions options) {
    return new NetServerImpl(this, options);
  }


The core method of this class is listen, which starts listening on a port.

Tracing the actual calls shows that the real listening logic lives in io.vertx.core.net.impl.NetServerImpl#listen(SocketAddress):

@Override
public synchronized Future<NetServer> listen(SocketAddress localAddress) {
  // ... (omitted)
  ContextInternal listenContext = vertx.getOrCreateContext();
  registeredHandler = handler;
  // This is where control passes from the Vertx layer to the Netty layer, so read this call carefully
  io.netty.util.concurrent.Future<Channel> bindFuture = listen(localAddress, listenContext, new NetServerWorker(listenContext, handler, exceptionHandler));
  // ... (omitted)
}

Netty layer

The following code comes from the io.vertx.core.net.impl.TCPServerBase#listen(SocketAddress, ContextInternal, Handler<Channel>) method.

Prerequisites and conclusions

How is a pure Netty server constructed?

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentEventLoopGroup, childEventLoopGroup);

The core is to obtain two EventLoopGroups: one handles accept events and the other handles everything else on the accepted connections.

Vertx itself is built on Netty's EventLoop, so as long as we obtain Vertx's internal EventLoops we can reuse their threads.

Next, a piece of common knowledge: the same port cannot be bound twice. When several servers ask to listen on the same port, Vertx binds the port only once and hands incoming connections to those servers in round-robin fashion, which is also Netty's default strategy, and the implementation is quite simple. First, the concept of the main server: it is the first TCPServerBase instance to listen on the port, and every later server that listens on the same port simply adds its EventLoop to the main server's ChildEventLoopGroup.
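
The main server idea can be illustrated without Netty. This is only a sketch under simplifying assumptions: SharedPortSketch and MainServer are hypothetical names, workers are plain strings rather than EventLoops, and "binding" is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: one "main" entry per port, shared by every listener on that port.
final class SharedPortSketch {
    // Plays the role of the main server: it owns the single real bind.
    static final class MainServer {
        final int port;
        final List<String> workers = new ArrayList<>();
        private int next;

        MainServer(int port) { this.port = port; }

        // Round-robin: each accepted connection goes to the next worker in turn.
        String dispatch() {
            String worker = workers.get(next);
            next = (next + 1) % workers.size();
            return worker;
        }
    }

    private final Map<Integer, MainServer> mains = new HashMap<>();
    int binds; // how many times a port was actually bound

    // The first listener on a port becomes the main server; later ones only join it.
    synchronized MainServer listen(int port, String worker) {
        MainServer main = mains.get(port);
        if (main == null) {
            main = new MainServer(port);
            mains.put(port, main);
            binds++; // only the first listener binds the port
        }
        main.workers.add(worker);
        return main;
    }
}
```

Calling listen(8080, ...) twice returns the same MainServer and increments binds only once; dispatch() then alternates between the two registered workers.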

Field information and initialization

Let’s look at its internal fields first

 // Per server
  private EventLoop eventLoop;
  private Handler<Channel> worker;
  private volatile boolean listening;
  private ContextInternal listenContext;
  private ServerID id;
  private TCPServerBase actualServer;

  // Main
  private ServerChannelLoadBalancer channelBalancer;
  private io.netty.util.concurrent.Future<Channel> bindFuture;
  private Set<TCPServerBase> servers;
  private TCPMetrics<?> metrics;
  private volatile int actualPort;

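In other words, every server keeps its own context and EventLoop plus a reference to the main server (actualServer), while the fields under Main are only meaningful on the main server itself.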

// Initialization
this.listenContext = context;
this.listening = true;
this.eventLoop = context.nettyEventLoop();
this.worker = worker;

Map<ServerID, TCPServerBase> sharedNetServers = vertx.sharedTCPServers((Class<TCPServerBase>) getClass());
synchronized (sharedNetServers) {
  actualPort = localAddress.port();
  String hostOrPath = localAddress.isInetSocket() ? localAddress.host() : localAddress.path();
  // Look up whether a main server already exists for the requested port
  TCPServerBase main;
  boolean shared;
  if (actualPort != 0) {
    id = new ServerID(actualPort, hostOrPath);
    main = sharedNetServers.get(id);
    shared = true;
  } else {
    if (creatingContext != null && creatingContext.deploymentID() != null) {
      id = new ServerID(actualPort, hostOrPath + "/" + creatingContext.deploymentID());
      main = sharedNetServers.get(id);
      shared = true;
    } else {
      id = new ServerID(actualPort, hostOrPath);
      main = null;
      shared = false;
    }
  }

There is no need to study this part closely: it is just initialization plus a check for whether a main server already exists.

When there is no main server
if (main == null) {
  // Create the set of servers that logically listen on the same port
  servers = new HashSet<>();
  servers.add(this);
  // Covered in detail later. In short: the balancer holds the worker EventLoops and keeps track of
  // connections, so it can be regarded as the ChildEventLoopGroup
  channelBalancer = new ServerChannelLoadBalancer(vertx.getAcceptorEventLoopGroup().next());
  channelBalancer.addWorker(eventLoop, worker);
  // Start a server with Netty
  ServerBootstrap bootstrap = new ServerBootstrap();
  // Use the existing acceptor group as the parent and the channelBalancer workers as the ChildEventLoopGroup
  bootstrap.group(vertx.getAcceptorEventLoopGroup(), channelBalancer.workers());

  // The balancer itself initializes every accepted channel
  bootstrap.childHandler(channelBalancer);
  applyConnectionOptions(localAddress.isDomainSocket(), bootstrap);

  try {
    sslHelper.validate(vertx);
    bindFuture = AsyncResolveConnectHelper.doBind(vertx, localAddress, bootstrap);
    bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) res -> {
      if (res.isSuccess()) {
        Channel ch = res.getNow();
        log.trace("Net server listening on " + hostOrPath + ":" + ch.localAddress());
        // Update port to actual port when it is not a domain socket as wildcard port 0 might have been used
        if (actualPort != -1) {
          actualPort = ((InetSocketAddress)ch.localAddress()).getPort();
        }
        id = new ServerID(TCPServerBase.this.actualPort, id.host);
        // Register the close hook
        listenContext.addCloseHook(this);
        metrics = createMetrics(localAddress);
      } else {
        if (shared) {
          synchronized (sharedNetServers) {
            sharedNetServers.remove(id);
          }
        }
        listening = false;
      }
    });
  } catch (Throwable t) {
    listening = false;
    return vertx.getAcceptorEventLoopGroup().next().newFailedFuture(t);
  }
  // Register this server in the sharedNetServers map inside vertx, so that the next server
  // listening on the same port finds the main server that actually holds the bind
  if (shared) {
    sharedNetServers.put(id, this);
  }
  actualServer = this;

When the main server exists
} else {
  // A main server already exists: reuse its metadata
  // and add this server's eventLoop to the main server's ChildEventLoopGroup
  actualServer = main;
  actualServer.servers.add(this);
  actualServer.channelBalancer.addWorker(eventLoop, worker);
  metrics = main.metrics;
  listenContext.addCloseHook(this);
}

What is a ServerChannelLoadBalancer?

Core internal fields

 private final VertxEventLoopGroup workers;
  private final ConcurrentMap<EventLoop, WorkerList> workerMap = new ConcurrentHashMap<>();
  private final ChannelGroup channelGroup;
// VertxEventLoopGroup class declaration:
// class VertxEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup

Let’s first look at how joining the ChildEventLoopGroup works

public synchronized void addWorker(EventLoop eventLoop, Handler<Channel> handler) {
    workers.addWorker(eventLoop);
    // ... (omitted)
  }


It’s that simple

Note that it also extends Netty's ChannelInitializer (an abstract class rather than an interface). What does its initChannel implementation do?

@Override
protected void initChannel(Channel ch) {
  Handler<Channel> handler = chooseInitializer(ch.eventLoop());
  if (handler == null) {
    ch.close();
  } else {
    channelGroup.add(ch);
    handler.handle(ch);
  }
}

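It looks up the handler registered for the Channel's EventLoop and closes the Channel if there is none. Otherwise it records the newly accepted Channel in channelGroup, so the connection can be dropped when the server closes, and then passes the Channel to the handler.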
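
The behaviour just described can be sketched with plain collections. Everything here is hypothetical: BalancerSketch is not the Vertx class, loops and channels are represented as strings, and rotating among several handlers registered for the same loop is an assumption of this sketch rather than something shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of the balancer; "loop" and "channel" are plain strings here.
final class BalancerSketch {
    private final Map<String, List<Consumer<String>>> workerMap = new HashMap<>();
    private final Map<String, Integer> positions = new HashMap<>();
    final List<String> channelGroup = new ArrayList<>(); // channels currently tracked
    final List<String> closed = new ArrayList<>();       // channels that were closed

    synchronized void addWorker(String loop, Consumer<String> handler) {
        workerMap.computeIfAbsent(loop, k -> new ArrayList<>()).add(handler);
    }

    // Mirrors initChannel: look up a handler by the channel's loop, else close the channel.
    synchronized void initChannel(String channel, String loop) {
        List<Consumer<String>> handlers = workerMap.get(loop);
        if (handlers == null || handlers.isEmpty()) {
            closed.add(channel);
            return;
        }
        // Assumption: handlers sharing one loop take turns.
        int pos = positions.merge(loop, 1, Integer::sum) - 1;
        channelGroup.add(channel);
        handlers.get(pos % handlers.size()).accept(channel);
    }

    // Mirrors closing the channel group: every tracked channel is disconnected.
    synchronized void close() {
        closed.addAll(channelGroup);
        channelGroup.clear();
    }
}
```

Tracking accepted channels in one place is what lets a single close() call disconnect every client of the shared port.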

What happens when the server is closed, or when the closeHook registered on its context fires?

TCPServerBase implements the Closeable interface, so we only need to read that implementation to see what happens.

public synchronized void close(Promise<Void> completion) {
  if (!listening) {
    completion.complete();
    return;
  }
  listening = false;
  listenContext.removeCloseHook(this);
  Map<ServerID, TCPServerBase> servers = vertx.sharedTCPServers((Class<TCPServerBase>) getClass());
  synchronized (servers) {
    ServerChannelLoadBalancer balancer = actualServer.channelBalancer;
    // Remove this server's EventLoop from the ChildEventLoopGroup
    balancer.removeWorker(eventLoop, worker);
    if (balancer.hasHandlers()) {
      // The actual server still has handlers so we don't actually close it
      completion.complete();
    } else {
      // No worker left so close the actual server
      // The done handler needs to be executed on the context that calls close, NOT the context
      // of the actual server
      servers.remove(id);
      // Close the server that is actually listening
      actualServer.actualClose(completion);
    }
  }
}

private void actualClose(Promise<Void> done) {
  // Close all channels connected to this port
  channelBalancer.close();
  // Close the server channel, which amounts to shutting down the ServerBootstrap and releasing its resources
  bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) fut -> {
    if (fut.isSuccess()) {
      Channel channel = fut.getNow();
      ChannelFuture a = channel.close();
      if (metrics != null) {
        a.addListener(cg -> metrics.close());
      }
      a.addListener((PromiseInternal<Void>)done);
    } else {
      done.complete();
    }
  });
}
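
The close path boils down to "release the port only when the last listener leaves". A minimal sketch of that rule, with a hypothetical SharedCloseSketch class, string workers, and a boolean standing in for the bound server channel:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: the real bind is released only when the last listener leaves.
final class SharedCloseSketch {
    private final List<String> workers = new ArrayList<>();
    boolean bound = true; // stands in for the bound server channel

    synchronized void addWorker(String worker) {
        workers.add(worker);
    }

    // Returns true when this call performed the actual close.
    synchronized boolean close(String worker) {
        if (!workers.remove(worker)) {
            return false; // not listening: nothing to do
        }
        if (!workers.isEmpty()) {
            return false; // other listeners remain, keep the port open
        }
        bound = false; // last listener gone: release the port
        return true;
    }
}
```

With two workers registered, closing the first leaves bound set to true, and only closing the second flips it to false.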

Writing your own

A few small tips

Vertx.vertx() returns a VertxImpl, which implements the VertxInternal interface, so it can be cast and used directly. After all, the Vertx source code does the same.

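import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.AsyncResolveConnectHelper;
import io.vertx.core.net.impl.SocketAddressImpl;
import io.vertx.core.net.impl.VertxEventLoopGroup;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// DeviceServer and ChannelInit are this project's own types and are not shown here
public class DeprecatedDeviceServerImp implements DeviceServer, Closeable {

    private int port;
    private boolean listening;
    private final VertxInternal vertxInternal;
    // All main servers, keyed by port (must be initialized, otherwise listen() throws a NullPointerException)
    private static final Map<Integer, DeprecatedDeviceServerImp> servers = new HashMap<>();

    // Main
    private ChannelInit channelInit;
    private DeprecatedDeviceServerImp actualServer;
    private io.netty.util.concurrent.Future<Channel> bindFuture;
    private boolean isClose;

    public DeprecatedDeviceServerImp(VertxInternal vertxInternal) {
        this.vertxInternal = vertxInternal;
    }

    public Future<DeviceServer> listen(int port) {
        // getOrCreateContext() never returns null, unlike getContext() on a non-Vertx thread
        ContextInternal context = vertxInternal.getOrCreateContext();
        this.port = port;
        this.listening = true;
        Promise<DeviceServer> promise = Promise.promise();
        synchronized (servers) {
            // Look up the main server inside the lock so two listeners cannot both become main
            DeprecatedDeviceServerImp main = servers.get(port);
            if (main == null) {
                servers.put(port, this);
                actualServer = this;
                isClose = false;
                channelInit = new ChannelInit(new VertxEventLoopGroup());
                channelInit.addWorker(context.nettyEventLoop());
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class)
                        .group(vertxInternal.getAcceptorEventLoopGroup(), channelInit.getVertxEventLoopGroup())
                        .childHandler(channelInit);
                bindFuture = AsyncResolveConnectHelper.doBind(vertxInternal,
                        new SocketAddressImpl(new InetSocketAddress(port)), serverBootstrap);
                bindFuture.addListener(future -> {
                    if (future.isSuccess()) {
                        promise.complete(this);
                        context.addCloseHook(this);
                    } else {
                        listening = false;
                        promise.fail(future.cause());
                    }
                });
            } else {
                // A main server already exists: join it as a worker
                main.channelInit.addWorker(context.nettyEventLoop());
                context.addCloseHook(this);
                actualServer = main;
                // Follow the main server's bind result, otherwise the returned future would never complete
                main.bindFuture.addListener(future -> {
                    if (future.isSuccess()) {
                        promise.complete(this);
                    } else {
                        promise.fail(future.cause());
                    }
                });
            }
        }
        return promise.future();
    }

    // The close(Promise<Void>) method required by Closeable is not shown in this excerpt
}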