The reason

It was also due to my carelessness. When I wrote Java programming methodology — Reactor and Webflux, I thought Tomcat had a good blog about connector, so I briefly referred to it without in-depth analysis of the source code. As a result, WHEN I recorded and shared the video, Found that their article content and source code is not consistent, and through the search engine to search some Chinese blog articles, is not satisfactory, simply, their own through the latest source code to comb part of the content of Tomcat connector, but also to give yourself a warning, everything must be careful and careful again! Reference source code address: github.com/apache/tomc…

About THE Java programming methodology — Reactor and Webflux video sharing, has completed Rxjava and Reactor, site B address is as follows:

Rxjava source code reading and sharing: www.bilibili.com/video/av345…

Reactor source code reading and sharing: www.bilibili.com/video/av353…

Detailed description of Tomcat startup process

Start and end Basic Tomcat operations

On Linux, you can start and stop Tomcat using commands.

Go to the bin directory of Tomcat:

cd /java/tomcat/bin
Copy the code

Start Tomcat command:

./startup.sh
Copy the code

Command for stopping the Tomcat service:

./shutdown.sh
Copy the code

After implementation of tomcat. / shutdown. Sh, although the tomcat services cannot be normal visit, but the ps – ef | grep tomcat, found the tomcat corresponding Java process was not destroyed, according to the web container closed and then there is zombie Java process. The JVM will not exit if all threads in the JVM are daemons. The JVM does not exit if there is one or more non-daemons remaining). Run the following command to check whether the Tomcat process is complete:

ps -ef|grep tomcat
Copy the code

If there are user threads, kill them with kill -9 PID

Detailed explanation of Bootstrap startup process

Elif [“$1” = “start”]; elif [“$1” = “start”]; Place, we go down, can find that it calls the org. Apache. Catalina. Startup. The Bootstrap. Java class – this class under the start () method:

/**
* org.apache.catalina.startup.Bootstrap
* Start the Catalina daemon.
* @throws Exception Fatal start error
*/
public void start(a)
    throws Exception {
    if( catalinaDaemon==null ) init();

    Method method = catalinaDaemon.getClass().getMethod("start", (Class [] )null);
    method.invoke(catalinaDaemon, (Object [])null);

}
Copy the code

Here, the first time you start the server, will call its init (), it is mainly used to create org. Apache. Catalina. Startup. Catalina. Java class instances:

/**
* org.apache.catalina.startup.Bootstrap
* Initialize daemon.
* @throws Exception Fatal initialization error
*/
public void init(a) throws Exception {

    initClassLoaders();

    Thread.currentThread().setContextClassLoader(catalinaLoader);

    SecurityClassLoad.securityClassLoad(catalinaLoader);

    // Load our startup class and call its process() method
    if (log.isDebugEnabled())
        log.debug("Loading startup class"); Class<? > startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
    Object startupInstance = startupClass.getConstructor().newInstance();

    // Set the shared extensions class loader
    if (log.isDebugEnabled())
        log.debug("Setting startup class properties");
    String methodName = "setParentClassLoader"; Class<? > paramTypes[] =new Class[1];
    paramTypes[0] = Class.forName("java.lang.ClassLoader");
    Object paramValues[] = new Object[1];
    paramValues[0] = sharedLoader;
    Method method =
        startupInstance.getClass().getMethod(methodName, paramTypes);
    method.invoke(startupInstance, paramValues);

    catalinaDaemon = startupInstance;

}

Copy the code

The startup process Catalina in detail

Start in Catalina

Next, Bootstrap’s start() method calls the Catalina instance’s start method:

/**
* org.apache.catalina.startup.Catalina
* Start a new server instance.
*/
public void start(a) {

    if (getServer() == null) {
        load();
    }

    if (getServer() == null) {
        log.fatal(sm.getString("catalina.noServer"));
        return;
    }

    long t1 = System.nanoTime();

    // Start the new server
    try {
        getServer().start();
    } catch (LifecycleException e) {
        log.fatal(sm.getString("catalina.serverStartFail"), e);
        try {
            getServer().destroy();
        } catch (LifecycleException e1) {
            log.debug("destroy() failed for failed Server ", e1);
        }
        return;
    }

    long t2 = System.nanoTime();
    if(log.isInfoEnabled()) {
        log.info(sm.getString("catalina.startup", Long.valueOf((t2 - t1) / 1000000)));
    }

    // Register shutdown hook
    if (useShutdownHook) {
        if (shutdownHook == null) {
            shutdownHook = new CatalinaShutdownHook();
        }
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        // If JULI is being used, disable JULI's shutdown hook since
        // shutdown hooks run in parallel and log messages may be lost
        // if JULI's hook completes before the CatalinaShutdownHook()
        LogManager logManager = LogManager.getLogManager();
        if (logManager instanceof ClassLoaderLogManager) {
            ((ClassLoaderLogManager) logManager).setUseShutdownHook(
                    false); }}if(await) { await(); stop(); }}Copy the code

