Troubleshoot problems

The first is the error code:

private void cleanExpiredRequest(a) {
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break; }}catch (Throwable ignored) {
            }
        }

        cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
    }
Copy the code

[PCBUSY_CLEAN_QUEUE]broker busy’ is an error log output by the client.

public boolean isOSPageCacheBusy(a) {
        long begin = this.getCommitLog().getBeginTimeInLock();
        long diff = this.systemClock.now() - begin;

        return diff < 10000000
                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
    }
Copy the code

This method is located in defaultmessagestore.java, the broker’s DefaultMessageStore class, which is analyzed in RocketMQ’s CommitLog message store mechanism in the source code analysis. We will focus on the logic of the above method.

Begin is set by the commitLog putMessage() method:

// The default operating system page cache timeout period
private long osPageCacheBusyTimeOutMills = 1000; .// The start timestamp of a message when the commitLog stores it
private volatile long beginTimeInLock = 0; .// Code for setting beginTimeInLock when putMessage
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
Copy the code

This is the timestamp from which the previous entry was stored. Note that this is volatile. The default operating system page cache busy timeout in messageStoreConfig is 1s. That is, when the timing thread in the Broker’s BrokerFastFailure class compares the current time to the start of the last message store every 10 seconds, if it is greater than 1s (the default, Configurable) and execute the fast-failure mechanism when less than 10000s. Of course, the commitLog resets begin to 0 after storing a message, so that diff is greater than 10000s if the broker is idle.

RocketMQ message processing mechanism

Register message handlers

Back to the cleanExpiredRequest() method, we see that there is a! Enclosing brokerController. GetSendThreadPoolQueue (). IsEmpty (), The assumption is that the broker will wrap each received message into a Runnable task and stuff it into the sendThreadPoolQueue for processing. Let’s look at the implementation:

The queue is initialized in the brokerController constructor with a default capacity of 10000

this.sendThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());
Copy the code

BrokerController. The initialize () will initialize sendMessageExecutor thread pool:

this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));
Copy the code

And where does sendMessageExecutor come in?

public void registerProcessor(a) {
        /** * SendMessageProcessor */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); . Omit other code}Copy the code

. You can see in brokerController registerProcessor () creates a SendMessageProcessor message processing object, And call the remotingServer. RegisterProcessor () to register according to different types. Enter the NettyRemotingServer:

public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }

        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
        this.processorTable.put(requestCode, pair);
    }
Copy the code

The logic is simple: construct a Pair object from the Processer and Executor and put it in the processorTable (HashMap) with the requestCode (message type) as the key.

Message processing

RocketMQ is based on Netty, and the gateway for message processing is definitely NettyRemotingServer.

public void start(a) {...// Initialize handlers
        prepareSharableHandlers();

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0.0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                / / add serverHandlerserverHandler ); }}); . }Copy the code

PrepareSharableHandlers () initializes the serverHandler object, which, as the name says, is used to process the received message:

@ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // Process the received messageprocessMessageReceived(ctx, msg); }}public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if(cmd ! =null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break; }}}Copy the code

We focus on the processRequestCommand(CTX, CMD) method, simplified as follows:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // Get the Pair based on the message type from the message registry
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if(pair ! =null) {
            // Message processing entry
            Runnable run = new Runnable() {
                @Override
                public void run(a) {
                    try {
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        // Here is the entry for processing messages
                        finalRemotingCommand response = pair.getObject1().processRequest(ctx, cmd); . }catch(Throwable e) { ... }}};try {
                // Encapsulate as a RequestTask
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                // Submit to executor, sendMessageExecutor
                pair.getObject2().submit(requestTask);
            } catch(RejectedExecutionException e) { ... }}else{... }}Copy the code

At this point, we have a clear understanding of the processing logic of the broker busy situation. To sum up: BrokerFastFailure a new timed task thread pool is created when the broker is started, and every 10s the time between the current time and the start of the last message is checked. If the time is greater than the configured value (1s by default) then fast-Failure is triggered and outstanding tasks are fetched from the queue of the sending message processing thread pool. An error message is returned to the client.