Long polling

Long polling is optimized on top of polling. Polling is when an action is performed at a fixed frequency. So what’s the downside of polling? Why does MQ implement pull with long polling instead of polling? Let’s start with polling: if polling is implemented. The Consumer needs to pull the message at a very short frequency to be able to pull the message in real time. This leads to several problems,

  1. If the frequency is very short, yesRocketMQCause stress.
  2. When there are no messages to pull, a large number of invalid requests are initiated

So how does long polling address the disadvantages of polling?

  1. When there are no messages to pull, the long poll suspends the request and then callsbrokerTake the initiative topushA message toconsumer. This avoids a large number of invalid requests.
  2. Long polling does not request at a fixed frequency, but does not continue requests until a response is received.

Take a look at the pseudocode below

polling


ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

executor.scheduleAtFixedRate(() -> {

    System.out.println("Polling");

}, 0.1 , TimeUnit.SECONDS);

Copy the code

Long polling

Timer timer = new Timer();
timer.schedule(new MyTask(timer),1000);

public static class MyTask extends TimerTask {

    private Timer timer;

    private static final AtomicInteger index = new AtomicInteger();

    public MyTask(Timer timer) {
        this.timer = timer;
    }

    @Override

    public void run(a) {
        if (index.getAndIncrement() % 10= =0) {
            System.out.println("Simulate server blocking request, delay dispatch by 10s.");
            timer.schedule(new MyTask(timer), 10000);
        } else {
            System.out.println("Long polling");
            timer.schedule(new MyTask(timer), 1000); }}}Copy the code

RocketMQ consumer end long polling implementation

The starting point of the consumer long polling is the run method of the PullMessageService. The message is pulled only if it is taken () from the pullRequestQueue to the element. While Rebalance, a PullRequest is put into the queue.

public class PullMessageService extends ServiceThread {

    @Override
    public void run(a) {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                // Pull the message
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end"); }}Copy the code

In the PullMessageService pullMessage() method, the pullMessage is triggered again in the pullMessage callback

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if(pullResult ! =null) {
            // After the todo is pulled off, it is filtered
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    log.error("Successfully pulled message");
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); }}if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                    System.out.println(System.currentTimeMillis() + "No new information.");
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                        pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run(a) {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                    pullRequest.getNextOffset(), false);

                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e); }}},10000);
                    break;
                default:
                    break; }}}@Override
    public void onException(Throwable e) {
        if(! pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); }};Copy the code

We see that RocketMQ’s long polling implementation does, indeed, trigger the pull message in the callback function. But there’s one more problem. We see that when there is no message, the Consumer will immediately pull the message. So how does RocketMQ manage to avoid client requests when there is no message.

How does the Broker handle message pulling

PullMessageProcessor#processRequest

Below is a code snippet that failed to pull the message

public class PullMessageProcessor implements NettyRequestProcessor {

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)

throws RemotingCommandException {
    case ResponseCode.PULL_NOT_FOUND:
    BrokerAllowSuspend is true for the first time a message is not pulled
    if (brokerAllowSuspend && hasSuspendFlag) {
        // Todo consumer pass the parameter, default 15s
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);

        // todo hold request
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        // Todo key code
        response = null;
        break; }}Copy the code

Look at this. This method first brokerController. GetPullRequestHoldService (). SuspendPullRequest (topic, queueId, pullRequest); . As you can see from the method name, pause the pull request.

All this method does is put the current request in a queue.

public class PullRequestHoldService extends ServiceThread {
    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
        String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            mpr = new ManyPullRequest();
            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
            if(prev ! =null) { mpr = prev; } } mpr.addPullRequest(pullRequest); }}Copy the code

So somebody puts it in, somebody has to consume the data in the queue.

Consume the queue data method, PullRequestHoldService public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map