Load (), getServer().start(), getServer().start(), getServer(), getServer().start(), getServer().

Load () in Catalina

Load () will create and configure the Digester we will use to start with createStartDigester(), then get the ServerXml file we configured, configure the properties in turn, and finally call getServer().init():

/**
* org.apache.catalina.startup.Catalina
* Start a new server instance.
*/
public void load(a) {

    if (loaded) {
        return;
    }
    loaded = true;

    long t1 = System.nanoTime();

    initDirs();

    // Before digester - it may be needed
    initNaming();

    // Set configuration source
    ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(Bootstrap.getCatalinaBaseFile(), getConfigFile()));
    File file = configFile();

    // Create and execute our Digester
    Digester digester = createStartDigester();

    try (ConfigurationSource.Resource resource = ConfigFileLoader.getSource().getServerXml()) {
        InputStream inputStream = resource.getInputStream();
        InputSource inputSource = new InputSource(resource.getURI().toURL().toString());
        inputSource.setByteStream(inputStream);
        digester.push(this);
        digester.parse(inputSource);
    } catch (Exception e) {
        if  (file == null) {
            log.warn(sm.getString("catalina.configFail", getConfigFile() + "] or [server-embed.xml"), e);
        } else {
            log.warn(sm.getString("catalina.configFail", file.getAbsolutePath()), e);
            if(file.exists() && ! file.canRead()) { log.warn(sm.getString("catalina.incorrectPermissions")); }}return;
    }

    getServer().setCatalina(this);
    getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
    getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());

    // Stream redirection
    initStreams();

    // Start the new server
    try {
        getServer().init();
    } catch (LifecycleException e) {
        if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
            throw new java.lang.Error(e);
        } else {
            log.error(sm.getString("catalina.initError"), e); }}long t2 = System.nanoTime();
    if(log.isInfoEnabled()) {
        log.info(sm.getString("catalina.init", Long.valueOf((t2 - t1) / 1000000))); }}Copy the code

Here, where the server comes from, we from the digester. AddObjectCreate (” server “and” org. Apache. Catalina. Core. StandardServer “, “className”); As you can see, it uses an instance of this class. Let’s go back to Digester.push (this); digester.parse(inputSource); Digester.push(this) is called before parsing. The top element of the stack is Catalina, which is used to set up the server for Catalina.

Parsing to

creates an instance of the StandardServer class and reflects the setter method that calls Digester’s top stack object (the method called is determined by the name value passed in). Digester involving IntrospectionUtils. SetProperty (top, name, value) method, namely the top for the stack object, the name for the stack object to set the property name, the value to set the attribute values. We started with Catalina at the top of the stack, so we called catalina.setServer (Server object) to set the Server up for a later call to server.start (). The StandardServer object instance is then placed in Digester’s Stack of Stack objects.

getServer().init()

Next, let’s look at getServer (). The init (), by the knowledge, we find org. Apache. Catalina. Core. StandardServer. Java class – this class, its inheritance LifecycleMBeanBase and implements the Server, LifecycleMBeanBase class: LifecycleMBeanBase class: LifecycleMBeanBase

//org.apache.catalina.util.LifecycleBase.java

@Override
public final synchronized void init(a) throws LifecycleException {
    if(! state.equals(LifecycleState.NEW)) { invalidTransition(Lifecycle.BEFORE_INIT_EVENT); }try {
        setStateInternal(LifecycleState.INITIALIZING, null.false);
        initInternal();
        setStateInternal(LifecycleState.INITIALIZED, null.false);
    } catch (Throwable t) {
        handleSubClassException(t, "lifecycleBase.initFail", toString()); }}Copy the code

The implementation of initInternal() in StandardServer is too much code. 1, call the superclass org. Apache. Catalina. Util. LifecycleMBeanBase# initInternal method, an registration

