Resources

  • RocketMQ website: Rocketmq.apache.org/

  • Other RocketMQ projects: github.com/apache/rock…

  • RocketMQ console installation: blog.csdn.net/zzzgd_666/a…

RocketMQ Parameter Guide

Broker configuration properties

brokerClusterName=RocketMQCluster
brokerName=broker-a
# 0 = Master, >0 = Slave
brokerId=0
# NameServer addresses, separated by semicolons
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# On a host with two network cards, this prevents messages from being sent over the external network; set it to the intranet IP
brokerIP1=10.30.51.149
# Number of queues created when a message is sent to a topic that does not yet exist on the server
defaultTopicQueueNums=8
# Whether to allow the broker to create topics automatically; recommended on offline and off in production
autoCreateSubscriptionGroup=true
# Port on which the broker listens for external service
listenPort=10911
# Time of day at which files are deleted (by default, in the early morning)
deleteWhen=03
# File retention time
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file stores 300,000 entries by default; adjust according to the business load
mapedFileSizeConsumeQueue=1000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
# Storage path
storePathRootDir=/app/data/rocketmq/data
# CommitLog storage path
storePathCommitLog=/app/data/rocketmq/data/commitlog
# Consume queue storage path
storePathConsumeQueue=/app/data/rocketmq/data/consumerqueue
# Message index storage path
storePathIndex=/app/data/rocketmq/data/index
# Checkpoint file storage path
storeCheckpoint=/app/data/rocketmq/data/checkpoint
# Abort file storage path
abortFile=/app/data/rocketmq/data/abort
# Message size limit, changed to 16 MB
maxMessageSize=16777216
# Send queue waiting time
waitTimeMillsInSendQueue=3000
osPageCacheBusyTimeOutMills=5000
flushCommitLogLeastPages=12
flushConsumeQueueLeastPages=6
flushCommitLogThoroughInterval=30000
flushConsumeQueueThoroughInterval=180000
# Broker role
# - ASYNC_MASTER: asynchronous-replication master
# - SYNC_MASTER: synchronous double-write master
# - SLAVE
brokerRole=ASYNC_MASTER
# Flush mode
# - ASYNC_FLUSH: asynchronous flush
# - SYNC_FLUSH: synchronous flush
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
# Size of the send-message thread pool
sendMessageThreadPoolNums=80
# Size of the pull-message thread pool
pullMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true

RocketMQ send flow control

For the busy errors other than [TIMEOUT_CLEAN_QUEUE], the main cause is that the broker holds the lock for longer than the configured 1 s while appending a message. The broker throws an error to protect itself, and the client picks another broker to retry on.

For non-financial services, setting transientStorePoolEnable=true is recommended to avoid these broker busy errors. With this option, messages are first stored in off-heap memory, and RocketMQ locks that memory, so append performance is guaranteed to a certain extent. This gives read/write separation at the memory level: writes go directly to off-heap memory, consumers read directly from the page cache, and the broker periodically commits the messages from off-heap memory into the page cache.

However, this approach introduces the possibility of message loss. If losing messages is unacceptable, it is better to expand the cluster or migrate the topic to a new cluster.
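The trade-off described above can be sketched in plain Java. This is only an illustration under stated assumptions, not RocketMQ code: the class OffHeapWriteSketch and its methods are hypothetical names, a direct ByteBuffer stands in for the off-heap pool, and a FileChannel write stands in for the commit into the page cache. Anything appended but not yet committed exists only in the off-heap buffer, which is where the risk of loss comes from.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical illustration of "write off-heap first, commit later"; not RocketMQ code. */
public class OffHeapWriteSketch {
    // Off-heap staging buffer: appends only touch this memory.
    private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
    // Writes to the channel land in the OS page cache, where readers can see them.
    private final FileChannel fileChannel;

    public OffHeapWriteSketch(Path file) throws IOException {
        this.fileChannel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    /** Append a message to off-heap memory only. */
    public void append(String message) {
        writeBuffer.put(message.getBytes(StandardCharsets.UTF_8));
    }

    /** Copy everything staged so far into the file channel; returns the bytes committed. */
    public int commit() throws IOException {
        writeBuffer.flip();
        int written = 0;
        while (writeBuffer.hasRemaining()) {
            written += fileChannel.write(writeBuffer);
        }
        writeBuffer.clear();
        return written;
    }

    /** Bytes that have reached the file, i.e. that a reader could see. */
    public long committedBytes() throws IOException {
        return fileChannel.size();
    }

    public void close() throws IOException {
        fileChannel.close();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-sketch", ".bin");
        OffHeapWriteSketch log = new OffHeapWriteSketch(file);
        log.append("hello");
        // Before the commit the message lives only off-heap and would be lost on a crash.
        System.out.println("visible before commit: " + log.committedBytes());
        log.commit();
        System.out.println("visible after commit: " + log.committedBytes());
        log.close();
        Files.delete(file);
    }
}
```

The window between append and commit is the part that matters: a process crash in that window discards the staged bytes, which matches the warning about message loss above.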

Appending a message to memory takes no more than 1 s at worst and is usually very fast: most appends complete in under 1 ms. Occasionally, however, an append takes more than 200 ms, so tasks waiting in the send queue wait longer than 200 ms. In that case the broker fails fast so that the client can retry quickly. The check is not made in real time for each request; a scheduled task performs it every 10 ms.

It is worth noting that once TIMEOUT_CLEAN_QUEUE appears, there may be multiple such error messages at one point, depending on how many queues are currently backlogged.
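To see why several errors can appear at the same moment, here is a small self-contained simulation. It is a sketch under assumptions, not broker code: Task and cleanExpired are hypothetical names, timestamps are passed in explicitly so the result is deterministic, and 200 ms is used as the maximum wait to match the figure quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical simulation of a fast-fail sweep over a backlogged queue. */
public class FastFailSketch {
    /** Stand-in for a queued send request; only the creation time matters here. */
    static final class Task {
        final String name;
        final long createTimestamp;

        Task(String name, long createTimestamp) {
            this.name = name;
            this.createTimestamp = createTimestamp;
        }
    }

    /**
     * Removes tasks from the head of the queue while they have waited at least
     * maxWaitMillis, and returns the names of the rejected tasks.
     */
    static List<String> cleanExpired(BlockingQueue<Task> queue, long maxWaitMillis, long now) {
        List<String> rejected = new ArrayList<>();
        while (true) {
            Task head = queue.peek();
            if (head == null || now - head.createTimestamp < maxWaitMillis) {
                break; // queue is empty or the head has not waited long enough
            }
            if (queue.remove(head)) {
                rejected.add(head.name); // the broker would answer this one with "broker busy"
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        queue.add(new Task("a", 0));
        queue.add(new Task("b", 50));
        queue.add(new Task("c", 100));
        queue.add(new Task("d", 900));
        // At time 1000 with a 200 ms limit, a, b and c have all expired; d has not.
        System.out.println(cleanExpired(queue, 200, 1000));
    }
}
```

One sweep rejects every expired task at the head of the backlog, so the number of errors seen at once grows with the length of the backlog.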

RocketMQ send exceptions

Solutions for system busy and broker busy

  • [REJECTREQUEST]system busy
  • too many requests and system thread pool busy
  • [PC_SYNCHRONIZED]broker busy
  • [PCBUSY_CLEAN_QUEUE]broker busy
  • [TIMEOUT_CLEAN_QUEUE]broker busy

The solutions described earlier were verified only in the test environment. After moving to production, normal use caused no problems, but during a stress test in the production environment the system busy exception appeared and the broker almost crashed.

com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8
For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)
	at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1030)
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:989)
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90)
	at 
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Locating the error

  • cleanExpiredRequestInQueue processes the queues for sending messages, pulling messages, heartbeats, and transaction messages. The problem here is an error reported while sending to a topic, so the analysis below focuses on the send path.

  • The error originates in the class BrokerFastFail.java. When the broker starts, this class starts a scheduled task that runs every 10 milliseconds, with the following code:

void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                // Get the queue header element
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                // If the task at the head of the queue has waited longer than the configured maximum, return this error for the request and remove the task
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}

This code is the core of the broker's fast-failure mechanism. If the head element of a queue (the next task to be processed) has waited longer than the maximum waiting time configured for that queue, the task is discarded and the request is answered with the "[TIMEOUT_CLEAN_QUEUE]broker busy" error.

How sending to a topic reports this error

The head element is taken from sendThreadPoolQueue and converted into the corresponding task, and the broker checks whether the time the task has spent in the queue exceeds the maximum waiting time configured for the queue. If it does, the broker assembles a response object with response code RemotingSysResponseCode.SYSTEM_BUSY and the following content:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: [time the current task has spent in the queue]ms, size of queue: [current length of the queue]
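Because the remark has a fixed shape, the two numbers can be pulled out of a log line when investigating how badly the queue is backlogged. The following is a minimal sketch; BusyRemarkParser is a hypothetical helper, not part of RocketMQ, and it assumes the remark keeps the wording shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that extracts the wait time and queue size from a busy remark. */
public class BusyRemarkParser {
    private static final Pattern REMARK = Pattern.compile(
        "\\[TIMEOUT_CLEAN_QUEUE\\].*period in queue: (\\d+)ms, size of queue: (\\d+)");

    /** Returns {periodInQueueMillis, queueSize}, or null if the text does not match. */
    static long[] parse(String remark) {
        Matcher m = REMARK.matcher(remark);
        if (!m.find()) {
            return null;
        }
        return new long[] { Long.parseLong(m.group(1)), Long.parseLong(m.group(2)) };
    }

    public static void main(String[] args) {
        String remark = "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, "
            + "period in queue: 208ms, size of queue: 8";
        long[] parsed = parse(remark);
        System.out.println("waited " + parsed[0] + " ms, queue size " + parsed[1]);
    }
}
```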

MQClientAPIImpl.processSendResponse handles the returned response. It branches on response.getCode() and in this case ends by throwing an MQBrokerException. The branch-handling code is as follows:

// A result is returned only for ResponseCode.SUCCESS (and the cases that fall through to it); otherwise an MQBrokerException is thrown
private SendResult processSendResponse(
        final String brokerName,
        final Message msg,
        final RemotingCommand response
    ) throws MQBrokerException, RemotingCommandException {
        switch (response.getCode()) {
            case ResponseCode.FLUSH_DISK_TIMEOUT:
            case ResponseCode.FLUSH_SLAVE_TIMEOUT:
            case ResponseCode.SLAVE_NOT_AVAILABLE: {
            }
            case ResponseCode.SUCCESS: {
                // Omit some code
                return sendResult;
            }
            default:
                break;
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

The sending client receives the MQBrokerException. Its exception handling finds that this response code does not match any of the retry branches, so it rethrows the exception directly, and that is what the user sees:

// timesTotal: the number of send attempts configured on the producer for retrying failed sends
for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // Omit some code
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        // MQBrokerException handling: RemotingSysResponseCode.SYSTEM_BUSY matches none of the case labels below, so execution falls to the default branch and ends with "throw e"
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

Production environment parameters:

  • Broker busy exception: can be resolved by increasing waitTimeMillsInSendQueue

  • System busy exception: can be resolved by increasing osPageCacheBusyTimeOutMills

# Send queue waiting time
waitTimeMillsInSendQueue=3000
# Page cache busy timeout; the default value is 1000
osPageCacheBusyTimeOutMills=5000

Problem analysis

The exception occurred because several applications were deployed on the same server. One machine hosted three ES instances, eight Redis instances, and one RocketMQ broker, and all of them were in use during the stress test. Although there was still plenty of CPU and memory headroom, disk I/O and memory bandwidth are finite and may have been saturated; other factors may also have contributed.

This shows how important it is to give a service its own server. When MQ and Redis were both under heavy load in the test environment, Redis became three to four times slower. I knew there would be an impact, but I did not expect it to be this large.

The ultimate solution: RocketMQ should be deployed on a separate high-performance server.

A note on exceptions when using RocketMQ

Problem analysis and summary
  1. system busy, start flow control for a while

This exception causes message loss.

  2. broker busy, start flow control for a while

This exception does not cause message loss.

Problem solving process

1. Early in testing, only system busy appeared on the better-performing server; in other words, whenever the exception occurred the message was lost.

As a temporary fix, whenever an exception occurred the message was resent to the bak queue of the current topic: since the topic itself was busy, the message was sent to another one.

2. Duplicate messages then appeared. Messages that hit broker busy were also resent to the topic's bak queue, but because broker busy does not necessarily lose the message, the same message could end up being delivered twice.

Solution:

Modify the RocketMQ configuration file:

  • Solution 1: change sendMessageThreadPoolNums to 1; if the line is not present, add it: sendMessageThreadPoolNums=1

  • Solution 2: change useReentrantLockWhenPutMessage to true; if the line is not present, add it:

sendMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true

sendMessageThreadPoolNums is the size of the thread pool that handles send requests. Since RocketMQ 4.1 the default is 1; I do not know the default in earlier versions, but it was greater than 1. When this property is 1, useReentrantLockWhenPutMessage does not need to be considered.

If the value is changed to something greater than 1, useReentrantLockWhenPutMessage needs to be set to true.

Testing so far has not shown any difference between the two solutions. With sendMessageThreadPoolNums=1, clients can still send from multiple threads, and the send speed feels the same as with a value greater than 1; it can saturate a 100 Mb network card.

My impression is that useReentrantLockWhenPutMessage=true simply adds a lock, so the critical section is still effectively processed by one thread at a time.
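That impression can be illustrated with a small sketch in which several threads append under a lock. The names PutMessageLockSketch, PutLock, ReentrantPutLock, and SpinPutLock are hypothetical and this is not RocketMQ code; the spin-lock variant is included on the assumption that the broker uses some spin-style lock when the flag is false. Either way, only one thread advances the write offset at a time, so a larger thread pool does not make the append itself parallel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: many sender threads, one serialized append section. */
public class PutMessageLockSketch {
    interface PutLock {
        void lock();
        void unlock();
    }

    /** Blocking lock: waiting threads are parked instead of burning CPU. */
    static final class ReentrantPutLock implements PutLock {
        private final ReentrantLock lock = new ReentrantLock();
        public void lock() { lock.lock(); }
        public void unlock() { lock.unlock(); }
    }

    /** Spin lock: waiting threads keep retrying a compare-and-set. */
    static final class SpinPutLock implements PutLock {
        private final AtomicBoolean free = new AtomicBoolean(true);
        public void lock() {
            while (!free.compareAndSet(true, false)) {
                Thread.yield();
            }
        }
        public void unlock() { free.set(true); }
    }

    /** Runs the given number of threads, each appending under the lock; returns the final offset. */
    static long appendConcurrently(PutLock lock, int threads, int messagesPerThread, int messageSize)
            throws InterruptedException {
        long[] offset = {0}; // shared write position, guarded by the lock
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    lock.lock();
                    try {
                        offset[0] += messageSize; // the "append": one thread at a time
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("appenders did not finish in time");
        }
        return offset[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(appendConcurrently(new ReentrantPutLock(), 4, 1000, 10));
        System.out.println(appendConcurrently(new SpinPutLock(), 4, 1000, 10));
    }
}
```

The difference between the two locks is in how waiting threads behave, not in how many threads can append at once: a blocking lock parks the waiters, while a spin lock keeps them busy, which becomes costly when many threads contend.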

The solution
  1. Catch the exception in the business logic and resend the message if an MQBrokerException is caught and its responseCode is 2;
  2. Increase the broker's waitTimeMillsInSendQueue (in milliseconds) from its default value.
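The first item can be sketched as a retry wrapper. To keep the example self-contained, BrokerException and Sender below are hypothetical stand-ins for MQBrokerException and the producer's send call; the attempt limit and the linear backoff are also assumptions, not values taken from RocketMQ.

```java
/** Hypothetical sketch of resending when the broker answers with response code 2. */
public class BusyRetrySketch {
    static final int SYSTEM_BUSY = 2; // the response code shown in the stack trace (CODE: 2)

    /** Stand-in for MQBrokerException; only the response code is modeled. */
    static final class BrokerException extends Exception {
        final int responseCode;

        BrokerException(int responseCode, String message) {
            super(message);
            this.responseCode = responseCode;
        }
    }

    /** Stand-in for the producer's send call. */
    interface Sender {
        String send(String message) throws BrokerException;
    }

    /** Resends only on SYSTEM_BUSY, waiting a little longer before each new attempt. */
    static String sendWithRetry(Sender sender, String message, int maxAttempts, long backoffMillis)
            throws BrokerException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        BrokerException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return sender.send(message);
            } catch (BrokerException e) {
                if (e.responseCode != SYSTEM_BUSY) {
                    throw e; // other broker errors are not retried here
                }
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt); // give the broker time to drain its queue
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fake sender: busy on the first two calls, then succeeds.
        Sender flaky = msg -> {
            if (++calls[0] < 3) {
                throw new BrokerException(SYSTEM_BUSY, "[TIMEOUT_CLEAN_QUEUE]broker busy");
            }
            return "SEND_OK";
        };
        System.out.println(sendWithRetry(flaky, "hello", 5, 10) + " after " + calls[0] + " calls");
    }
}
```

As noted earlier, a message that triggered broker busy is not necessarily lost, so resending can produce duplicates; consumers should be able to tolerate receiving the same message twice.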

In addition, check the disk I/O at the time the error is reported. The error may be caused by heavy disk I/O at that moment, which makes writing messages to disk take too long.
