The reason
This article also stems from my own carelessness. While writing Java Programming Methodology — Reactor and Webflux, I assumed that a well-known blog post about the Tomcat Connector was reliable, so I referred to it briefly without analyzing the source code in depth. When I later recorded and shared the video, I found that the article was inconsistent with the source code, and the Chinese blog posts I found through search engines were not satisfactory either. So I simply worked through the relevant parts of the Tomcat Connector myself against the latest source code, and reminded myself: everything must be checked carefully, and then checked again! Reference source code address: github.com/apache/tomc…
As for the video series for Java Programming Methodology — Reactor and Webflux, the RxJava and Reactor parts are complete; the Bilibili links are as follows:
Rxjava source code reading and sharing: www.bilibili.com/video/av345…
Reactor source code reading and sharing: www.bilibili.com/video/av353…
Detailed description of Tomcat startup process
Basic Tomcat operations: starting and stopping
On Linux, you can start and stop Tomcat using commands.
Go to the bin directory of Tomcat:
cd /java/tomcat/bin
Start Tomcat command:
./startup.sh
Command for stopping the Tomcat service:
./shutdown.sh
After running ./shutdown.sh, although the Tomcat service can no longer be accessed, ps -ef | grep tomcat may show that the corresponding Java process was not destroyed: the web container has shut down, yet a zombie Java process remains. The JVM exits only when all of its remaining threads are daemon threads; if one or more non-daemon (user) threads remain, the JVM will not exit. Run the following command to check whether the Tomcat process has fully exited:
ps -ef|grep tomcat
If user threads remain and the process is still there, kill it with kill -9 PID.
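To see why a leftover non-daemon thread keeps a "stopped" container's JVM alive, here is a minimal, self-contained sketch (not Tomcat code; the class and method names are made up for illustration):

```java
public class DaemonExit {
    // Start a background heartbeat thread marked as a daemon thread
    static Thread startDaemonHeartbeat() {
        Thread t = new Thread(() -> {
            while (true) {
                try { Thread.sleep(100); } catch (InterruptedException e) { return; }
            }
        });
        t.setDaemon(true); // daemon: will not keep the JVM alive
        t.start();
        return t;
    }

    public static void main(String[] args) {
        Thread heartbeat = startDaemonHeartbeat();
        System.out.println("daemon=" + heartbeat.isDaemon());
        // main returns here; the JVM exits even though the heartbeat thread still loops.
        // With setDaemon(true) removed, this process would hang until killed,
        // which is exactly the zombie-process symptom described above.
    }
}
```

If you flip setDaemon(true) to false, the program never terminates on its own and has to be killed, just like the stuck Tomcat process.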
Detailed explanation of Bootstrap startup process
In catalina.sh, at the elif [ "$1" = "start" ] branch, following the script down, we can find that it ends up calling the start() method of the org.apache.catalina.startup.Bootstrap class:
/**
* org.apache.catalina.startup.Bootstrap
* Start the Catalina daemon.
* @throws Exception Fatal start error
*/
public void start() throws Exception {
if (catalinaDaemon == null) init();
Method method = catalinaDaemon.getClass().getMethod("start", (Class [] )null);
method.invoke(catalinaDaemon, (Object [])null);
}
Here, the first time the server starts, start() calls init(), which mainly creates an instance of the org.apache.catalina.startup.Catalina class:
/**
* org.apache.catalina.startup.Bootstrap
* Initialize daemon.
* @throws Exception Fatal initialization error
*/
public void init() throws Exception {
initClassLoaders();
Thread.currentThread().setContextClassLoader(catalinaLoader);
SecurityClassLoad.securityClassLoad(catalinaLoader);
// Load our startup class and call its process() method
if (log.isDebugEnabled())
log.debug("Loading startup class");
Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
Object startupInstance = startupClass.getConstructor().newInstance();
// Set the shared extensions class loader
if (log.isDebugEnabled())
log.debug("Setting startup class properties");
String methodName = "setParentClassLoader";
Class<?> paramTypes[] = new Class[1];
paramTypes[0] = Class.forName("java.lang.ClassLoader");
Object paramValues[] = new Object[1];
paramValues[0] = sharedLoader;
Method method =
startupInstance.getClass().getMethod(methodName, paramTypes);
method.invoke(startupInstance, paramValues);
catalinaDaemon = startupInstance;
}
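Bootstrap keeps catalinaDaemon as a plain Object and drives it reflectively, because Catalina was loaded by a different class loader. The reflective call pattern can be shown in a small stand-alone sketch (Daemon and callStart are hypothetical names, stand-ins for Catalina and Bootstrap's code):

```java
import java.lang.reflect.Method;

public class ReflectiveStart {
    // Hypothetical stand-in for Catalina; only the method name matters here
    public static class Daemon {
        public String start() {
            return "started";
        }
    }

    // The same reflective pattern Bootstrap uses on catalinaDaemon:
    // look up a no-arg "start" method and invoke it on the target
    public static Object callStart(Object target) throws Exception {
        Method method = target.getClass().getMethod("start", (Class<?>[]) null);
        return method.invoke(target, (Object[]) null);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callStart(new Daemon())); // prints "started"
    }
}
```

Because the call goes through getMethod/invoke rather than a compile-time reference, Bootstrap never needs Catalina's class on its own class path at compile time.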
The Catalina startup process in detail
start() in Catalina
Next, Bootstrap’s start() method calls the Catalina instance’s start method:
/**
* org.apache.catalina.startup.Catalina
* Start a new server instance.
*/
public void start() {
if (getServer() == null) {
load();
}
if (getServer() == null) {
log.fatal(sm.getString("catalina.noServer"));
return;
}
long t1 = System.nanoTime();
// Start the new server
try {
getServer().start();
} catch (LifecycleException e) {
log.fatal(sm.getString("catalina.serverStartFail"), e);
try {
getServer().destroy();
} catch (LifecycleException e1) {
log.debug("destroy() failed for failed Server ", e1);
}
return;
}
long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info(sm.getString("catalina.startup", Long.valueOf((t2 - t1) / 1000000)));
}
// Register shutdown hook
if (useShutdownHook) {
if (shutdownHook == null) {
shutdownHook = new CatalinaShutdownHook();
}
Runtime.getRuntime().addShutdownHook(shutdownHook);
// If JULI is being used, disable JULI's shutdown hook since
// shutdown hooks run in parallel and log messages may be lost
// if JULI's hook completes before the CatalinaShutdownHook()
LogManager logManager = LogManager.getLogManager();
if (logManager instanceof ClassLoaderLogManager) {
((ClassLoaderLogManager) logManager).setUseShutdownHook(false);
}
}
if (await) {
await();
stop();
}
}
The two calls that matter here are load() and getServer().start(); let's walk through each of them.
load() in Catalina
load() first creates and configures the Digester used at startup via createStartDigester(), then obtains the server.xml file we configured, sets the properties in turn, and finally calls getServer().init():
/**
* org.apache.catalina.startup.Catalina
* Start a new server instance.
*/
public void load() {
if (loaded) {
return;
}
loaded = true;
long t1 = System.nanoTime();
initDirs();
// Before digester - it may be needed
initNaming();
// Set configuration source
ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(Bootstrap.getCatalinaBaseFile(), getConfigFile()));
File file = configFile();
// Create and execute our Digester
Digester digester = createStartDigester();
try (ConfigurationSource.Resource resource = ConfigFileLoader.getSource().getServerXml()) {
InputStream inputStream = resource.getInputStream();
InputSource inputSource = new InputSource(resource.getURI().toURL().toString());
inputSource.setByteStream(inputStream);
digester.push(this);
digester.parse(inputSource);
} catch (Exception e) {
if (file == null) {
log.warn(sm.getString("catalina.configFail", getConfigFile() + "] or [server-embed.xml"), e);
} else {
log.warn(sm.getString("catalina.configFail", file.getAbsolutePath()), e);
if (file.exists() && !file.canRead()) {
log.warn(sm.getString("catalina.incorrectPermissions"));
}
}
return;
}
getServer().setCatalina(this);
getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());
// Stream redirection
initStreams();
// Start the new server
try {
getServer().init();
} catch (LifecycleException e) {
if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
throw new java.lang.Error(e);
} else {
log.error(sm.getString("catalina.initError"), e);
}
}
long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info(sm.getString("catalina.init", Long.valueOf((t2 - t1) / 1000000)));
}
}
So where does the Server come from? From digester.addObjectCreate("Server", "org.apache.catalina.core.StandardServer", "className") we can see that an instance of StandardServer is used. Going back to digester.push(this); digester.parse(inputSource); — push(this) is called before parsing, so the top of Digester's stack is the Catalina instance, which is then used to set the Server on Catalina.
During parsing, Digester creates an instance of StandardServer and, via reflection, calls the setter method of the object on top of Digester's stack (which setter is called is determined by the name passed in). This involves IntrospectionUtils.setProperty(top, name, value), where top is the object on the stack top, name is the property name to set on it, and value is the value to set. Since Catalina was initially on top of the stack, catalina.setServer(Server) is called, setting up the Server for the later getServer().start() call. The StandardServer instance is then pushed onto Digester's object stack.
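The push/parse interaction can be imitated in a few lines. The sketch below (a hypothetical MiniDigester, not the real org.apache.tomcat.util.digester.Digester) shows the essential move: create an object, reflectively call a setter on the current stack top, then push the new object:

```java
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.Deque;

public class MiniDigester {
    public static class Server { }

    public static class Catalina {
        private Server server;
        public void setServer(Server server) { this.server = server; }
        public Server getServer() { return server; }
    }

    // Wire a Server into a Catalina the way Digester's rules do during parse()
    public static Catalina wire() throws Exception {
        Deque<Object> stack = new ArrayDeque<>();
        Catalina catalina = new Catalina();
        stack.push(catalina);                 // digester.push(this)

        Server server = new Server();         // addObjectCreate("Server", ...)
        Object top = stack.peek();            // stack top is Catalina
        // IntrospectionUtils.setProperty-style reflective setter call on the stack top
        Method setter = top.getClass().getMethod("setServer", Server.class);
        setter.invoke(top, server);
        stack.push(server);                   // the new object becomes the stack top

        return catalina;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(wire().getServer() != null); // prints "true"
    }
}
```

Nested elements in server.xml repeat this pattern: each newly created child is configured against the object above it on the stack, which is how the Server/Service/Connector tree gets assembled.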
getServer().init()
Next, let's look at getServer().init(). From the above we know the Server is org.apache.catalina.core.StandardServer, which extends LifecycleMBeanBase and implements Server. The init() implementation lives further up, in the ancestor class org.apache.catalina.util.LifecycleBase:
//org.apache.catalina.util.LifecycleBase.java
@Override
public final synchronized void init() throws LifecycleException {
if (!state.equals(LifecycleState.NEW)) {
invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
}
try {
setStateInternal(LifecycleState.INITIALIZING, null, false);
initInternal();
setStateInternal(LifecycleState.INITIALIZED, null, false);
} catch (Throwable t) {
handleSubClassException(t, "lifecycleBase.initFail", toString());
}
}
The initInternal() implementation in StandardServer is too long to quote, so here is a summary of what it does:
1. Calls the superclass method org.apache.catalina.util.LifecycleMBeanBase#initInternal to register the MBean.
2. Registers MBeans for the other attributes of this class.
3. Initializes NamingResources: globalNamingResources.init().
4. Starting from the common ClassLoader and walking up to the SystemClassLoader, traverses each ClassLoader's classpath, finds files ending in .jar, reads their Manifest information, and adds it to ExtensionValidator#containerManifestResources.
5. Initializes each Service; the default implementation is StandardService:
i) calls super.initInternal();
ii) initializes the Container, whose instance here is StandardEngine;
iii) initializes the Executors;
iv) initializes the Connectors: a) Connector[HTTP/1.1-8080] b) Connector[AJP/1.3-8009].
getServer().start() in Catalina
Here, again, we look at the implementation in StandardServer's ancestor class org.apache.catalina.util.LifecycleBase:
@Override
public final synchronized void start() throws LifecycleException {
if (LifecycleState.STARTING_PREP.equals(state) || LifecycleState.STARTING.equals(state) ||
LifecycleState.STARTED.equals(state)) {
if (log.isDebugEnabled()) {
Exception e = new LifecycleException();
log.debug(sm.getString("lifecycleBase.alreadyStarted", toString()), e);
} else if (log.isInfoEnabled()) {
log.info(sm.getString("lifecycleBase.alreadyStarted", toString()));
}
return;
}
if (state.equals(LifecycleState.NEW)) {
init();
} else if (state.equals(LifecycleState.FAILED)) {
stop();
} else if (!state.equals(LifecycleState.INITIALIZED) && !state.equals(LifecycleState.STOPPED)) {
invalidTransition(Lifecycle.BEFORE_START_EVENT);
}
try {
setStateInternal(LifecycleState.STARTING_PREP, null, false);
startInternal();
if (state.equals(LifecycleState.FAILED)) {
// This is a 'controlled' failure. The component put itself into the
// FAILED state so call stop() to complete the clean-up.
stop();
} else if (!state.equals(LifecycleState.STARTING)) {
// Shouldn't be necessary but acts as a check that sub-classes are
// doing what they are supposed to.
invalidTransition(Lifecycle.AFTER_START_EVENT);
} else {
setStateInternal(LifecycleState.STARTED, null, false);
}
} catch (Throwable t) {
// This is an 'uncontrolled' failure so put the component into the
// FAILED state and throw an exception.
handleSubClassException(t, "lifecycleBase.startFail", toString());
}
}
For StandardServer, we focus on startInternal(), which:
1. Fires the CONFIGURE_START_EVENT event.
2. Sets the state of this object to STARTING.
3. Starts NamingResources: globalNamingResources.start().
4. Starts each StandardService:
i) sets its state to STARTING;
ii) starts the Container, i.e. StandardEngine;
iii) starts the Executors;
iv) starts the Connectors: a) Connector[HTTP/1.1-8080] b) Connector[AJP/1.3-8009].
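All the init()/start() pairs above follow the same template-method state machine from LifecycleBase: the final template methods own the state transitions, and subclasses only fill in the xxxInternal() hooks. A stripped-down imitation of that pattern (hypothetical names; no events, listeners, or FAILED handling):

```java
public class LifecycleSketch {
    enum State { NEW, INITIALIZED, STARTING_PREP, STARTING, STARTED }

    // Stripped-down imitation of LifecycleBase: the final template methods drive
    // the state transitions, subclasses only implement initInternal()/startInternal()
    static abstract class Component {
        State state = State.NEW;

        final void init() {
            initInternal();
            state = State.INITIALIZED;
        }

        final void start() {
            if (state == State.NEW) {
                init();               // start() lazily initializes, as in LifecycleBase
            }
            state = State.STARTING_PREP;
            startInternal();
            state = State.STARTED;
        }

        abstract void initInternal();
        abstract void startInternal();
    }

    static class MiniServer extends Component {
        @Override void initInternal()  { System.out.println("init children"); }
        @Override void startInternal() { state = State.STARTING; System.out.println("start children"); }
    }

    public static void main(String[] args) {
        MiniServer server = new MiniServer();
        server.start();
        System.out.println(server.state); // prints "STARTED"
    }
}
```

This is why calling start() on the StandardServer is enough to bring up the whole tree: each component's startInternal() calls start() on its children, and every component moves through the same state sequence.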
Finally, we have reached the protagonist of this article: the Connector.
The Connector interpretation
The Connector constructor
As the Connector configuration in server.xml under apache-tomcat-9.0.14\conf shows, the default protocol configured for port 8080 is HTTP/1.1:
<Connector port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443" />
<!-- Define an AJP 1.3 Connector on port 8009 -->
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
With that in mind, let’s look at the implementation in its code:
public Connector() {
this("org.apache.coyote.http11.Http11NioProtocol");
}
public Connector(String protocol) {
boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
AprLifecycleListener.getUseAprConnector();
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
}
} else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
}
} else {
protocolHandlerClassName = protocol;
}
// Instantiate protocol handler
ProtocolHandler p = null;
try {
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.getConstructor().newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}
// Default for Connector depends on this system property
setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}
For Tomcat 8.5 and above, the default is Http11NioProtocol. Here we configured protocol="HTTP/1.1", but as the code above shows, that also resolves to Http11NioProtocol (when APR is not in use).
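The mapping the constructor performs can be summarized as a small standalone function. This is a sketch of just the branch logic above, with hypothetical names; in the real constructor the boolean comes from AprLifecycleListener:

```java
public class ProtocolSelection {
    // Mirrors the protocol-name-to-handler-class mapping in Connector's constructor
    static String handlerClassName(String protocol, boolean aprConnector) {
        if ("HTTP/1.1".equals(protocol) || protocol == null) {
            return aprConnector ? "org.apache.coyote.http11.Http11AprProtocol"
                                : "org.apache.coyote.http11.Http11NioProtocol";
        } else if ("AJP/1.3".equals(protocol)) {
            return aprConnector ? "org.apache.coyote.ajp.AjpAprProtocol"
                                : "org.apache.coyote.ajp.AjpNioProtocol";
        }
        // Anything else is treated as a fully qualified handler class name
        return protocol;
    }

    public static void main(String[] args) {
        System.out.println(handlerClassName("HTTP/1.1", false));
        // prints "org.apache.coyote.http11.Http11NioProtocol"
    }
}
```

This also explains the third branch: you can put a fully qualified class name such as org.apache.coyote.http11.Http11NioProtocol directly in the protocol attribute, as the later server.xml example in this article does.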
Connector initialization and startup
Also, from the previous section we know that Connector initialization is involved. Connector likewise extends LifecycleMBeanBase, so let's look at its initInternal() implementation:
@Override
protected void initInternal() throws LifecycleException {
super.initInternal();
if (protocolHandler == null) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
}
// Initialize adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
if (service != null) {
protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor());
}
// Make sure parseBodyMethodsSet has a default
if (null == parseBodyMethodsSet) {
setParseBodyMethods(getParseBodyMethods());
}
if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
getProtocolHandlerClassName()));
}
if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
protocolHandler instanceof AbstractHttp11JsseProtocol) {
AbstractHttp11JsseProtocol<?> jsseProtocolHandler = (AbstractHttp11JsseProtocol<?>) protocolHandler;
if (jsseProtocolHandler.isSSLEnabled() &&
jsseProtocolHandler.getSslImplementationName() == null) {
// OpenSSL is compatible with the JSSE configuration, so use it if APR is available
jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
}
}
try {
protocolHandler.init();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
}
}
The process involved is as follows:
1. Register the MBean.
2. Instantiate the CoyoteAdapter, which is the entry point for requests: when a request arrives, the CoyoteAdapter handles its state, the request is recycled at the end, and the intermediate processing is done by the pipeline.
3. Initialize the protocolHandler (here org.apache.coyote.http11.Http11NioProtocol); in this step, the endpoint is initialized as well.
As for startInternal(), it simply validates the port, sets the state of this object to STARTING, and calls protocolHandler.start(). Next, it's time to get to the core of the matter.
@Override
protected void startInternal() throws LifecycleException {
// Validate settings before starting
if (getPortWithOffset() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset())));
}
setState(LifecycleState.STARTING);
try {
protocolHandler.start();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}
Protocol interpretation
Here, we look directly at the abstract implementation, org.apache.coyote.AbstractProtocol. It follows the component lifecycle, implementing its own init() and start() methods; internally, the concrete logic is executed by the corresponding endpoint implementation:
//org.apache.coyote.AbstractProtocol.java
@Override
public void init() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
logPortOffset();
}
if (oname == null) {
// Component not pre-registered so register it
oname = createObjectName();
if (oname != null) {
Registry.getRegistry(null, null).registerComponent(this, oname, null);
}
}
if (this.domain != null) {
rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
Registry.getRegistry(null, null).registerComponent(
getHandler().getGlobal(), rgOname, null);
}
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
endpoint.init();
}
@Override
public void start() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
logPortOffset();
}
endpoint.start();
monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
if (!isPaused()) {
startAsyncTimeout();
}
}
}, 0, 60, TimeUnit.SECONDS);
}
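The monitorFuture scheduling at the end of start() uses scheduleWithFixedDelay on the utility executor: run immediately (initial delay 0), then re-run 60 seconds after each completion. A minimal sketch of that scheduling shape (with a 10 ms delay so it is observable, and hypothetical names):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelaySketch {
    // Returns true once the periodic task has run at least twice
    static boolean ranTwice() throws InterruptedException {
        ScheduledExecutorService utility = Executors.newScheduledThreadPool(1);
        CountDownLatch ran = new CountDownLatch(2);
        // Same shape as AbstractProtocol.start(): initial delay 0, fixed delay between runs
        utility.scheduleWithFixedDelay(ran::countDown, 0, 10, TimeUnit.MILLISECONDS);
        boolean ok = ran.await(2, TimeUnit.SECONDS);
        utility.shutdownNow();
        return ok;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ran twice: " + ranTwice());
    }
}
```

Note that scheduleWithFixedDelay measures the delay from the end of one run to the start of the next, so a slow async-timeout sweep cannot pile up overlapping executions.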
Take org.apache.coyote.http11.Http11NioProtocol as an example: its constructor passes a NioEndpoint as the concrete implementation, so the concrete logic behind its internal methods is carried out by that NioEndpoint:
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {
private static final Log log = LogFactory.getLog(Http11NioProtocol.class);
public Http11NioProtocol() {
super(new NioEndpoint());
}
@Override
protected Log getLog() { return log; }
// -------------------- Pool setup --------------------
public void setPollerThreadCount(int count) {
((NioEndpoint)getEndpoint()).setPollerThreadCount(count);
}
public int getPollerThreadCount() {
return ((NioEndpoint)getEndpoint()).getPollerThreadCount();
}
public void setSelectorTimeout(long timeout) {
((NioEndpoint)getEndpoint()).setSelectorTimeout(timeout);
}
public long getSelectorTimeout() {
return ((NioEndpoint)getEndpoint()).getSelectorTimeout();
}
public void setPollerThreadPriority(int threadPriority) {
((NioEndpoint)getEndpoint()).setPollerThreadPriority(threadPriority);
}
public int getPollerThreadPriority() {
return ((NioEndpoint)getEndpoint()).getPollerThreadPriority();
}
// ----------------------------------------------------- JMX related methods
@Override
protected String getNamePrefix() {
if (isSSLEnabled()) {
return "https-" + getSslImplementationShortName()+ "-nio";
} else {
return "http-nio";
}
}
}
Interpretation of Endpoint
Here, the EndPoint is used to handle connections and data transfer. It manages network connections and is the server's entry point for external I/O operations. Its main task is to manage external socket connections and hand established socket connections over to the appropriate worker threads. Its three key inner classes are Acceptor, Poller, and SocketProcessor. Taking NioEndpoint as an example, its internal request-processing flow is as follows:
Continuing from the end of the previous section, we focus on how the Endpoint implements the Protocol's lifecycle methods:
//org.apache.tomcat.util.net.AbstractEndpoint.java
public final void init() throws Exception {
if (bindOnInit) {
bindWithCleanup();
bindState = BindState.BOUND_ON_INIT;
}
if (this.domain != null) {
// Register endpoint (as ThreadPool - historical name)
oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
Registry.getRegistry(null.null).registerComponent(this, oname, null);
ObjectName socketPropertiesOname = new ObjectName(domain +
":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
socketProperties.setObjectName(socketPropertiesOname);
Registry.getRegistry(null.null).registerComponent(socketProperties, socketPropertiesOname, null);
for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
registerJmx(sslHostConfig);
}
}
}
public final void start() throws Exception {
if (bindState == BindState.UNBOUND) {
bindWithCleanup();
bindState = BindState.BOUND_ON_START;
}
startInternal();
}
//org.apache.tomcat.util.net.AbstractEndpoint.java
private void bindWithCleanup() throws Exception {
try {
bind();
} catch (Throwable t) {
// Ensure open sockets etc. are cleaned up if something goes
// wrong during bind
ExceptionUtils.handleThrowable(t);
unbind();
throw t;
}
}
These two methods call bind (via bindWithCleanup()) and startInternal, which are template methods that subclasses implement as required. Here, we refer to NioEndpoint's implementation. Its bind method is as follows:
//org.apache.tomcat.util.net.NioEndpoint.java
@Override
public void bind() throws Exception {
initServerSocket();
// Initialize thread count defaults for acceptor, poller
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
setStopLatch(new CountDownLatch(pollerThreadCount));
// Initialize SSL if needed
initialiseSsl();
selectorPool.open();
}
The bind method first initializes the ServerSocket via initServerSocket(). It then checks the acceptorThreadCount and pollerThreadCount properties, which represent the number of Acceptor and Poller threads to initialize; each value must be at least 1.
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
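The non-inherited branch of initServerSocket() boils down to three NIO calls. A runnable sketch of just those steps (binding to port 0 here so any free port is used; the class and method names are made up):

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindSketch {
    // Open, bind with a backlog, and keep the channel blocking, as initServerSocket() does
    static boolean bindAndCheck() throws Exception {
        ServerSocketChannel serverSock = ServerSocketChannel.open();
        serverSock.socket().bind(new InetSocketAddress(0), 100); // backlog plays the acceptCount role
        serverSock.configureBlocking(true); // accept() blocks, matching the "mimic APR behavior" comment
        boolean bound = serverSock.socket().isBound();
        serverSock.close();
        return bound;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bound: " + bindAndCheck());
    }
}
```

The deliberate configureBlocking(true) is worth noting: only the accepting channel is blocking; each accepted connection is switched to non-blocking later in setSocketOptions before being handed to a Poller.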
The Acceptor accepts requests and hands them to the Poller for processing; the threads for both are started in startInternal (SSL and related resources were initialized in bind above). NioEndpoint's startInternal method is as follows:
/**
* The socket pollers.
*/
private Poller[] pollers = null;
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// Create worker collection
if ( getExecutor() == null ) {
createExecutor();
}
initializeConnectionLatch();
// Start poller threads
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
startAcceptorThreads();
}
}
processorCache is a SynchronizedStack<SocketProcessor>. SocketProcessor is an inner class of NioEndpoint: when a Poller receives a request, it hands it to a SocketProcessor, which in turn passes it to the Handler. pollers is an array managing a set of Runnables; unless we configure otherwise, it contains a single element, i.e. a single Poller thread by default. Once created, each Poller thread is set as a daemon thread and runs until the Tomcat container terminates. If we want to configure these values, we can do so in server.xml:
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
connectionTimeout="20000"
maxHeaderCount="64"
maxParameterCount="64"
maxHttpHeaderSize="8192"
URIEncoding="UTF-8"
useBodyEncodingForURI="false"
maxThreads="128"
minSpareThreads="12"
acceptCount="1024"
connectionLinger="1"
keepAliveTimeout="60"
maxKeepAliveRequests="32"
maxConnections="10000"
acceptorThreadCount="1"
pollerThreadCount="2"
selectorTimeout="1000"
useSendfile="true"
selectorPool.maxSelectors="128"
redirectPort="8443" />
The startAcceptorThreads method, which starts the Acceptors, lives in AbstractEndpoint:
protected void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
The getAcceptorThreadCount method reads the acceptorThreadCount property checked in the init method; startAcceptorThreads starts that many Acceptor threads to receive requests. The default is likewise 1, and the threads are created the same way as the Poller threads, so we won't belabor it.
Here, let's look at the webapps/docs/config/http.xml documentation:
<attribute name="acceptorThreadCount" required="false">
<p>The number of threads to be used to accept connections. Increase this
value on a multi CPU machine, although you would never really need more
than <code>2</code>. Also, with a lot of non keep alive connections, you
might want to increase this value as well. Default value is
<code>1</code>.</p>
</attribute>
<attribute name="pollerThreadCount" required="false">
<p>(int)The number of threads to be used to run for the polling events.
Default value is <code>1</code> per processor but not more than 2.<br/>
When accepting a socket, the operating system holds a global lock. So the benefit of
going above 2 threads diminishes rapidly. Having more than one thread is for
system that need to accept connections very rapidly. However usually just
increasing <code>acceptCount</code> will solve that problem.
Increasing this value may also be beneficial when a large amount of send file
operations are going on.
</p>
</attribute>
acceptorThreadCount sets the number of threads used to accept connections. You can increase this value on a multi-CPU machine, although you would rarely need more than 2; you might also increase it when there are many non-keep-alive connections. The default value is 1. pollerThreadCount is the number of threads used to run polling events; the default is 1 per processor but no more than 2 (the tuned configuration above uses 2). When accepting a socket, the operating system holds a global lock, so the benefit of going above 2 threads diminishes rapidly. Having more than one such thread helps systems that must accept connections very rapidly, but usually just increasing acceptCount solves that problem. Increasing pollerThreadCount may also be beneficial when a large number of sendfile operations are going on.
The way Acceptor and Poller work
Let’s start with a sequence diagram of NioEndpoint processing:
Acceptor mode of operation
Acceptor and Poller both implement the Runnable interface, so their main work flow lives in the run method.
//org.apache.tomcat.util.net.NioEndpoint.java
@Override
protected SocketChannel serverSocketAccept() throws Exception {
return serverSock.accept();
}
//org.apache.tomcat.util.net.Acceptor.java
public class Acceptor<U> implements Runnable {
private static final Log log = LogFactory.getLog(Acceptor.class);
private static final StringManager sm = StringManager.getManager(Acceptor.class);
private static final int INITIAL_ERROR_DELAY = 50;
private static final int MAX_ERROR_DELAY = 1600;
private final AbstractEndpoint<?, U> endpoint;
private String threadName;
protected volatile AcceptorState state = AcceptorState.NEW;
public Acceptor(AbstractEndpoint<?, U> endpoint) {
this.endpoint = endpoint;
}
public final AcceptorState getState() {
return state;
}
final void setThreadName(final String threadName) {
this.threadName = threadName;
}
final String getThreadName() {
return threadName;
}
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
// Loop if endpoint is paused
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
// Accept the next incoming connection from the server
// socket
// Create a socketChannel to receive the next connection from the server
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
// If the EndPoint is running and not paused
if(endpoint.isRunning() && ! endpoint.isPaused()) {// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
...
public enum AcceptorState {
NEW, RUNNING, PAUSED, ENDED
}
}
The Acceptor blocks on serverSock.accept(), listening on the port. When a connection comes in, the socket is accepted, and if the EndPoint is running normally, NioEndpoint's setSocketOptions method is called. In short, setSocketOptions builds a NioChannel from the socket and registers that NioChannel in a Poller's event list, waiting for the Poller to poll it:
//org.apache.tomcat.util.net.NioEndpoint.java
/**
 * Process the specified connection.
 * @param socket The socket channel
 * @return <code>true</code> if the socket was correctly configured and
 *  processing may continue, <code>false</code> if the socket needs to be
 *  closed immediately
 */
@Override
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
// Take a nioChannel from the cache. If not, create one. Pass the socket in
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = newNioChannel(socket, bufhandler); }}else {
channel.setIOChannel(socket);
channel.reset();
}
// Get a Poller object from the Pollers array and register the nioChannel
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
/**
* Return an available poller in true round robin fashion.
*
* @return The next poller in sequence
*/
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
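As a side note, the round-robin selection in getPoller0() is easy to reproduce in isolation. The toy class below (not Tomcat code, the class name is invented) shows the same `Math.abs(counter.incrementAndGet()) % n` computation. In theory `Math.abs(Integer.MIN_VALUE)` is still negative, so after about 2^31 increments a single negative index is possible; with the one-poller default this is harmless, since any value `% 1` is 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy reproduction of the round-robin index used by getPoller0()
public class RoundRobin {
    private final AtomicInteger rotater = new AtomicInteger(0);
    private final int size;

    public RoundRobin(int size) {
        this.size = size;
    }

    // Same expression as Tomcat: abs of an ever-incrementing counter, mod pool size
    public int next() {
        return Math.abs(rotater.incrementAndGet()) % size;
    }

    public static void main(String[] args) {
        RoundRobin rr = new RoundRobin(3);
        // The first three calls cycle through the indices 1, 2, 0
        System.out.println(rr.next()); // 1
        System.out.println(rr.next()); // 2
        System.out.println(rr.next()); // 0
    }
}
```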
With regard to getPoller0(), note that, by default, the pollers array contains only one element, as we saw in the previous section. Next, let's look at the register method that Poller implements in NioEndPoint. All it does is register the newly created socket with the Poller:
/**
* Registers a newly created socket with the poller.
*
* @param socket The newly created socket
*/
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
    // Fetch a PollerEvent object from the cache, or create one if none exists,
    // and set the socket and NioSocketWrapper on it
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ); //this is what OP_REGISTER turns into.
    if (r == null) {
        r = new PollerEvent(socket, ka, OP_REGISTER);
    } else {
        r.reset(socket, ka, OP_REGISTER);
    }
    // Add the event to the Poller's event queue
    addEvent(r);
}
To summarize the above process:
When the Acceptor receives a connection, it does the following:
- If the maximum number of connections has been reached, it waits; otherwise it blocks on the listening port.
- When a connection arrives, a SocketChannel is created. If the endpoint is running normally, the socket is handed to setSocketOptions to be configured and passed to an appropriate processor; if that fails, the socket is closed.
Here, the setSocketOptions method of the NioEndPoint is called to handle the specified connection:
- Set the socket to non-blocking
- Get a nioChannel from the cache. If not, create one. Pass the socket in.
- Get a Poller object from the Pollers array and register nioChannel with the Poller.
The last step in the registration process is to call Poller’s register() method:
- Create a NioSocketWrapper around the socket, then set its interestOps to SelectionKey.OP_READ
- Fetch a PollerEvent object from the cache, or create one if none exists. Initializing or resetting this event sets its interestOps to OP_REGISTER (used when the Poller polls)
- Add a new PollerEvent to the Poller’s events list and wait for the Poller thread to poll.
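Stripped of NIO details, the handoff summarized above is a simple producer/consumer pattern: the accept thread only enqueues an event, and the poller thread later drains the queue and performs the real Selector registration. A toy model of just that handoff (the class and method names here are illustrative, not Tomcat's, though OP_REGISTER = 0x100 matches NioEndpoint):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified model of the Acceptor -> Poller handoff
public class HandoffModel {
    static final int OP_REGISTER = 0x100; // same pseudo-op value NioEndpoint uses

    // Stand-in for Tomcat's PollerEvent
    static class Event {
        final String socketId;
        final int interestOps;
        Event(String socketId, int interestOps) {
            this.socketId = socketId;
            this.interestOps = interestOps;
        }
    }

    private final Queue<Event> events = new ConcurrentLinkedQueue<>();

    // Acceptor side, mirrors Poller.register(): just enqueue an event
    public void register(String socketId) {
        events.add(new Event(socketId, OP_REGISTER));
    }

    // Poller side, mirrors Poller.events(): drain the queue
    public int drain() {
        int processed = 0;
        Event e;
        while ((e = events.poll()) != null) {
            // real code would call channel.register(selector, OP_READ, wrapper) here
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        HandoffModel m = new HandoffModel();
        m.register("socket-1");
        m.register("socket-2");
        System.out.println(m.drain()); // 2
    }
}
```

The design point is that the accept thread never touches the Selector directly; all registrations happen on the poller thread, which avoids locking the Selector across threads.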
How Poller works
Poller implements the Runnable interface, and its worker thread is created during the start phase of the lifecycle covered earlier, so let's look at how it works through its run method.
As mentioned above, the Acceptor registers an event into the Poller's event queue. What the Poller thread has to do next is actually process those events.
In its run method, the Poller polls the events queue, registers the interestOps of the SocketChannel in each PollerEvent with the Selector, and removes the PollerEvent from the queue. The SocketChannel is then scheduled through the Selector for non-blocking reads and writes.
/**
 * Poller class.
 */
public class Poller implements Runnable {

    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();

    private volatile boolean close = false;
    private long nextExpiration = 0; //optimize expiration handling

    private AtomicLong wakeupCounter = new AtomicLong(0);

    private volatile int keyCount = 0;

    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public int getKeyCount() { return keyCount; }

    public Selector getSelector() { return selector; }

    /**
     * The background thread that adds sockets to the Poller, checks the
     * poller for triggered events and hands the associated socket off to an
     * appropriate processor as events occur.
     */
    @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {
            boolean hasEvents = false;
            try {
                if (!close) {
                    // Iterate over the events, registering the interestOps of
                    // the channel in each event with the Selector
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        // If we are here, it means we have other stuff to do:
                        // an IO channel is already in place, so do a
                        // non-blocking select and return the number of ready
                        // channels immediately
                        keyCount = selector.selectNow();
                    } else {
                        // Block until the operating system reports a channel
                        // with data ready, or until the timeout expires
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    events();
                    timeout(0, false);
                    try {
                        selector.close();
                    } catch (IOException ioe) {
                        log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                    }
                    break;
                }
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                continue;
            }
            // Either we timed out or we woke up, process events first.
            // If the select above timed out or was woken up, register the
            // channels in the events queue with the Selector.
            if (keyCount == 0) {
                hasEvents = (hasEvents | events());
            }

            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event: iterate over the ready channels and call
            // processKey to handle each socket's IO
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // Create a SocketProcessor and execute it in the Tomcat thread pool
                    processKey(sk, attachment);
                }
            } //while

            // Process timeouts
            timeout(keyCount, hasEvents);
        } //while

        getStopLatch().countDown();
    }
    ...
}
The part above that reads the ready channels is a very common Java NIO pattern: the Selector's selectedKeys() returns the channels whose IO is ready, and the loop iterates over them, calling processKey to handle each channel's ready event. The processKey method creates a SocketProcessor and throws it into the Tomcat thread pool for execution.
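That select-iterate-dispatch skeleton can be reproduced in a few lines of plain Java NIO. The sketch below is a self-contained illustration, not Tomcat code: it uses an in-process Pipe instead of a real socket so it can run anywhere, but the selectedKeys() iteration and the mandatory iterator.remove() follow exactly the pattern Poller.run uses:

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Minimal select loop: register a readable channel, select, drain ready keys
public class SelectLoopDemo {
    public static String readOne() throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);          // must be non-blocking to register
        pipe.source().register(selector, SelectionKey.OP_READ);

        // Make data available so select() has something to report
        pipe.sink().write(ByteBuffer.wrap("hi".getBytes()));

        String result = "";
        if (selector.select(1000) > 0) {                 // blocks until ready or timeout
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();                             // must remove, just as Poller does
                if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(16);
                    ((Pipe.SourceChannel) key.channel()).read(buf);
                    buf.flip();
                    result = new String(buf.array(), 0, buf.limit());
                }
            }
        }
        selector.close();
        pipe.sink().close();
        pipe.source().close();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readOne()); // prints "hi"
    }
}
```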
Another point worth noting is the events() method, which processes each PollerEvent in the queue: it executes pollerEvent.run(), then resets the PollerEvent and pushes it back into the cache for object reuse.
/**
* Processes events in the event queue of the Poller.
*
* @return <code>true</code> if some events were processed,
* <code>false</code> if queue was empty
*/
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
        result = true;
        try {
            // Register the SocketChannel's interestOps with the Selector
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch (Throwable x) {
            log.error(sm.getString("endpoint.nio.pollerEventError"), x);
        }
    }

    return result;
}
Therefore, the PollerEvent.run() method is the focus of our attention:
/**
 * PollerEvent, cacheable object for poller events to avoid GC
 */
public static class PollerEvent implements Runnable {

    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @Override
    public void run() {
        // When the Acceptor calls Poller.register(), the PollerEvent it
        // creates has interestOps of OP_REGISTER
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    // The key was cancelled (e.g. due to socket closure)
                    // and removed from the selector while it was being
                    // processed. Count down the connections at this point
                    // since it won't have been counted down when the socket
                    // closed.
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        // We are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

    @Override
    public String toString() {
        return "Poller event: socket [" + socket + "], socketWrapper [" + socketWrapper +
                "], interestOps [" + interestOps + "]";
    }
}
At this point, the work of the Poller thread is clear:
- Requests received by the Acceptor are registered into the Poller's event queue
- The Poller polls the event queue, processes the incoming events, and registers the channel in each PollerEvent with the Poller's Selector
- It then iterates over the ready channels and creates a SocketProcessor for each one, to be handled by the Tomcat thread pool
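One detail worth isolating from the else branch of PollerEvent.run above: new interest bits are merged into the key's existing set with a bitwise OR. Since SelectionKey ops are single-bit flags, ORing accumulates interests without losing existing ones. A small standalone illustration (the class name is invented for this example):

```java
import java.nio.channels.SelectionKey;

// Demonstrates the ops-merging done in PollerEvent.run's else branch
public class InterestOpsDemo {
    // Same expression as: int ops = key.interestOps() | interestOps;
    public static int merge(int existing, int added) {
        return existing | added;
    }

    public static void main(String[] args) {
        int ops = merge(SelectionKey.OP_READ, SelectionKey.OP_WRITE);
        // Both interests are now present in the merged set
        System.out.println((ops & SelectionKey.OP_READ) != 0);  // true
        System.out.println((ops & SelectionKey.OP_WRITE) != 0); // true
    }
}
```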
All that remains is how the SocketProcessor adapts the request data from the client and hands it to the Servlet container for processing.
Let's look at processKey(sk, attachment), the last call in Poller's run method:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if (close) {
            cancelledKey(sk);
        } else if (sk.isValid() && attachment != null) {
            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    processSendfile(sk, attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            // Invalid key
            cancelledKey(sk);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}
The SocketProcessor used to process the request is created and dispatched in the processSocket method:
/**
* Process the given SocketWrapper with the given status. Used to trigger
* processing as if the Poller (for those endpoints that have one)
* selected the socket.
*
* @param socketWrapper The socket wrapper to process
* @param event The socket event to be processed
* @param dispatch Should the processing be performed on a new
* container thread
*
* @return if processing was triggered successfully
*/
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
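processSocket combines two ideas worth isolating: processor objects are recycled through a cache to reduce GC pressure, and the work is either dispatched to the configured Executor or run inline on the calling thread when no executor is available. A condensed sketch of just those two mechanisms (the types and names here are simplified stand-ins, not Tomcat's API):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

// Sketch of the cache-then-dispatch pattern used by processSocket
public class DispatchSketch {
    static class Processor implements Runnable {
        String event;
        Processor(String event) { this.event = event; }
        void reset(String event) { this.event = event; }
        @Override
        public void run() { /* parse the request, hand it to the adapter ... */ }
    }

    private final Deque<Processor> cache = new ArrayDeque<>();
    private Executor executor; // may be null, just like getExecutor() can return null

    public boolean process(String event, boolean dispatch) {
        Processor p = cache.poll();        // pop a processor from the cache ...
        if (p == null) {
            p = new Processor(event);      // ... or create a new one
        } else {
            p.reset(event);                // reuse the cached instance
        }
        if (dispatch && executor != null) {
            executor.execute(p);           // run on the container's thread pool
        } else {
            p.run();                       // fall back to running inline
        }
        return true;
    }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        System.out.println(d.process("OPEN_READ", true)); // true (runs inline: no executor set)
    }
}
```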
SocketProcessor processes the request
Here's a quick look at the SocketProcessor's processing, to help connect it to the Servlet container. As we saw above, each specific request is processed by a SocketProcessor running in the thread pool. Here is the sequence diagram of its execution of a request:
As the figure shows, the SocketProcessor obtains an Http11Processor through the Http11ConnectionHandler; the Http11Processor calls its prepareRequest method to prepare the request data, then calls the CoyoteAdapter's service method to adapt the request and response, which are handed to the Tomcat container for processing.
The following process is represented by a series of calls:
connector.getService().getContainer().getPipeline().getFirst().invoke(request,response);
Here we first get the Service from the Connector (the Connector set itself into the CoyoteAdapter when it was created in the initInternal method). From the Service we get the Container, then its Pipeline, then the first Valve in that pipeline, and finally call its invoke method to execute the request. The Service stores the top-level container; when the invoke method of the top-level container's pipeline is called, each layer's pipeline in turn invokes the invoke methods of the Valves it contains, until finally the basic valve (StandardWrapperValve) in the Wrapper's pipeline is called to process the Filters and the Servlet.
After handing the request to the Tomcat container for processing, we pass the request layer by layer to Engine, Host, Context, Wrapper, and finally through a series of filters to the Servlet, where we execute our own code logic.
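The call chain above is an instance of the chain-of-responsibility pattern: each Valve does its own work and then delegates to the next one, with the basic valve at the end of the pipeline doing the real processing. A toy illustration (these classes are invented for the example, not Tomcat's):

```java
// Chain-of-responsibility: each valve delegates to the next, the basic valve handles
interface Valve {
    String invoke(String request);
}

class LoggingValve implements Valve {
    private final Valve next;
    LoggingValve(Valve next) { this.next = next; }
    @Override
    public String invoke(String request) {
        // an intermediate valve would do its own work here, then pass the request on
        return next.invoke(request);
    }
}

class BasicValve implements Valve {
    // the "basic" valve at the end of the pipeline (like StandardWrapperValve)
    @Override
    public String invoke(String request) {
        return "handled:" + request;
    }
}

public class PipelineDemo {
    public static void main(String[] args) {
        Valve first = new LoggingValve(new BasicValve());
        System.out.println(first.invoke("GET /")); // handled:GET /
    }
}
```

This is why `getPipeline().getFirst().invoke(...)` is enough to drive the whole container stack: each pipeline's first valve pulls the request down through every layer.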
So far, we have covered most of what there is to say about the Connector. The rest, if I have the energy in the future, I'll continue to explore, and then share the interpretation of WebFlux.
Addendum: thanks for the questions (blog: www.jiangxinlingdu.com); here I'll add some of my own understanding in response.
Here's a bit more detail about initServerSocket(), which NioEndpoint implements as part of its lifecycle:
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}
Why doesn't Tomcat set this channel to non-blocking? The blocking mode configured here applies only to the listening channel itself: the thread that blocks on it is the Acceptor thread, which, in its run method, calls endpoint.serverSocketAccept() to accept the next incoming connection and obtain a SocketChannel. Once a connection has been accepted, the socket is configured via endpoint.setSocketOptions(socket), which in turn calls socket.configureBlocking(false), so the accepted SocketChannel itself runs in non-blocking mode. See the code earlier in this article for the details.
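The split described above — a blocking listener, non-blocking accepted channels — can be demonstrated directly with the standard NIO API. This self-contained sketch (the class name is invented; it is an illustration, not Tomcat code) binds to an ephemeral localhost port, connects to itself so accept() returns immediately, and checks the blocking mode of both channels:

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Blocking accept on the listener, non-blocking mode on the accepted channel
public class AcceptModeDemo {
    public static boolean[] acceptOnce() throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
        server.configureBlocking(true);                     // mimic Tomcat's choice

        // Connect to ourselves so accept() has a pending connection
        SocketChannel client = SocketChannel.open(server.getLocalAddress());
        SocketChannel accepted = server.accept();           // would block the Acceptor thread otherwise
        accepted.configureBlocking(false);                  // what setSocketOptions() does

        boolean[] modes = { server.isBlocking(), accepted.isBlocking() };
        client.close();
        accepted.close();
        server.close();
        return modes; // { true, false }
    }

    public static void main(String[] args) throws Exception {
        boolean[] m = acceptOnce();
        System.out.println(m[0] + " " + m[1]); // true false
    }
}
```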