2. Register mBeans for other attributes of the class

3, NamingResources initialization: globalNamingResources init ();

4, start from common ClassLoader to view up to SystemClassLoader, traverse each ClassLoader corresponding view path, find the jar end file, read the Manifest information, Join the ExtensionValidator# containerManifestResources attribute.

5, initialize service, default implementation is StandardService.

I) call the super.initinternal () method

Ii) initialize the container, where the container instance is StandardEngine. Iii) Executor initialization iv)Connector initialization: A) org. Apache. Catalina. Connector. The connector connector HTTP / 1.1-8080 – b) org. Apache. Catalina. Connector. The connector The Connector AJP / 1.3 8009

GetServer ().start() in Catalina

Here, we can see the parent class of StandardServer org. Apache. Catalina. Util. LifecycleBase. Java implementation:

@Override
public final synchronized void start(a) throws LifecycleException {

    if (LifecycleState.STARTING_PREP.equals(state) || LifecycleState.STARTING.equals(state) ||
            LifecycleState.STARTED.equals(state)) {

        if (log.isDebugEnabled()) {
            Exception e = new LifecycleException();
            log.debug(sm.getString("lifecycleBase.alreadyStarted", toString()), e);
        } else if (log.isInfoEnabled()) {
            log.info(sm.getString("lifecycleBase.alreadyStarted", toString()));
        }

        return;
    }

    if (state.equals(LifecycleState.NEW)) {
        init();
    } else if (state.equals(LifecycleState.FAILED)) {
        stop();
    } else if(! state.equals(LifecycleState.INITIALIZED) && ! state.equals(LifecycleState.STOPPED)) { invalidTransition(Lifecycle.BEFORE_START_EVENT); }try {
        setStateInternal(LifecycleState.STARTING_PREP, null.false);
        startInternal();
        if (state.equals(LifecycleState.FAILED)) {
            // This is a 'controlled' failure. The component put itself into the
            // FAILED state so call stop() to complete the clean-up.
            stop();
        } else if(! state.equals(LifecycleState.STARTING)) {// Shouldn't be necessary but acts as a check that sub-classes are
            // doing what they are supposed to.
            invalidTransition(Lifecycle.AFTER_START_EVENT);
        } else {
            setStateInternal(LifecycleState.STARTED, null.false); }}catch (Throwable t) {
        // This is an 'uncontrolled' failure so put the component into the
        // FAILED state and throw an exception.
        handleSubClassException(t, "lifecycleBase.startFail", toString()); }}Copy the code

For StandardServer, we focus on startInternal(); 1, trigger the CONFIGURE_START_EVENT event.

2. Set the state of this object to STARTING

3, start NameingResource: globalNamingResources. Start (); 4. StandardService starts.

I) Set the state to STARTING

Ii) Container startup, i.e. StandardEngine startup III) Executor startup iv) Connector startup: A) org. Apache. Catalina. Connector. The connector connector HTTP / 1.1-8080 – b) org. Apache. Catalina. Connector. The connector The Connector AJP / 1.3 8009

Finally, we have explored my protagonist Connector.

The Connector interpretation

The Connector constructor

The default configuration protocol for port 8080 is HTTP/1.1, as shown in Connector server. XML in apache-tomcat-9.0.14\conf.

<Connector port="8080" protocol="HTTP / 1.1"
            connectionTimeout="20000"
            redirectPort="8443" />
            <! -- Define an AJP 1.3 Connector on port 8009 -->
<Connector port="8009" protocol="AJP / 1.3" redirectPort="8443" />

Copy the code

With that in mind, let’s look at the implementation in its code:

public Connector(a) {
    this("org.apache.coyote.http11.Http11NioProtocol");
}


public Connector(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    if ("HTTP / 1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol"; }}else if ("AJP / 1.3".equals(protocol)) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol"; }}else {
        protocolHandlerClassName = protocol;
    }

    // Instantiate protocol handler
    ProtocolHandler p = null;
    try{ Class<? > clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); }catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    // Default for Connector depends on this system property
    setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}
Copy the code

For tomcat8.5 and above, the default is Http11NioProtocol. Here, we set it to HTTP/1.1, but according to the above statement, it is equal to Http11NioProtocol.

Connector initialization and startup

Also, we know from the previous section that Connector initialization is involved, which also extends LifecycleMBeanBase, so let’s look at its related initInternal() implementation:

