The AbstractProcessorLight core logic is as follows:
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ) {
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }

        if (getLog().isDebugEnabled()) {
            getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status +
                    "], State out: [" + state + "]");
        }

        if (state != SocketState.CLOSED && isAsync()) {
            state = asyncPostProcess();
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], State after async post processing: [" + state + "]");
            }
        }

        if (dispatches == null || !dispatches.hasNext()) {
            dispatches = getIteratorAndClearDispatches();
        }
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);

    return state;
}
- The core method of this class is process().
- Depending on whether the request is being handled asynchronously, process() dispatches to different methods and returns the resulting SocketState. Here we cover only ordinary, non-asynchronous calls.
- For an ordinary, non-asynchronous call, the SocketEvent is OPEN_READ and execution enters the service() method of the Http11Processor instance.
- For any other, unrecognized SocketEvent, the SocketState returned to the ConnectionHandler instance is CLOSED. The SocketProcessor receives this result and closes the original socket.
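
The non-asynchronous branches described above can be sketched as a small standalone program. This is a simplified model rather than Tomcat's code: the class ProcessSketch, its reduced enums, and the Supplier standing in for Http11Processor.service() are all hypothetical names introduced for illustration, and the async/upgrade dispatch and the do/while loop are left out.

```java
import java.util.function.Supplier;

class ProcessSketch {
    // Hypothetical, reduced versions of Tomcat's enums, for illustration only.
    enum SocketEvent { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }
    enum SocketState { OPEN, CLOSED, LONG }

    /** Mirrors only the non-asynchronous branches of process(). */
    static SocketState process(SocketEvent status, Supplier<SocketState> service) {
        if (status == SocketEvent.DISCONNECT) {
            // Nothing to do; the state keeps its initial value, CLOSED.
            return SocketState.CLOSED;
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event, ignored: the connection keeps waiting.
            return SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ) {
            // Ordinary request: delegate to service().
            return service.get();
        }
        // Any other event is inconsistent with the processor's state.
        return SocketState.CLOSED;
    }

    public static void main(String[] args) {
        // Stand-in for Http11Processor.service(); assumed to finish with OPEN.
        Supplier<SocketState> service = () -> SocketState.OPEN;
        for (SocketEvent event : SocketEvent.values()) {
            System.out.println(event + " -> " + process(event, service));
        }
    }
}
```

Passing the service step in as a Supplier keeps the sketch independent of any real processor, so each branch can be exercised on its own.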
The Http11Processor core code is as follows:
// AbstractProcessor
public AbstractProcessor(Adapter adapter) {
this(adapter, new Request(), new Response());
}
// Http11Processor
public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
super(adapter);
this.protocol = protocol;
httpParser = new HttpParser(protocol.getRelaxedPathChars(),
protocol.getRelaxedQueryChars());
inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
protocol.getRejectIllegalHeaderName(), httpParser);
request.setInputBuffer(inputBuffer);
outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
response.setOutputBuffer(outputBuffer);
// Create and add the identity filters.
inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new IdentityOutputFilter());
// Create and add the chunked filters.
inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
protocol.getMaxSwallowSize()));
outputBuffer.addFilter(new ChunkedOutputFilter());
// Create and add the void filters.
inputBuffer.addFilter(new VoidInputFilter());
outputBuffer.addFilter(new VoidOutputFilter());
// Create and add buffered input filter
inputBuffer.addFilter(new BufferedInputFilter());
// Create and add the gzip filters.
//inputBuffer.addFilter(new GzipInputFilter());
outputBuffer.addFilter(new GzipOutputFilter());
pluggableFilterIndex = inputBuffer.getFilters().length;
}
public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
setSocketWrapper(socketWrapper);
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;
while (!getErrorState().isError() && keepAlive &&
!isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE &&
!protocol.isPaused()) {
// Parsing the request header
try {
if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(), protocol.getKeepAliveTimeout())) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
}
else if (handleIncompleteRequestLineRead()) {
break;
}
}
if (protocol.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!protocol.getDisableUploadTimeout()) {
socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
}
}
}
catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.header.parse"), e);
}
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
break;
}
catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
UserDataHelper.Mode logMode = userDataHelper.getNextMode();
if (logMode != null) {
String message = sm.getString("http11processor.header.parse");
switch (logMode) {
case INFO_THEN_DEBUG:
message += sm.getString("http11processor.fallToDebug");
// Intentional fall-through: INFO_THEN_DEBUG is also logged at INFO
case INFO:
log.info(message, t);
break;
case DEBUG:
log.debug(message, t);
}
}
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
// Has an upgrade been requested?
Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
boolean foundUpgrade = false;
while (connectionValues.hasMoreElements() && !foundUpgrade) {
foundUpgrade = connectionValues.nextElement().toLowerCase(
Locale.ENGLISH).contains("upgrade");
}
if (foundUpgrade) {
// Check the protocol
String requestedProtocol = request.getHeader("Upgrade");
UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
if (upgradeProtocol != null) {
if (upgradeProtocol.accept(request)) {
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
response.setHeader("Connection", "Upgrade");
response.setHeader("Upgrade", requestedProtocol);
action(ActionCode.CLOSE, null);
getAdapter().log(request, response, 0);
InternalHttpUpgradeHandler upgradeHandler =
upgradeProtocol.getInternalUpgradeHandler(
socketWrapper, getAdapter(), cloneRequest(request));
UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
action(ActionCode.UPGRADE, upgradeToken);
return SocketState.UPGRADING;
}
}
}
if (getErrorState().isIoAllowed()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
prepareRequest();
}
catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
}
int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
if (maxKeepAliveRequests == 1) {
keepAlive = false;
}
else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// Process the request in the adapter
if (getErrorState().isIoAllowed()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() &&
!isAsync() && statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
} catch (InterruptedIOException e) {
setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
}
catch (HeadersTooLargeException e) {
log.error(sm.getString("http11processor.request.process"), e);
// The response should not have been committed but check it
// anyway to be safe
if (response.isCommitted()) {
setErrorState(ErrorState.CLOSE_NOW, e);
} else {
response.reset();
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, e);
response.setHeader("Connection", "close");
// TODO: Remove
}
}
catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("http11processor.request.process"), t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync()) {
// If this is an async request then the request ends when it has
// been completed. The AsyncContext is responsible for calling
// endRequest() in that case.
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}
if (!isAsync() || getErrorState().isError()) {
request.updateCounters();
if (getErrorState().isIoAllowed()) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
}
if (!protocol.getDisableUploadTimeout()) {
int connectionTimeout = protocol.getConnectionTimeout();
if(connectionTimeout > 0) {
socketWrapper.setReadTimeout(connectionTimeout);
} else {
socketWrapper.setReadTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
sendfileState = processSendfile(socketWrapper);
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (getErrorState().isError() || protocol.isPaused()) {
return SocketState.CLOSED;
}
else if (isAsync()) {
return SocketState.LONG;
}
else if (isUpgrade()) {
return SocketState.UPGRADING;
}
else {
if (sendfileState == SendfileState.PENDING) {
return SocketState.SENDFILE;
}
else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
}
else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
- Http11Processor is a concrete subclass of AbstractProcessor. Its constructors create the Tomcat Request and Response objects and the Http11InputBuffer and Http11OutputBuffer objects, wire them together, and associate them with the CoyoteAdapter object.
- The core method of Http11Processor is service(). The code is fairly long, so we focus on the key points.
- service() initializes the Http11InputBuffer and Http11OutputBuffer instances, which are used to parse the request line and request headers, write response data, and so on.
- The request line is parsed first, by Http11InputBuffer.parseRequestLine(). If the request line cannot be parsed completely (for example, the client has not sent all of its data yet), SocketState.LONG is returned. ConnectionHandler then registers the OP_READ event on the socket wrapper object and adds it to the Poller thread's event queue, so the Poller thread keeps listening for client-readable events while waiting for the client to send more data. The association between the original socket and its Http11Processor is not removed, and the Http11Processor instance is not recycled, so the current state (the data parsed so far) is still available when the client sends more data.
- The request headers are then parsed by Http11InputBuffer.parseHeaders(). If the headers cannot be parsed completely (the client has not sent all of its data yet), this is handled the same way as an incomplete request line in the previous step.
- This method also contains some handling for protocol upgrades (such as WebSocket), which we won't go into here.
- Once the request line and request headers have been fully parsed, CoyoteAdapter.service() is called, which invokes the standard Servlet API through the servlet container.
- When the Servlet API call returns normally, endRequest() is called for non-asynchronous requests. Internally it uses Http11InputBuffer.endRequest() to end the request and Http11OutputBuffer.end() to send the remaining response data to the client.
- Also for non-asynchronous requests (or when an error has occurred), Http11InputBuffer.nextRequest() and Http11OutputBuffer.nextRequest() are called to recycle the two instances so that they can be reused, which improves efficiency.
- Finally, SocketState.OPEN is returned to the ConnectionHandler object. OPEN indicates a long-lived (keep-alive) connection, so the original socket is not closed. The mapping between the socket and the Http11Processor is removed from the associated Map, which releases the current Http11Processor instance for later reuse. Because the connection stays open, the OP_READ event is registered on the socket wrapper object and added to the Poller thread's event queue. The Poller thread keeps listening for client-readable events, which ends the current request and prepares for the next one.
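
The return-state decision at the end of service(), and the handler's reaction to LONG and OPEN described in the list, can be modelled roughly as follows. This is a sketch under stated assumptions: ServiceStateSketch, finalState(), Handler and onState() are hypothetical names, the flags are passed in as plain booleans instead of being read from the processor, and only the LONG, OPEN and CLOSED reactions are modelled.

```java
import java.util.HashMap;
import java.util.Map;

class ServiceStateSketch {
    // Hypothetical, reduced version of Tomcat's SocketState.
    enum SocketState { OPEN, CLOSED, LONG, SENDFILE, UPGRADING }

    /** Same decision order as the tail of service(), with the inputs as flags. */
    static SocketState finalState(boolean errorOrPaused, boolean async, boolean upgrade,
                                  boolean sendfilePending, boolean openSocket,
                                  boolean readComplete) {
        if (errorOrPaused) return SocketState.CLOSED;
        if (async) return SocketState.LONG;
        if (upgrade) return SocketState.UPGRADING;
        if (sendfilePending) return SocketState.SENDFILE;
        if (!openSocket) return SocketState.CLOSED;
        return readComplete ? SocketState.OPEN : SocketState.LONG;
    }

    /** Hypothetical stand-in for the handler's socket-to-processor map. */
    static final class Handler {
        final Map<String, Object> processors = new HashMap<>();

        /** Returns true if the socket should be registered for read events again. */
        boolean onState(String socketId, SocketState state) {
            switch (state) {
                case LONG:
                    // Partial request: the processor and its parsed data stay attached.
                    return true;
                case OPEN:
                    // Keep-alive: release the processor, keep the socket open.
                    processors.remove(socketId);
                    return true;
                case CLOSED:
                    // Release the processor; the socket is not registered again.
                    processors.remove(socketId);
                    return false;
                default:
                    throw new UnsupportedOperationException("not modelled: " + state);
            }
        }
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        handler.processors.put("socket-1", new Object());

        // Headers only partly received: openSocket = true, readComplete = false.
        SocketState partial = finalState(false, false, false, false, true, false);
        boolean reRegister = handler.onState("socket-1", partial);
        System.out.println(partial + ": re-register=" + reRegister
                + ", processor kept=" + handler.processors.containsKey("socket-1"));

        // Request fully handled on a keep-alive connection.
        SocketState done = finalState(false, false, false, false, true, true);
        reRegister = handler.onState("socket-1", done);
        System.out.println(done + ": re-register=" + reRegister
                + ", processor kept=" + handler.processors.containsKey("socket-1"));
    }
}
```

The difference between the two states is what happens to the processor: in both cases the socket goes back to being watched for readable events, but only LONG keeps the half-parsed request attached to it.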
The core code logic of CoyoteAdapter is as follows:
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception {
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);
    if (request == null) {
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);
        request.setResponse(response);
        response.setRequest(request);
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }
    boolean async = false;
    boolean postParseSuccess = false;
    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }
            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            request.finishRequest();
            response.finishResponse();
        }
    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);
        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS, null);
            async = false;
        }
        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            Host host = request.getHost();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            // The other possibility is that an error occurred early in
            // processing and the request could not be mapped to a Context.
            // Log via the host or engine in that case.
            long time = System.currentTimeMillis() - req.getStartTime();
            if (context != null) {
                context.logAccess(request, response, time, false);
            } else if (response.isError()) {
                if (host != null) {
                    host.logAccess(request, response, time, false);
                } else {
                    connector.getService().getContainer().logAccess(
                            request, response, time, false);
                }
            }
        }
        req.getRequestProcessor().setWorkerThreadName(null);
        // Recycle the wrapper request and response
        if (!async) {
            updateWrapperErrorCount(request, response);
            request.recycle();
            response.recycle();
        }
    }
}
- The core method of CoyoteAdapter is service(). The code is fairly long, so we focus on the key points.
- The method uses the Tomcat (Coyote) Request and Response to create the servlet-standard request and response objects and links them together: the Tomcat Request is associated with the servlet request, and the Tomcat Response with the servlet response.
- It invokes the standard Servlet API through the servlet container: connector.getService().getContainer().getPipeline().getFirst().invoke(request, response).
- If the request is not asynchronous, then once the Servlet API call has completed, request.finishRequest() and response.finishResponse() are called to end the current request and response.
- Finally, request.recycle() and response.recycle() are called to recycle the request and response objects so that they can be reused, which improves efficiency.
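
The create-once, recycle-and-reuse pattern described above can be illustrated with a small standalone sketch. The classes AdapterSketch, CoyoteRequest and WrapperRequest are hypothetical stand-ins invented for this example, not Tomcat types, and the slot index used for ADAPTER_NOTES and the size of the notes array are assumptions; only the shape of the logic (look up the note, create and link the wrapper if it is missing, recycle it afterwards) follows the code shown earlier.

```java
class AdapterSketch {
    // Assumed slot index for the wrapper stored on the low-level request.
    static final int ADAPTER_NOTES = 1;

    /** Hypothetical stand-in for the low-level request: just a notes array. */
    static final class CoyoteRequest {
        private final Object[] notes = new Object[8];
        Object getNote(int pos) { return notes[pos]; }
        void setNote(int pos, Object value) { notes[pos] = value; }
    }

    /** Hypothetical stand-in for the servlet-facing request wrapper. */
    static final class WrapperRequest {
        CoyoteRequest coyoteRequest; // link back to the low-level request
        String path;                 // per-request state

        // Clears per-request state but keeps the object and its link.
        void recycle() { path = null; }
    }

    static WrapperRequest service(CoyoteRequest req, String path) {
        WrapperRequest request = (WrapperRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            // First request on this processor: create the wrapper and link it.
            request = new WrapperRequest();
            request.coyoteRequest = req;
            req.setNote(ADAPTER_NOTES, request);
        }
        request.path = path;
        // ... the container pipeline would be invoked here ...
        request.recycle(); // non-async case: recycle straight away
        return request;
    }

    public static void main(String[] args) {
        CoyoteRequest req = new CoyoteRequest();
        WrapperRequest first = service(req, "/a");
        WrapperRequest second = service(req, "/b");
        System.out.println("same wrapper reused: " + (first == second));
        System.out.println("state cleared after recycle: " + (second.path == null));
    }
}
```

Storing the wrapper as a note on the low-level request means a second request handled by the same processor finds the wrapper already in place, so only its per-request fields need resetting.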