init
AbstractProtocol#init
Public void init() throws Exception {//... // AbstractProtocol#init calls endpoint.init(); // In non-blocking IO mode the endpoint is NioEndpoint. Init (); }Copy the code
AbstractEndpoint#init
Public final void init() throws Exception {if (bindOnInit) {// Call the binding method bindWithCleanup(); bindState = BindState.BOUND_ON_INIT; } //... }Copy the code
AbstractEndpoint#bindWithCleanup
Private void bindWithCleanup() throws Exception {try {// Call subclass bind(); } catch (Throwable t) {//... }}Copy the code
NioEndpoint#bind
Public void bind() throws Exception {//socket bind initServerSocket(); If (acceptorThreadCount == 0) {acceptorThreadCount = 1; If (pollerThreadCount <= 0) {pollerThreadCount = 1; } //... }Copy the code
NioEndpoint#initServerSocket
/**
 * Opens and binds the server socket channel (unless an inherited channel is
 * used) and then switches it to blocking mode.
 *
 * @throws Exception if the channel cannot be opened, bound or configured
 */
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
// The usual way for a server to bind to a port
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
// The second argument is passed as the listen backlog of the server socket
serverSock.socket().bind(addr,getAcceptCount());
} //...... (inherited-channel branch elided in this excerpt)
// Put the server socket into blocking IO mode
serverSock.configureBlocking(true); //mimic APR behavior
}
Copy the code
This is how Tomcat binds its listening port during initialization.
start
AbstractProtocol#start
public void start() throws Exception { //.... // Call the endpoint#start method endpoint.start(); / /... }Copy the code
AbstractEndpoint#start
Public final void start() throws Exception {if (bindState == bindstate.unbound) {bindWithCleanup(); bindState = BindState.BOUND_ON_START; } // Call the subclass startInternal(); }Copy the code
NioEndpoint#startInternal
public void startInternal() throws Exception { if (! running) { //..... If (getExecutor() == null) {createExecutor(); Pollers = new poller [getPollerThreadCount()]; pollers = new poller [getPollerThreadCount()]; pollers = new poller [getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); / / start pollerThread. Start (); } // Create acceptor startAcceptorThreads(); }}Copy the code
AbstractEndpoint#startAcceptorThreads
Protected final void startAcceptorThreads() {// Default 1 int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; I++) {// create acceptor acceptor <U> acceptor = new acceptor <>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); }}Copy the code
AbstractEndpoint#createExecutor
public void createExecutor() { internalExecutor = true; TaskQueue TaskQueue = new TaskQueue(); // The TaskThreadFactory thread pool is not Java native, but Tomcat inherits Java native thread pool, TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority())); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }Copy the code
After startup there are Acceptor threads (1 by default) and Poller threads (2 here).
Acceptor
Public class Acceptor<U> implements Runnable {// implements Runnable {Copy the code
Acceptor#run
public void run() { while (endpoint.isRunning()) { //..... // The socket is connected to the socket. // The socket is connected to the socket. // The socket is connected to the socket. Then only write socket. = the endpoint serverSocketAccept (); } catch (Exception ioe) { //.... If (endpoint.isrunning () &&! Endpoint.ispaused ()) {// Process the request if (! endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); }Copy the code
NioEndpoint#serverSocketAccept
Protected SocketChannel serverSocketAccept() throws Exception {// Call Java accept return serversocket.accept (); }Copy the code
NioEndpoint#setSocketOptions
Protected Boolean setSocketOptions(SocketChannel socket) {// Process the connection try {// Set the socket to non-blocking, Unlike init, serviceSocket is blocking //socket is non-blocking socket.configureblocking (false); Socket sock = socket.socket(); socketProperties.setProperties(sock); Synchronizedstack is an object cache at the bottom of the array, so you don't need to create objects frequently. NioChannel Channel = nioChannels.pop(); nioChannels.pop(); If (channel == null) {// Write buffer SocketBufferHandler bufHandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else {channel channel = new NioChannel(socket, bufHandler); } } else { channel.setIOChannel(socket); channel.reset(); } // Register poller channel getPoller0().register(channel); } catch (Throwable t) { //.... } return true; }Copy the code
The Acceptor runs in its own thread and accepts incoming connections. When it receives a connection, it wraps the channel and hands it to a Poller. The accept call itself is blocking.
Poller
Public class poller implements Runnable {// Poller implements Runnable {Copy the code
Acceptor#run (through setSocketOptions) calls the Poller#register method.
NioEndpoint.Poller#register
Public void register(final NioChannel socket) {// Wrap the socket. SetPoller (this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); //eventCache is also an object cache. PollerEvent r = eventCache.pop(); InterestOps (selectionkey.op_read); // selectionkey. OP_READ read event //this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // Put PollerEvent in an array addEvent(r); }Copy the code
NioEndpoint.Poller#addEvent
Private void addEvent(PollerEvent event) {// Put the array events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); }Copy the code
So register does one thing: it wraps the socket and places a PollerEvent in the event queue. Next, look at the run method.
NioEndpoint.Poller#run
Public void run() {// Run while (true) {Boolean hasEvents = false; try { if (! Close) {// call events(hasEvents = events(); If (wakeupcounter.getandSet (-1) > 0) {keyCount = selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } / /... Iterator<SelectionKey> Iterator = keyCount > 0? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator ! = null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); ProcessKey (sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }Copy the code
NioEndpoint.Poller#events
public boolean events() { boolean result = false; Poll () PollerEvent PE = null; PollerEvent PE = null; PollerEvent PE = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) ! = null; i++ ) { result = true; PE. Run (); PE. Run (); PE. pe.reset(); if (running && ! paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }Copy the code
NioEndpoint.PollerEvent#run
Public void run() {if (interestOps == OP_REGISTER) {try {// The channel event registered with the selector selector is read socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } } else //....... }}Copy the code
So what events() does is take each PollerEvent from the queue and register the channel's read event with the selector. Once the channel is registered, any read or write readiness on it causes Selector#select to return its key, so the channel can then be read or written.
NioEndpoint.Poller#processKey
/**
 * Dispatches a ready selection key: sendfile data is handled by
 * processSendfile, otherwise the read and/or write event is passed to
 * processSocket. (This excerpt is truncated before the end of the method.)
 *
 * @param sk         the ready selection key
 * @param attachment the socket wrapper attached to the key
 */
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
// (branch body is empty in this excerpt)
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// Read event; a false result marks the socket for closing
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
// Write event; a false result marks the socket for closing
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
}
}
Copy the code
AbstractEndpoint#processSocket
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; SocketProcessorBase<S> sc = processorCache.pop(); SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); Executor Executor = getExecutor(); if (dispatch && executor ! Execute (sc); // Execute the task executor.execute(sc) through the worker thread; } else { sc.run(); } } return true; }Copy the code
That is the Poller's role: it accepts a channel from the Acceptor, registers the channel's read event with a selector, and hands ready channels to a worker thread to be read or written.