@Override
protected void initInternal(a) throws LifecycleException {

    super.initInternal();

    if (protocolHandler == null) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
    }

    // Initialize adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);
    if(service ! =null) {
        protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor());
    }

    // Make sure parseBodyMethodsSet has a default
    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }

    if(protocolHandler.isAprRequired() && ! AprLifecycleListener.isAprAvailable()) {throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceofAbstractHttp11JsseProtocol) { AbstractHttp11JsseProtocol<? > jsseProtocolHandler = (AbstractHttp11JsseProtocol<? >) protocolHandler;if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            // OpenSSL is compatible with the JSSE configuration, so use it if APR is availablejsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName()); }}try {
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e); }}Copy the code

The process involved is as follows: 1. Register the MBean

Instantiate the CoyoteAdapter, which is the entry point to the request. When there is a request, the CoyoteAdapter handles the state, the request is reclaimed at the end, and the intermediate process is handled by pipeline.

3, protocolHandler initialization (org. Apache. Coyote. Http11. Http11Protocol)

In this step, the endpoint is initialized

Not to mention STARTING, which sets the state of this object to STARTING and calls protocolHandler.start(); Next, it’s time to get into our core rhythm.

@Override
protected void startInternal(a) throws LifecycleException {

    // Validate settings before starting
    if (getPortWithOffset() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e); }}Copy the code

Protocol interpretation

Here, we directly from the abstract implementation org. Apache. Coyote. AbstractProtocol. The Java perspective, it is to follow the life cycle, so it will inherit LifecycleMBeanBase and realizes own init () and the start () method, such as life cycle The specific logic is executed internally by the corresponding self-implementing endpoint:

//org.apache.coyote.AbstractProtocol.java
@Override
public void init(a) throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
        logPortOffset();
    }

    if (oname == null) {
        // Component not pre-registered so register it
        oname = createObjectName();
        if(oname ! =null) {
            Registry.getRegistry(null.null).registerComponent(this, oname, null); }}if (this.domain ! =null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null.null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length()-1));
    endpoint.setDomain(domain);

    endpoint.init();
}


@Override
public void start(a) throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
        logPortOffset();
    }

    endpoint.start();
    monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
            new Runnable() {
                @Override
                public void run(a) {
                    if(! isPaused()) { startAsyncTimeout(); }}},0.60, TimeUnit.SECONDS);
}
Copy the code

Take org. Apache. Coyote. Http11. Http11NioProtocol this class, it receives is NioEndpoint for the realization of the constructor, the concrete implementation method of its internal had thus NioEndpoint to implement the logic:

public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {

    private static final Log log = LogFactory.getLog(Http11NioProtocol.class);


    public Http11NioProtocol(a) {
        super(new NioEndpoint());
    }


    @Override
    protected Log getLog(a) { return log; }


    // -------------------- Pool setup --------------------

    public void setPollerThreadCount(int count) {
        ((NioEndpoint)getEndpoint()).setPollerThreadCount(count);
    }

    public int getPollerThreadCount(a) {
        return ((NioEndpoint)getEndpoint()).getPollerThreadCount();
    }

    public void setSelectorTimeout(long timeout) {
        ((NioEndpoint)getEndpoint()).setSelectorTimeout(timeout);
    }

    public long getSelectorTimeout(a) {
        return ((NioEndpoint)getEndpoint()).getSelectorTimeout();
    }

    public void setPollerThreadPriority(int threadPriority) {
        ((NioEndpoint)getEndpoint()).setPollerThreadPriority(threadPriority);
    }

    public int getPollerThreadPriority(a) {
      return ((NioEndpoint)getEndpoint()).getPollerThreadPriority();
    }


    // ----------------------------------------------------- JMX related methods

    @Override
    protected String getNamePrefix(a) {
        if (isSSLEnabled()) {
            return "https-" + getSslImplementationShortName()+ "-nio";
        } else {
            return "http-nio"; }}}Copy the code

Interpretation of Endpoint

Here, the EndPoint is used to process connection and transfer data. It is used for network connection and control. It is the access point for external I/O operations on the server. The main task is to manage external socket connections and deliver the established socket connections to the appropriate worker threads. The two main attribute classes are Acceptor and Poller and SocketProcessor. Take NioEndpoint as an example. Its internal request processing process is as follows:

