preface

This article looks at RocketMQ's suspendCurrentQueueTimeMillis setting.

suspendCurrentQueueTimeMillis

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

	//......

    /**
     * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    //......
}
  • DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property, which defaults to 1000 (milliseconds)

submitConsumeRequestLater

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

public class ConsumeMessageOrderlyService implements ConsumeMessageService {

	//......

    private void submitConsumeRequestLater(
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final long suspendTimeMillis
    ) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
        }

        if (timeMillis < 10) {
            timeMillis = 10;
        } else if (timeMillis > 30000) {
            timeMillis = 30000;
        }

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }

    //......
}
  • When suspendTimeMillis is -1, the submitConsumeRequestLater method falls back to the value of defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(). It then clamps timeMillis to the range 10 to 30000: a value below 10 is reset to 10, and a value above 30000 is reset to 30000. Finally, it uses scheduledExecutorService to run submitConsumeRequest after a delay of timeMillis milliseconds
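
The fallback and clamping described above can be tried out without a broker. The following is only a minimal sketch that assumes nothing beyond the JDK: the class SuspendDelaySketch and the helpers resolveDelay and submitLater are hypothetical names invented for this illustration, and DEFAULT_SUSPEND_MILLIS stands in for the value returned by getSuspendCurrentQueueTimeMillis(). It is not the RocketMQ implementation itself.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone class; it only imitates the delay handling shown above.
class SuspendDelaySketch {

    // Stand-in for DefaultMQPushConsumer.getSuspendCurrentQueueTimeMillis() (default 1000).
    static final long DEFAULT_SUSPEND_MILLIS = 1000;

    // Falls back to the consumer default when the caller passes -1,
    // then clamps the result to the range [10, 30000] milliseconds.
    static long resolveDelay(long suspendTimeMillis, long consumerDefault) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = consumerDefault;
        }
        return Math.max(10, Math.min(30000, timeMillis));
    }

    // Schedules the retry task once, after the resolved delay.
    static ScheduledFuture<?> submitLater(ScheduledExecutorService scheduler,
                                          Runnable retry,
                                          long suspendTimeMillis) {
        long delay = resolveDelay(suspendTimeMillis, DEFAULT_SUSPEND_MILLIS);
        return scheduler.schedule(retry, delay, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(resolveDelay(-1, DEFAULT_SUSPEND_MILLIS));    // 1000
        System.out.println(resolveDelay(5, DEFAULT_SUSPEND_MILLIS));     // 10
        System.out.println(resolveDelay(60000, DEFAULT_SUSPEND_MILLIS)); // 30000

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // A requested delay of 5 ms is raised to 10 ms before the task runs.
            submitLater(scheduler, () -> System.out.println("retry submitted"), 5).get();
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Note that the clamp is applied after the fallback, so a consumer-level setting outside the 10 to 30000 range is also corrected when it is used.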

summary

DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property, which defaults to 1000. The submitConsumeRequestLater method of ConsumeMessageOrderlyService reads defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis() when suspendTimeMillis is -1, resets the value to 10 if it is less than 10 and to 30000 if it is greater than 30000, and then uses scheduledExecutorService to run submitConsumeRequest after a delay of timeMillis milliseconds.

doc

  • DefaultMQPushConsumer