As mentioned in the previous article, the NettyServerCnxnFactory is used to handle client connections and requests.
new ServerBootstrap().group(bossGroup, workerGroup)
    .channel(NettyUtils.nioOrEpollServerSocketChannel())
    // parent channel options
    .option(ChannelOption.SO_REUSEADDR, true)
    // child channel options
    .childOption(ChannelOption.TCP_NODELAY, true)
    .childOption(ChannelOption.SO_LINGER, -1)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ...
            // Important: the request handler
            pipeline.addLast("servercnxnfactory", channelHandler);
        }
    });
The channelHandler is what handles client requests.
CnxnChannelHandler (an inner class of NettyServerCnxnFactory): see the channelActive() and channelRead() methods.
@Sharable
class CnxnChannelHandler extends ChannelDuplexHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Channel active {}", ctx.channel());
        }
        final Channel channel = ctx.channel();
        if (limitTotalNumberOfCnxns()) {
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            channel.close();
            return;
        }
        InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
        if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
            channel.close();
            return;
        }
        // Create a new NettyServerCnxn, roughly equivalent to a session, and store it in CONNECTION_ATTRIBUTE
        NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
        ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
        ...
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ...
        allChannels.remove(ctx.channel());
        NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
        ...
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ...
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace("message received called {}", msg);
            }
            try {
                LOG.debug("New message {} from {}", msg, ctx.channel());
                // Get the NettyServerCnxn stored by channelActive
                NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                if (cnxn == null) {
                    LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                } else {
                    // Process the network packet
                    cnxn.processMessage((ByteBuf) msg);
                }
            } catch (Exception ex) {
                LOG.error("Unexpected exception in receive", ex);
                throw ex;
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    // Write back to the client
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (LOG.isTraceEnabled()) {
            promise.addListener(onWriteCompletedTracer);
        }
        super.write(ctx, msg, promise);
    }
}
In the channelHandler, channelActive() creates a NettyServerCnxn, roughly the server-side equivalent of a session, which receives and sends protocol messages. channelRead() then calls cnxn.processMessage((ByteBuf) msg) to process the incoming data.
NettyServerCnxn.java
NettyServerCnxn#processMessage(ByteBuf buf)
void processMessage(ByteBuf buf) {
    ...
    if (throttled.get()) {
        ...
    } else {
        LOG.debug("not throttled");
        if (queuedBuffer != null) {
            appendToQueuedBuffer(buf.retainedDuplicate());
            processQueuedBuffer();
        } else {
            // Receive the message
            receiveMessage(buf);
            // Have to check !closingChannel, because an error in
            // receiveMessage() could have led to close() being called.
            ...
        }
    }
}
NettyServerCnxn#receiveMessage(ByteBuf message)
private void receiveMessage(ByteBuf message) {
    checkIsInEventLoop("receiveMessage");
    try {
        while (message.isReadable() && !throttled.get()) {
            if (bb != null) {
                ...
                if (bb.remaining() > message.readableBytes()) {
                    int newLimit = bb.position() + message.readableBytes();
                    bb.limit(newLimit);
                }
                message.readBytes(bb);
                bb.limit(bb.capacity());
                ...
                if (bb.remaining() == 0) {
                    bb.flip();
                    packetReceived(4 + bb.remaining());
                    ZooKeeperServer zks = this.zkServer;
                    if (zks == null || !zks.isRunning()) {
                        throw new IOException("ZK down");
                    }
                    if (initialized) {
                        // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
                        // we could implement zero-copy queueing.
                        // Process the message
                        zks.processPacket(this, bb);
                    } else {
                        LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                        zks.processConnectRequest(this, bb);
                        initialized = true;
                    }
                    bb = null;
                }
            } else {
                ...
            }
        }
    } catch (IOException e) {
        ...
    }
}
In this step, the bytes in Netty's ByteBuf are copied into an NIO ByteBuffer, which is handed to ZooKeeperServer's processPacket(this, bb) to process the message.
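To make the buffering idea concrete, here is a minimal plain-Java sketch of reassembling length-prefixed frames from arbitrarily split chunks. It is not ZooKeeper's code: the FrameAssembler class and its methods are hypothetical, plain ByteBuffers stand in for Netty's ByteBuf, and the 4-byte big-endian length prefix is an assumption suggested by the `4 + bb.remaining()` accounting above. The real server also validates the length, which this sketch does not.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: rebuilds length-prefixed frames from arbitrary chunks. */
final class FrameAssembler {
    // Assumed wire format: 4-byte big-endian length, then that many body bytes.
    private final ByteBuffer lenBuf = ByteBuffer.allocate(4);
    private ByteBuffer body; // null until the length prefix is complete
    private final List<ByteBuffer> frames = new ArrayList<>();

    /** Feeds one network chunk; completed frames are collected in arrival order. */
    void feed(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            if (body == null) {
                // The length prefix itself may be split across chunks.
                while (lenBuf.hasRemaining() && chunk.hasRemaining()) {
                    lenBuf.put(chunk.get());
                }
                if (lenBuf.hasRemaining()) {
                    return; // need more bytes before the length is known
                }
                lenBuf.flip();
                body = ByteBuffer.allocate(lenBuf.getInt()); // no length validation here
                lenBuf.clear();
            }
            // Copy no more than the body still needs, like the limit juggling above.
            int n = Math.min(body.remaining(), chunk.remaining());
            for (int i = 0; i < n; i++) {
                body.put(chunk.get());
            }
            if (!body.hasRemaining()) {
                body.flip(); // ready for reading, like bb.flip() above
                frames.add(body);
                body = null; // the next frame starts with a fresh length prefix
            }
        }
    }

    List<ByteBuffer> frames() {
        return frames;
    }
}
```

Feeding the same bytes in one chunk or in several yields the same frames, which is the property the server relies on when TCP splits or merges packets.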
ZooKeeperServer.java
ZooKeeperServer#processPacket()
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    // This step is mainly deserialization
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");
    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);
    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        ...
        return;
    } else if (h.getType() == OpCode.sasl) {
        processSasl(incomingBuffer, cnxn, h);
    } else {
        if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
            // Authentication enforcement is failed
            // Already sent response to user about failure and closed the session, lets return
            return;
        } else {
            // This is usually the step to process the message
            // Encapsulate it as a Request object
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            submitRequest(si);
        }
    }
}
This step deserializes the ByteBuffer and encapsulates it into a Request object, which is then submitted for further processing.
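The header-then-slice pattern can be sketched with nothing but java.nio. The ParsedRequest class below is a hypothetical stand-in for Request, and the header layout (two big-endian ints, xid followed by type) is an assumption made for illustration. The real code goes through RequestHeader.deserialize() and BinaryInputArchive instead of reading the ints directly.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for a request: header fields plus the untouched payload. */
final class ParsedRequest {
    final int xid;
    final int type;
    final ByteBuffer payload;

    private ParsedRequest(int xid, int type, ByteBuffer payload) {
        this.xid = xid;
        this.type = type;
        this.payload = payload;
    }

    /** Reads the assumed two-int header, then slices off the remaining bytes. */
    static ParsedRequest parse(ByteBuffer incoming) {
        int xid = incoming.getInt();  // ByteBuffer reads big-endian by default
        int type = incoming.getInt();
        // slice() shares the underlying bytes (no copy) and starts at position 0,
        // so later code can read the payload without knowing the header size.
        ByteBuffer payload = incoming.slice();
        return new ParsedRequest(xid, type, payload);
    }
}
```

Because slice() does not copy, wrapping the payload in a request object stays cheap even for large requests.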
ZooKeeperServer#submitRequest()
public void submitRequest(Request si) {
    enqueueRequest(si);
}
ZooKeeperServer#enqueueRequest()
public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    // Submit
    requestThrottler.submitRequest(si);
}
RequestThrottler#submitRequest()
public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        request.requestThrottleQueueTime = Time.currentElapsedTime();
        submittedRequests.add(request);
    }
}
This step adds the request to the submittedRequests queue, where it waits for run() in RequestThrottler to take and execute it.
RequestThrottler#run()
@Override
public void run() {
    try {
        while (true) {
            if (killed) {
                break;
            }
            Request request = submittedRequests.take();
            ...
            // Throttling is disabled when maxRequests = 0
            ...
            // A dropped stale request will be null
            if (request != null) {
                if (request.isStale()) {
                    ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
                }
                final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
                ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
                if (shouldThrottleOp(request, elapsedTime)) {
                    request.setIsThrottled(true);
                    ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
                }
                // Execute
                zks.submitRequestNow(request);
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Unexpected interruption", e);
    }
    int dropped = drainQueue();
    LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
}
This step calls take() to pull requests from the submittedRequests queue and passes each one to submitRequestNow(Request) in ZooKeeperServer, which executes it immediately.
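The submit-then-take hand-off is a standard producer/consumer arrangement, which the following sketch reduces to its essentials. Everything here is hypothetical: MiniThrottler, its STOP marker, and the use of strings as requests are illustration only, and the throttling, staleness checks, and metrics of the real RequestThrottler are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical miniature of a queue-backed worker thread. */
final class MiniThrottler extends Thread {
    // Hypothetical shutdown marker; compared by identity, never by content.
    private static final String STOP = new String("stop");

    private final BlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    private final List<String> processed = new ArrayList<>();

    /** Producer side: callable from any thread, never blocks on an unbounded queue. */
    void submit(String request) {
        submitted.add(request);
    }

    /** Consumer side: a single thread handles requests in submission order. */
    @Override
    public void run() {
        try {
            while (true) {
                String request = submitted.take(); // blocks until work arrives
                if (request == STOP) {
                    break;
                }
                processed.add(request); // stands in for "execute the request"
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits all requests, stops the worker, and returns what it handled. */
    static List<String> runAll(List<String> requests) {
        MiniThrottler worker = new MiniThrottler();
        worker.start();
        for (String request : requests) {
            worker.submit(request);
        }
        worker.submit(STOP);
        try {
            worker.join(); // after join(), the worker's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.processed;
    }
}
```

Decoupling the I/O threads from request execution this way means a slow request delays the queue, not the network event loop.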
ZooKeeperServer#submitRequestNow()
public void submitRequestNow(Request si) {
    ...
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            // Start processing from the first processor
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type {}", si.type);
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        LOG.debug("Dropping request.", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    }
}
This step starts processing the request at the first processor, which leads into the PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor chain mentioned in the previous article.
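The shape of such a chain can be sketched as a linked list of stages, each holding a reference to the next. The Processor interface, Stage class, and ChainDemo below are hypothetical simplifications. In particular, this sketch runs synchronously on one thread and only records the order of calls, which is an assumption made to keep it short and not a description of how the real processors schedule their work.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified processor interface. */
interface Processor {
    void process(String request);
}

/** A stage that records its own name, then forwards to the next stage if any. */
final class Stage implements Processor {
    private final String name;
    private final Processor next; // null marks the last stage in the chain
    private final List<String> log;

    Stage(String name, Processor next, List<String> log) {
        this.name = name;
        this.next = next;
        this.log = log;
    }

    @Override
    public void process(String request) {
        log.add(name + ":" + request);
        if (next != null) {
            next.process(request);
        }
    }
}

final class ChainDemo {
    /** Builds prep -> sync -> final (back to front) and pushes one request through. */
    static List<String> run(String request) {
        List<String> log = new ArrayList<>();
        Processor last = new Stage("final", null, log);
        Processor sync = new Stage("sync", last, log);
        Processor first = new Stage("prep", sync, log);
        first.process(request); // callers only ever see the first stage
        return log;
    }
}
```

Building the chain back to front lets each stage receive its successor in the constructor, so the caller needs to hold only the first stage, much as submitRequestNow() holds only firstProcessor.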
To summarize: a network packet is deserialized from its byte stream, wrapped in a Request object, and finally handed to PrepRequestProcessor for processing.
The next article analyzes that processing in detail.