At the end of the previous section, we will focus on the implementation of Protocol’s lifecycle methods:

//org.apache.tomcat.util.net.AbstractEndpoint.java
public final void init(a) throws Exception {
    if (bindOnInit) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain ! =null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\" ");
        Registry.getRegistry(null.null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null.null).registerComponent(socketProperties, socketPropertiesOname, null);

        for(SSLHostConfig sslHostConfig : findSslHostConfigs()) { registerJmx(sslHostConfig); }}}public final void start(a) throws Exception {
    if (bindState == BindState.UNBOUND) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

//org.apache.tomcat.util.net.AbstractEndpoint.java
private void bindWithCleanup(a) throws Exception {
    try {
        bind();
    } catch (Throwable t) {
        // Ensure open sockets etc. are cleaned up if something goes
        // wrong during bind
        ExceptionUtils.handleThrowable(t);
        unbind();
        throwt; }}Copy the code

These two methods call bind(bindWithCleanup()) and startlntemal, which are template methods and can be implemented as required. Here, we refer to the implementation of NioEndpoint. The bind method code is as follows:

//org.apache.tomcat.util.net.NioEndpoint.java
@Override
public void bind(a) throws Exception {
    initServerSocket();

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}
Copy the code

The bind method initializes the ServerSocket first. The bind method initializes the ServerSocket first. The bind method initializes the ServerSocket. The acceptorThreadCount and pollerThreadCount attributes, which represent the number of Acceptor and Poller initialized threads, are then checked with a value of at least 1.

// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket(a) throws Exception {
    if(! getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr =new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}

Copy the code

Acceptors are used to accept requests and send them to pollers for processing, both of which are handled by the starting thread. In addition, initialization of SSL and other content. The startInternal method for NioEndpoint is as follows:

/**
* The socket pollers.
*/
private Poller[] pollers = null;

/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal(a) throws Exception {

    if(! running) { running =true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        if ( getExecutor() == null ) {
            createExecutor();
        }

        initializeConnectionLatch();

        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true); pollerThread.start(); } startAcceptorThreads(); }}Copy the code

ProcessorCache is of type SynchronizedStack

. SocketProcessor is an internal class to NioEndpoint. When the Poller receives the request, it passes it to the SocketProcessor, which passes the request to the Handler. Pollers is an array that manages a bunch of runnables. If we don’t set it to one, that is, it is a single thread by default. Once created, this thread is set up as a daemon thread until the Tomcat container terminates, which will follow. Here, if we want to configure it, we can configure it in server.xml:

<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"  
               connectionTimeout="20000"  
               maxHeaderCount="64"  
               maxParameterCount="64"  
               maxHttpHeaderSize="8192"  
               URIEncoding="UTF-8"  
               useBodyEncodingForURI="false"  
               maxThreads="128"  
               minSpareThreads="12"  
               acceptCount="1024"  
               connectionLinger="1"  
               keepAliveTimeout="60"  
               maxKeepAliveRequests="32"  
               maxConnections="10000"  
               acceptorThreadCount="1"  
               pollerThreadCount="2"  
               selectorTimeout="1000"  
               useSendfile="true"  
               selectorPool.maxSelectors="128"  
               redirectPort="8443" />  
Copy the code

The startAcceptorThreads method for starting acceptors is AbstractEndpoint.

protected void startAcceptorThreads(a) {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = newThread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); }}Copy the code

The getAcceptorThreadCount method is the acceptorThreadCount property of the init method, which starts the appropriate number of Acceptor threads to receive the request. The default is also 1, which creates threads in the same way as Poller, but without further ado.

Here, we look at the webapps/docs/config/HTTP XML documentation:

<attribute name="acceptorThreadCount" required="false">
    <p>The number of threads to be used to accept connections. Increase this
    value on a multi CPU machine, although you would never really need more
    than <code>2</code>. Also, with a lot of non keep alive connections, you
    might want to increase this value as well. Default value is
    <code>1</code>.</p>
</attribute>

<attribute name="pollerThreadCount" required="false">
    <p>(int)The number of threads to be used to run for the polling events.
    Default value is <code>1</code> per processor but not more than 2.<br/>
    When accepting a socket, the operating system holds a global lock. So the benefit of
    going above 2 threads diminishes rapidly. Having more than one thread is for
    system that need to accept connections very rapidly. However usually just
    increasing <code>acceptCount</code> will solve that problem.
    Increasing this value may also be beneficial when a large amount of send file
    operations are going on.
    </p>