properties) {
,>

Let’s look at the implementation of this method

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if(mpr ! =null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if(requestList ! =null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();

            // Iterate over all held requests
            for (PullRequest request : requestList) {
                // The maximum offset of the queue passed from the upper layer
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }

                // If there is a new message
                if (newestOffset > request.getPullFromThisOffset()) {
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if(match && properties ! =null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }

                    if (match) {
                        try {
                            // There is a new message, actively send the message to Consumer
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue; }}// If there is no new message and the maximum pause time exceeds 15s, actively push the message to the Consumer
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }

                replayList.add(request);
            }

            if(! replayList.isEmpty()) { mpr.addPullRequest(replayList); }}}}Copy the code

Next, take a look at the executeRequestWhenWakeup method.

public void executeRequestWhenWakeup(final Channel channel,
    final RemotingCommand request) throws RemotingCommandException {
    Runnable run = new Runnable() {
        @Override
        public void run(a) {
            try {
                // Critical code, false is critical
                final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);

                if(response ! =null) {
                    response.setOpaque(request.getOpaque());
                    response.markResponseType();
                    try {
                        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if(! future.isSuccess()) { log.error("processRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause()); log.error(request.toString()); log.error(response.toString()); }}}); }catch (Throwable e) {
                        log.error("processRequestWrapper process request over, but response failed", e); log.error(request.toString()); log.error(response.toString()); }}}catch (RemotingCommandException e1) {
                log.error("excuteRequestWhenWakeup run", e1); }}};this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
Copy the code

There’s a key piece of code in there. PullMessageProcessor.this.processRequest(channel, request, false); . Where false means that broker pauses are not allowed.

Let’s revisit what happens when the broker fails to retrieve messages.


case ResponseCode.PULL_NOT_FOUND:

    // When brokerAllowSuspend = false, not handled
    if (brokerAllowSuspend && hasSuspendFlag) {
    // Todo consumer pass the parameter, default 15s
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
    }
    String topic = requestHeader.getTopic();
    long offset = requestHeader.getQueueOffset();
    int queueId = requestHeader.getQueueId();
    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
    // todo hold request
    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
    // This code is critical
    response = null;
break;
}
Copy the code

We can observe that brokerAllowSuspend = false, that is, response is not set to null.

For those of you who are paying attention, I mentioned earlier that response = null is critical.

What does the broker do when response = null

NettyRemotingAbstract#processRequestCommand()

doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

if(! cmd.isOnewayRPC()) {// todo does not equal null to flush
    if(response ! =null) {
        response.setOpaque(opaque);
        response.markResponseType();
        try {
            ctx.writeAndFlush(response);
        } catch (Throwable e) {
            log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); }}else{}}Copy the code

Obviously, the client will only respond if the response is not null. When a message is not pulled the first time, null is returned, so the client does not respond and the client does not trigger the message pull.

The Broker then queues the request. Check the queue for requests every 5s. If there is a message, actively push the message to the client. If no message is sent, the system checks whether the maximum pause time exceeds 15 seconds. If the pause time exceeds 15 seconds, the system sends a message to the client.

RocketMQ actually actively pushes messages to the client as they are written to the Commitlog.

ReputMessageService handles this job. Forward commitlogs every 1ms and actively push messages to consumers

class ReputMessageService extends ServiceThread {
    private void doReput(a) {
        
        if(BrokerRole.SLAVE ! = DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
           // Start tweeting
           DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); }}}Copy the code

conclusion

Long poll summary

Long polling: avoid sending invalid requests; You can control the speed at which a message is pulled, because pulling again is handled in a callback. Polling: sends invalid requests and consumes server resources. Cannot control the speed at which messages are pulled.

RocketMQ long polling implementation summary

  1. whenConsumerThe first time I failed to pull a message,RocketMQWill hold the request15s.
  2. RocketMQThe way to implement hold is to holdresponseSet it to null so that the client does not respond.
  3. PullRequestHoldServiceThe thread checks for held requests every 5 seconds and pushes new messages to themConsumer
  4. ReputMessageServiceThread every1msWill be forwardedcommitlogAt the same time, it will also check the held request, and actively push messages toConsumer