</attribute>
Copy the code

AcceptorThreadCount is used to set the number of threads that accept connections. Increase this value on multi-CPU machines, although you probably don’t really need more than 2. You may want to increase this value even if there are many non-Keep Alive connections. The default value is 1. PollerThreadCount is the number of threads to run for polling events. The default value is 1 but no more than 2 per processor (2 in the optimized configuration above). When a socket is accepted, the operating system maintains a global lock. Therefore, the benefits of more than 2 threads quickly diminish. When a system has multiple threads of this type, it can accept connections very quickly. Increasing acceptCount solves this problem. Increasing this value may also be beneficial when you are sending a lot of files.

The way Acceptor and Poller work

Let’s start with a sequence diagram of NioEndpoint processing:

Acceptor mode of operation

Acceptors and Pollers implement Runnable interfaces, so the main work flow is within the run method.

//org.apache.tomcat.util.net.NioEndpoint.java
@Override
protected SocketChannel serverSocketAccept(a) throws Exception {
    return serverSock.accept();
}
//org.apache.tomcat.util.net.Acceptor.java
public class Acceptor<U> implements Runnable {

    private static final Log log = LogFactory.getLog(Acceptor.class);
    private static final StringManager sm = StringManager.getManager(Acceptor.class);

    private static final int INITIAL_ERROR_DELAY = 50;
    private static final int MAX_ERROR_DELAY = 1600;

    private finalAbstractEndpoint<? ,U> endpoint;private String threadName;
    protected volatile AcceptorState state = AcceptorState.NEW;


    public Acceptor(AbstractEndpoint
        endpoint) {
        this.endpoint = endpoint;
    }


    public final AcceptorState getState(a) {
        return state;
    }


    final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }


    final String getThreadName(a) {
        return threadName;
    }


    @Override
    public void run(a) {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {

            // Loop if endpoint is paused
            while (endpoint.isPaused() && endpoint.isRunning()) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore}}if(! endpoint.isRunning()) {break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // Create a socketChannel to receive the next connection from the server
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break; }}// Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                // If the EndPoint is running and not paused
                if(endpoint.isRunning() && ! endpoint.isPaused()) {// setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else{ endpoint.destroySocket(socket); }}catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                // APR specific.
                // Could push this down but not sure it is worth the trouble.
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        // Not an error on HP-UX so log as a warning
                        // so it can be filtered out on that platform
                        // See bug 50273
                        log.warn(msg, t);
                    } else{ log.error(msg, t); }}else{ log.error(msg, t); } } } state = AcceptorState.ENDED; }...public enum AcceptorState {
        NEW, RUNNING, PAUSED, ENDED
    }
}
Copy the code

Acceptor uses a listener port blocked by serversock.accept (). If a connection comes in, the socket is received, and the EndPoint is running properly, SetSocketOptions calls NioEndPoint’s setSocketOptions method. In general, setSocketOptions builds a NioChannel based on the socket and registers the NioChannel in Poller’s event list. Waiting for Poller polling:

Org.apache.tomcat.util.net.NioEndpoint.java / * * * * Process to the specified connection. * the specified connection *@param socket The socket channel
* @return<code>true</code> if the socket was correctly configured * and processing may continue, <code>false</code> if the socket needs to be * close immediately * If the socket is configured correctly and may continue processing, returns true * If the socket needs to be closed immediately, Return false */
@Override
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
    // Take a nioChannel from the cache. If not, create one. Pass the socket in
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = newNioChannel(socket, bufhandler); }}else {
            channel.setIOChannel(socket);
            channel.reset();
        }
    // Get a Poller object from the Pollers array and register the nioChannel
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

/**
* Return an available poller in true round robin fashion.
*
* @return The next poller in sequence
*/
public Poller getPoller0(a) {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
Copy the code

With regard to getPoller0(), note that, by default, the pollers array contains only one element, as we know from the previous section. Let’s look at the Register method implemented by Poller in NioEndPoint. All it does is register a newly created socket with Poller.

/**
* Registers a newly created socket with the poller.
*
* @param socket    The newly created socket
*/
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    // Fetch a PollerEvent object from the cache, or create one if none exists. Set socket and NioSocketWrapper
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    // Add to the list of events to the Poller
    addEvent(r);
}
Copy the code

To summarize the above process:

When receiving a request from an Acceptor, it does the following:

  • If the maximum number of connections is reached, wait. Otherwise, the listening port is blocked.
  • If a connection is detected, a socketChannel is created. If the service is running properly, the socket is passed to the appropriate processor. If successful, the socket is closed.

Here, the setSocketOptions method of the NioEndPoint is called to handle the specified connection:

  • Set the socket to non-blocking
  • Get a nioChannel from the cache. If not, create one. Pass the socket in.
  • Get a Poller object from the Pollers array and register nioChannel with the Poller.

The last step in the registration process is to call Poller’s register() method:

  • Create a NioSocketWrapper wrapper around the socket. Then configure interestOps to selectionkey.op_read
  • Fetch a PollerEvent object from the cache, or create one if none exists. Initializing or resetting this Event object sets its interestOps to OP_REGISTER (used in Poller polling)
  • Add a new PollerEvent to the Poller’s events list and wait for the Poller thread to poll.

Poller’s way of working

Poller implements the Runnable interface and creates worker threads during this part of the start lifecycle, so let’s take a look at how it works using its run method.

In fact, the Poller registers an event to an event queue as mentioned above. The next thing the Poller thread has to do is actually handle these events.

Poller polls the events queue in the Run method, registers the interestOps of the SocketChannel in each PollerEvent with the Selector, and then removes the PollerEvent from the queue. SocketChanel then uses Selector scheduling to do non-blocking reads and writes.

 /** * Poller class. */
    public class Poller implements Runnable {

        private Selector selector;
        private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>();

        private volatile boolean close = false;
        private long nextExpiration = 0;//optimize expiration handling

        private AtomicLong wakeupCounter = new AtomicLong(0);

        private volatile int keyCount = 0;

        public Poller(a) throws IOException {
            this.selector = Selector.open();
        }

        public int getKeyCount(a) { return keyCount; }

        public Selector getSelector(a) { returnselector; }/** * The background thread that adds sockets to the Poller, checks the * poller for triggered events and hands the associated socket off to an * appropriate processor as events occur. */
        @Override
        public void run(a) {
            // Loop until destroy() is called
             // Loop until destroy() is called
            while (true) {

                boolean hasEvents = false;

                try {
                    if(! close) {// Iterate over events, registering the interestOps of the channels in each event with the Selector
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            //if we are here, means we have other stuff to do
                            //do a non blocking select
                            // There is an IO Channel in place
                            // Call the non-blocking SELECT method and return the number of ready channels directly
                            keyCount = selector.selectNow();
                        } else {
                            // Blocking waits for the operating system to return a Channel with data ready, and then wakes up
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    if (close) {
                        events();
                        timeout(0.false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break; }}catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }
                //either we timed out or we woke up, process events first
                // If the select method above times out or is woken up, register the Channel in the Events queue with the Selector.
                if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                 // Iterate over the ready Channel and call processKey to handle the Socket's IO.
                while(iterator ! =null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    // Attachment may be empty if another thread has been called
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                    // Create a SocketProcessor to execute in the Tomcat thread poolprocessKey(sk, attachment); }}//while

                //process timeouts
                timeout(keyCount,hasEvents);
            }//whilegetStopLatch().countDown(); }... }Copy the code

The part of reading the ready Channel above is a very common Java NIO usage, that is, Selector calls selectedKeys(), gets the Channel where the IO data is ready, iterates through and calls processKey to handle each channel-ready event. The processKey method creates a SocketProcessor and throws it into the Tomcat thread pool for execution.

Another point to note here is that the events() method, which handles the PollerEvent event, executes pollerEvent.run (), and then puts the PollerEvent reset back into the cache for object reuse.

/**
* Processes events in the event queue of the Poller.
*
* @return <code>true</code> if some events were processed,
*   <code>false</code> if queue was empty
*/
public boolean events(a) {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) ! =null; i++ ) {
        result = true;
        try {
            // Register SocketChannel interestOps with the Selector
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error(sm.getString("endpoint.nio.pollerEventError"), x); }}return result;
}
Copy the code

Therefore, the pollerEvent.run () method is the focus of our attention:

/**
* PollerEvent, cacheable object for poller events to avoid GC
*/
public static class PollerEvent implements Runnable {

    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset(a) {
        reset(null.null.0);
    }

    @Override
    public void run(a) {
        // When an Acceptor calls poller.register (), the interestOps of a PollerEvent created is OP_REGISTER
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x); }}else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    // The key was cancelled (e.g. due to socket closure)
                    // and removed from the selector while it was being
                    // processed. Count down the connections at this point
                    // since it won't have been counted down when the socket
                    // closed.
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if(socketWrapper ! =null) {
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else{ socket.getPoller().cancelledKey(key); }}}catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

    @Override
    public String toString(a) {
        return "Poller event: socket [" + socket + "], socketWrapper [" + socketWrapper +
                "], interestOps [" + interestOps + "]"; }}Copy the code

At this point, you can see the Poller thread in action

  • Registers requests received by acceptors to Poller’s event queue
  • Poller polls the event queue, processes incoming events, and registers channels in pollerEvents with Poller’s Selector
  • Polling the ready channels, creating a SocketProcessor for each ready channel to be handled by the Tomcat thread pool

All that remains is how the SocketProcessor ADAPTS the requested data from the client and then hands it to the Servlet container for processing.

ProcessKey (SK, Attachment), the last call in Poller’s run method; :

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if( sk.isValid() && attachment ! =null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if( attachment.getSendfileData() ! =null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        if(! processSocket(attachment, SocketEvent.OPEN_READ,true)) {
                            closeSocket = true; }}if(! closeSocket && sk.isWritable()) {if(! processSocket(attachment, SocketEvent.OPEN_WRITE,true)) {
                            closeSocket = true; }}if(closeSocket) { cancelledKey(sk); }}}}else {
            //invalid keycancelledKey(sk); }}catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t); }}Copy the code

SocketProcessor is used to process requests from the processSocket method:

/**
    * Process the given SocketWrapper with the given status. Used to trigger
    * processing as if the Poller (for those endpoints that have one)
    * selected the socket.
    *
    * @param socketWrapper The socket wrapper to process
    * @param event         The socket event to be processed
    * @param dispatch      Should the processing be performed on a new
    *                          container thread
    *
    * @return if processing was triggered successfully
    */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if(dispatch && executor ! =null) {
            executor.execute(sc);
        } else{ sc.run(); }}catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
Copy the code

SocketProcessor processes the request

Here’s a quick mention of SocketProcessor processing to help you connect to the Servlet container. As can be seen from the above, a specific request is processed in the SocketProcessor through the thread pool. Here, we look at the sequence diagram of its execution of a request:

As you can see in the figure, the SocketProcessor gets the Htpp11Processor through the Http11ConnectionHandler, which then calls the prepareRequest method to prepare the request data. Then the CoyoteAdapter service method is called for request and response adaptation, which is handed over to the Tomcat container for processing.

The following process is represented by a series of calls:

connector.getService().getContainer().getPipeline().getFirst().invoke(request,response);

Here we first get the Service from the Connector (the Connector has already set itself into the CoyoteAdapter when creating it in the initInternal method). It then gets the Container from the Service, then the pipe, then the first Value of the pipe, and finally calls the Invoke method to execute the request. Service stores the top-level container. When invoking the invoke method of the top-level container pipe, the pipe will invoke the invoke method of Value in the pipe of each layer. Until finally BaseValue (StandardWrapperValve) in the Wrapper pipe is called to process the Filter and Servlet.

After handing the request to the Tomcat container for processing, we pass the request layer by layer to Engine, Host, Context, Wrapper, and finally through a series of filters to the Servlet, where we execute our own code logic.

So far, some things about Connector are almost covered. The rest, if you have energy in the future, continue to explore, and then share the interpretation of Webflux.

Add: Thank you for your question (blog :www.jiangxinlingdu.com), here I will add some additional understanding of my own questions:

Here’s a bit more detail about initServerSocket(), which implements the lifecycle part of NioEndpoint:

// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket(a) throws Exception {
    if(! getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr =new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}
Copy the code

Why doesn’t Tomcat set this to non-blocking? Set the initialization is just at the moment, as the blocking state, blocking is blocked on this thread, namely the Acceptor within a thread to perform its run method, invoked the endpoint. ServerSocketAccept () to create a socketChannel, Receives the next connection coming in from the server. When received successfully, to this socket is configured, which is called the endpoint. SetSocketOptions (socket), in this way, will call socket. ConfigureBlocking (false); In this case, SocketChannel is turned on in non-blocking mode. Please refer to the previous details in this article for the code.