Cabbage Java self study room covers core knowledge

RocketMQ is a Java engineer’s path to advancement.

Many mainstream message-oriented middleware products are available today, such as the veteran ActiveMQ and RabbitMQ, the popular Kafka, and RocketMQ, developed in-house by Alibaba. Message queues have gradually become the core means of internal communication in enterprise IT systems. They provide low coupling, reliable delivery, broadcast, flow control, and eventual consistency, and have become one of the main ways to implement asynchronous RPC.

1. Design purpose of RocketMQ

1.1. Publish/subscribe

Publish/subscribe is the most basic function of message-oriented middleware, and it is what distinguishes it from traditional RPC communication.

1.2. Message priority

Priority, as described in the specification, means that each message in a queue has its own priority, usually expressed as an integer, and messages with a higher priority are delivered first. If the messages sit entirely in an in-memory queue, they can be sorted by priority before delivery.

Because all RocketMQ messages are persisted, sorting by priority would be very expensive. RocketMQ therefore does not specifically support message priority, but a workaround is to configure a high-priority queue and a normal-priority queue separately and send messages of different priorities to different queues.

Priority problems fall into two categories:

  1. Coarse priority. Strict ordering by priority is not required as long as some notion of priority is honored; priorities are usually divided into high, medium, and low, or a few more levels. Each level can be represented by its own topic, and choosing the topic by priority when sending solves most priority problems, at the cost of some precision in the business priority.

  2. Strict priority. The priority is expressed as an integer, for example 0 to 65535. Using a separate topic per value is not practical for this kind of priority, and having MQ solve it directly would have a significant impact on MQ performance. The question to ask here is whether the business really needs strict priority, and how much it would be affected if the priorities were reduced to a few levels.
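The first category can be sketched in a few lines of Java. This is only an illustration: the class name, the topic naming scheme, and the cut-off values are assumptions made for the example, not anything RocketMQ defines.

```java
// Sketch only: topic naming and thresholds below are illustrative assumptions.
final class PriorityTopics {
    enum Level { HIGH, MEDIUM, LOW }

    // Category 1: one topic per coarse level, chosen at send time.
    static String topicFor(String baseTopic, Level level) {
        return baseTopic + "_" + level.name();
    }

    // Collapses a strict priority (0..65535, larger = more urgent) into a
    // coarse level. The cut-off points are arbitrary for the example.
    static Level coarsen(int strictPriority) {
        if (strictPriority < 0 || strictPriority > 65535) {
            throw new IllegalArgumentException("priority out of range: " + strictPriority);
        }
        if (strictPriority >= 40000) {
            return Level.HIGH;
        }
        return strictPriority >= 10000 ? Level.MEDIUM : Level.LOW;
    }

    public static void main(String[] args) {
        System.out.println(topicFor("ORDER", coarsen(50000))); // ORDER_HIGH
    }
}
```

Collapsing 65536 values into three topics is exactly the loss of precision mentioned above: two messages in the same level are no longer ordered relative to each other by priority.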

1.3. Message order

Message ordering means that a class of messages can be consumed in the order in which they were sent. For example, an order generates three messages: order created, order paid, and order completed. They are only meaningful if consumed in that order. Messages from different orders, however, can be consumed in parallel.

RocketMQ can guarantee strict message ordering.
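One common way to get per-order ordering while still consuming different orders in parallel is to route every message of the same order to the same queue. The text does not say how RocketMQ does this, so the sketch below is an assumption-based illustration; the class and method names are hypothetical.

```java
// Sketch: keep all messages of one order on one queue so they stay in order,
// while different orders spread over different queues.
final class OrderedQueueSelector {
    // The same order ID always maps to the same queue index.
    static int selectQueue(long orderId, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result in [0, queueCount) even for negative IDs.
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        String[] steps = {"created", "paid", "completed"};
        for (String step : steps) {
            System.out.println("order 1001 " + step + " -> queue " + selectQueue(1001L, 4));
        }
    }
}
```

All three messages of order 1001 land on the same queue, and a single queue is read in the order it was written.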

1.4. Message filtering

Broker message filtering

Filtering in the Broker according to the Consumer’s requirements has the advantage that unwanted messages are never sent over the network to the Consumer. The disadvantage is that it increases the load on the Broker and is relatively complex to implement.

  1. Taobao Notify supports a variety of filtering methods, including direct filtering by message type and flexible syntax expression filtering, which can almost meet the most demanding filtering requirements.
  2. RocketMQ supports filtering by simple Message Tag, Message Header, and body.
  3. Flexible syntactic expression filtering is also supported in the CORBA Notification specification.

Message filtering on the Consumer end

This method of filtering can be fully customized by the application, but the disadvantage is that many useless messages are transmitted to the Consumer side.

1.5. Message persistence

There are several types of persistence commonly used by message-oriented middleware:

  1. Persist to a database, such as MySQL.
  2. Persist to KV storage, such as LevelDB, Berkeley DB, and other KV storage systems.
  3. Persist as file records, as Kafka and RocketMQ do.
  4. Persist an image of the in-memory data, as Beanstalkd and VisiNotify do.

Approaches (1), (2), and (3) can all extend the Buffer of the in-memory queue, whereas (4) is only an image of memory, used to restore the in-memory data after the Broker crashes and restarts.

The JMS and CORBA Notification specifications do not specify how to persist, but the performance of the persistence part directly determines the performance of the entire messaging middleware.

RocketMQ takes full advantage of the Linux file system memory cache to improve performance.

1.6. Message reliability

Several situations affect message reliability:

  1. The Broker is shut down normally.
  2. The Broker crashes abnormally.
  3. The OS crashes.
  4. The machine loses power, but power is restored immediately.
  5. The machine cannot boot (key components such as the CPU, motherboard, or memory may be damaged).
  6. The disk device is damaged.

In cases (1), (2), (3), and (4) the hardware can be recovered immediately, and RocketMQ can ensure that no messages are lost, or that only a small amount of data is lost, depending on whether flushing to disk is synchronous or asynchronous.

Cases (5) and (6) are single points of failure that cannot be recovered: once the failure occurs, all messages on that node are lost. With asynchronous replication, RocketMQ ensures that 99% of messages survive in both cases, but a very small number can still be lost. Synchronous double write avoids the single point of failure completely, but it inevitably affects performance, so it suits scenarios with very high reliability requirements, such as money-related applications.

RocketMQ supports synchronous double-write starting with version 3.0.

1.7. Real-time messaging

When messages are not piling up, a message can reach the Consumer as soon as it reaches the Broker.

RocketMQ uses long-polling Pull so that messages arrive in as close to real time as with Push.

1.8. At least once

Every message must be delivered at least once.

The RocketMQ Consumer first pulls the message to the local machine and returns an ACK to the server only after consumption completes. If the message has not been consumed, no ACK is sent, so RocketMQ supports this feature well.

1.9. Exactly once

  1. In the sending phase, duplicate messages must not be sent.
  2. In the consuming phase, duplicate messages must not be consumed.

A message can be considered “Exactly Only Once” only if both conditions are met, and achieving them in a distributed system inevitably incurs huge overhead. In pursuit of high performance, RocketMQ therefore does not guarantee this feature and requires the business to de-duplicate, which means message consumption must be idempotent.

Although RocketMQ cannot strictly guarantee the absence of duplicates, duplicate sending or consumption is rare under normal conditions; it occurs only in abnormal situations such as network exceptions or a Consumer starting or stopping.
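Business-side de-duplication can be as simple as remembering which business keys have already been processed. The sketch below is a minimal in-memory illustration; the class name and the idea of using a business key such as an order ID plus step are assumptions, and a real system would keep the processed keys in durable storage.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of an idempotent consumer: a redelivered message has no second effect.
final class IdempotentConsumer {
    // Keys already handled. In-memory only for the example (assumption).
    private final Set<String> processedKeys = new HashSet<>();
    private int appliedCount = 0;

    // Returns true if the message was applied, false if it was a duplicate.
    synchronized boolean consume(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip the side effect
        }
        appliedCount++; // stand-in for the real business side effect
        return true;
    }

    synchronized int appliedCount() {
        return appliedCount;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        System.out.println(consumer.consume("order-1001-paid")); // true
        System.out.println(consumer.consume("order-1001-paid")); // false
    }
}
```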

1.10. The Broker’s Buffer overflows

Buffer here generally refers to a queue in the Broker, which is usually limited in size. What happens if the Buffer is full? The CORBA Notification specification handles it as follows:

  1. Reject the new message and return the RejectNewEvents error code to the Producer.
  2. Discard existing messages according to a specific policy:
  • AnyOrder – Any event may be discarded on overflow. This is the default setting for this property.
  • FifoOrder – The first event received will be the first discarded.
  • LifoOrder – The last event received will be the first discarded.
  • PriorityOrder – Events should be discarded in priority order, such that lower priority events will be discarded before higher priority events.
  • DeadlineOrder – Events should be discarded in the order of shortest expiry deadline first.

RocketMQ does not have the concept of an in-memory Buffer. RocketMQ queues are persisted to disk, and expired data is removed periodically.

RocketMQ’s Buffer can be thought of as a queue of unlimited length that accepts however much data comes in. This holds only because the Broker periodically removes expired data. For example, if the Broker retains messages for only 3 days, the Buffer is unlimited in length, but data older than 3 days is deleted from the queue.

As for the message duplication problem described in 1.9, its essence is that a network call has an indeterminate outcome, a third state that is neither success nor failure, which is why messages can be repeated.

1.11. Retrospective consumption

Retrospective consumption means re-consuming messages that a Consumer has already consumed successfully, because the business requires it. To support this, the Broker must retain a message even after delivering it successfully to the Consumer. Re-consumption is usually driven by time: for example, if the Consumer system fails and, after recovery, needs to re-consume the data from one hour ago, the Broker must provide a mechanism to rewind the consumption progress by time.

RocketMQ supports rewinding consumption by time, accurate to the millisecond, and can move both backward and forward.
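Rewinding by time boils down to translating a timestamp into a queue offset. The sketch below shows one plausible way to do that, assuming each queue offset has a store timestamp and that timestamps grow with the offset; it is an illustration, not RocketMQ’s actual lookup code, and all names are hypothetical.

```java
// Sketch: find the consumption offset that corresponds to a point in time.
final class ConsumeRewind {
    // storeTimestamps[i] is the store time in milliseconds of the message at
    // queue offset i; the array is assumed to be in ascending order.
    static int offsetForTimestamp(long[] storeTimestamps, long targetMillis) {
        int lo = 0;
        int hi = storeTimestamps.length; // search the half-open range [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (storeTimestamps[mid] < targetMillis) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo; // first offset stored at or after targetMillis
    }

    public static void main(String[] args) {
        long[] times = {1000L, 2000L, 3000L, 4000L};
        System.out.println(offsetForTimestamp(times, 2500L)); // 2
    }
}
```

Resetting the consumer’s progress to the returned offset moves consumption backward when the timestamp is in the past and forward when it is ahead of the current position.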

1.12. Message accumulation

The main function of message-oriented middleware is asynchronous decoupling. Another important function is to absorb traffic peaks from the front end and keep the back-end system stable, which requires the middleware to be able to accumulate messages. Message accumulation falls into two cases:

  1. Messages accumulate in an in-memory Buffer; once the Buffer is exceeded, messages are discarded according to some policy, as described in the CORBA Notification specification. This suits services that can tolerate discarded messages. Here the accumulation capacity depends mainly on the size of the memory Buffer, and performance does not degrade much after accumulation, because the amount of data in memory has little effect on the access capability offered to clients.
  2. Messages accumulate in persistent storage, such as a DB, KV storage, or file records. When a message cannot be found in the memory cache, accessing it inevitably requires disk reads, generating a large number of read I/Os. The read I/O throughput directly determines the access capability after messages accumulate.

There are four main points when evaluating message accumulation capability:

  1. How many messages can accumulate, and how many bytes? That is, the accumulation capacity.
  2. Is the throughput of sending messages affected by the accumulation?
  3. Are consumers that are consuming normally affected by the accumulation?
  4. What is the throughput of accessing the messages accumulated on disk?

1.13. Distributed transactions

Several distributed transaction specifications are known, such as XA and JTA. The XA specification is widely supported by the major database vendors, such as Oracle and MySQL. Excellent XA TM implementations, such as Oracle Tuxedo, are widely used in finance, telecommunications, and other fields.

Distributed transactions involve two-phase commit. Transactional data storage normally needs KV storage support, because the commit or rollback in the second phase must modify the message state, which means looking up a Message by Key. RocketMQ sidesteps the key lookup in the second phase: in the first phase it sends a Prepared message and obtains the message’s Offset, and in the second phase it uses the Offset to access the message and modify its state.

Implementing transactions through the Offset rather than KV storage has one significant drawback: modifying data by Offset can produce too many dirty pages in the system, which requires special attention.

1.14. Timed messages

Timed messages are messages that cannot be consumed immediately after they are sent to the Broker; they can be consumed only at a specific point in time or after a specific delay. Supporting arbitrary time precision would require sorting messages at the Broker level, and once persistence is involved, sorting inevitably incurs significant performance costs.

RocketMQ supports timed messages, but not arbitrary time precision. It supports specific levels, such as 5s, 10s, and 1m.

1.15. Message retries

When a Consumer fails to consume a message, a retry mechanism is needed so that the message can be consumed again.

A Consumer’s failure to consume a message generally falls into one of the following cases:

  1. The message itself is the cause, for example deserialization fails, or the message data cannot be processed (for example, a phone top-up where the phone number in the message has been cancelled). This kind of error usually calls for skipping the message and moving on to other messages; retrying the failed message immediately will fail 99% of the time, so it is better to provide a timed retry mechanism, for example retrying after 10 seconds.
  2. A downstream service that the application depends on is unavailable, for example the DB connection is down or an external system is unreachable over the network. With this kind of error, skipping the failed message does not help, because consuming other messages fails too. In this case the application is advised to sleep for 30 seconds before consuming the next message, which reduces the retry pressure on the Broker.

2. RocketMQ consumption model

Generally speaking, the consumption models of message queues fall into two types: the push model and the pull model.

In a messaging system based on the push model, the message broker records the consumption status. After the broker pushes a message to a consumer, it marks the message as consumed, but this approach cannot guarantee processing semantics. For example, if the consumer process crashes, or the message never arrives because of a network problem, after the broker has sent it and marked it as consumed, the message is lost permanently. If the consumer instead replies to the broker after receiving the message, the broker has to track the consumption status of every message, which is not desirable.

Readers who have used RocketMQ may wonder: doesn’t RocketMQ have two types of consumers?

  1. MQPullConsumer and MQPushConsumer

Isn’t MQPushConsumer the push model? In fact, with both consumers the client actively pulls messages. The implementation differences are as follows:

  • MQPullConsumer: Each pull must pass in the offset to pull from and the number of messages to pull. The client controls where to pull from and how many messages to pull.

  • MQPushConsumer: The client also actively pulls messages, but the consumption progress is saved on the server. The Consumer periodically reports how far it has consumed, so the next time it consumes it can resume from the last position. With PushConsumer we normally do not care about the offset or how much data is pulled; we simply use it.

  2. Cluster consumption and broadcast consumption

There are two consumption modes, cluster consumption and broadcast consumption:

  • Cluster consumption: Consumers with the same GroupId belong to the same cluster. In general, a message is processed by only one consumer in the cluster.

  • Broadcast consumption: Each message is delivered to every consumer in the cluster. Note, however, that because saving broadcast consumption offsets on the server is too expensive, a client consumes from the latest message each time it restarts, not from the last saved offset.
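The difference between the two modes can be shown with a toy dispatcher. This is only a sketch: the round-robin split used for cluster mode is a simplification chosen for the example and is not a description of how RocketMQ allocates work to consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two consumption modes; each inner list is one consumer's inbox.
final class ConsumeModes {
    private static List<List<String>> inboxes(int consumers) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            out.add(new ArrayList<>());
        }
        return out;
    }

    // Cluster mode: every message is handled by exactly one consumer of the group.
    static List<List<String>> cluster(List<String> messages, int consumers) {
        List<List<String>> out = inboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            out.get(i % consumers).add(messages.get(i)); // simplified round-robin
        }
        return out;
    }

    // Broadcast mode: every consumer receives every message.
    static List<List<String>> broadcast(List<String> messages, int consumers) {
        List<List<String>> out = inboxes(consumers);
        for (List<String> inbox : out) {
            inbox.addAll(messages);
        }
        return out;
    }
}
```

With three messages and two consumers, cluster mode splits the messages between the two inboxes, while broadcast mode puts all three messages in both.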

3. RocketMQ network model

RocketMQ uses Netty rather than raw sockets for network communication, for several reasons:

  1. The API is simple to use, so there is no need to worry much about network details and more attention can go to the middleware logic.
  2. High performance. It is mature and stable, and the JDK NIO bugs have been fixed.

For efficient network communication, the usual choices of threading model are 1+N (one Acceptor thread, N I/O threads) and 1+N+M (one Acceptor thread, N I/O threads, M worker threads). RocketMQ uses a 1+N1+N2+M model, as shown in the figure below:

One Acceptor thread and N1 I/O threads; N2 threads for handshake, SSL verification, and encoding/decoding; and M threads for business processing. The advantage is that potentially time-consuming operations such as encoding/decoding and SSL verification run in a separate thread pool and do not occupy the business threads or the I/O threads.

4. RocketMQ storage model

A good messaging system must have high-performance storage and high availability. RocketMQ’s core storage design differs greatly from Kafka’s, so its write performance differs greatly too. The following is a performance test of RocketMQ and Kafka with different numbers of topics, conducted by the Alibaba middleware team in 2016:

As can be seen from the graph:

  • Kafka saw a 98.37% drop in throughput as the number of topics increased from 64 to 256.
  • RocketMQ throughput dropped only 16% as the number of topics grew from 64 to 256.

Why is that? In Kafka, all messages under a topic are distributed across multiple nodes as partitions. On a Kafka machine, each Partition corresponds to a log directory, and each directory contains multiple log segments. So although Kafka writes each file sequentially, with many topics it writes to too many files at once, which causes disk I/O contention.

So why is RocketMQ able to maintain high throughput even with many topics? Let’s start by looking at the key files in RocketMQ:

  • CommitLog: the store for message bodies and metadata; it holds the message content written by Producers. Message content is not fixed length. Each file is 1 GB by default; the file name is 20 digits long, left-padded with zeros, and the remaining digits are the file’s start offset. For example, 00000000000000000000 is the first file, with start offset 0 and file size 1 GB = 1073741824 bytes. When the first file is full, the second file is 00000000001073741824, with start offset 1073741824, and so on. Messages are mostly written sequentially to the log file, and when a file is full, writing moves to the next file.

  • Config: Saves some configuration information, including some Group, Topic and Consumer offset information.

  • ConsumeQueue: the message consumption queue, introduced mainly to improve consumption performance. Because RocketMQ uses a topic-based subscription model, messages are consumed per topic, and scanning the commitlog files to retrieve messages by topic would be very inefficient.

The Consumer can therefore look up the messages to consume through the ConsumeQueue. The ConsumeQueue serves as an index for consumption: for each message of the queue it stores the starting physical offset in the CommitLog, the message size, and the hash code of the message Tag.

ConsumeQueue files can be regarded as topic-based index files for the commitlog, so the ConsumeQueue folder is organized in three levels: topic/queue/file.

  • IndexFile: stored at HOME \store\index${fileName}, where fileName is the timestamp at which the file was created. A single IndexFile has a fixed size of about 400 MB and can hold 20 million index entries. Its underlying storage implements a HashMap structure in the file system, so RocketMQ’s IndexFile is in effect a hash index.

Unlike Kafka, which writes to many files, RocketMQ writes all message bodies to a single file, so there is very little write I/O contention and throughput stays high even with many topics. It is true that the ConsumeQueue is also written continuously and that its files are created per Queue, so there are still many files. But the amount of data written to the ConsumeQueue is very small, only 20 bytes per message, so 300,000 entries take only about 6 MB, and the impact is far smaller than that of the many topic files in Kafka. The overall logic is as follows:

Producers keep appending new messages to the CommitLog. A scheduled task, ReputService, keeps scanning the newly appended CommitLog data and builds the ConsumeQueue and the Index from it.
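The file naming and the 20-byte index entry described above can be made concrete with a short sketch. The 8 + 4 + 8 byte split of an entry (physical offset, message size, tag hash code) is an assumption consistent with the 20-byte figure in the text, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Locale;

// Sketch of the CommitLog file naming rule and a 20-byte ConsumeQueue entry.
final class StoreLayout {
    static final long COMMIT_LOG_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB
    static final int CQ_ENTRY_SIZE = 20; // assumed split: 8 + 4 + 8 bytes

    // Name of the CommitLog file that contains the given physical offset:
    // the file's start offset, left-padded with zeros to 20 digits.
    static String commitLogFileName(long physicalOffset) {
        long startOffset = physicalOffset - physicalOffset % COMMIT_LOG_FILE_SIZE;
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    // Packs one index entry: where the message starts, how long it is,
    // and the hash code of its tag.
    static byte[] encodeEntry(long physicalOffset, int messageSize, long tagHash) {
        return ByteBuffer.allocate(CQ_ENTRY_SIZE)
                .putLong(physicalOffset)
                .putInt(messageSize)
                .putLong(tagHash)
                .array();
    }

    static long physicalOffsetOf(byte[] entry) {
        return ByteBuffer.wrap(entry).getLong(0);
    }

    static int messageSizeOf(byte[] entry) {
        return ByteBuffer.wrap(entry).getInt(8);
    }

    public static void main(String[] args) {
        byte[] entry = encodeEntry(1073741824L + 100, 256, "TagA".hashCode());
        System.out.println(commitLogFileName(physicalOffsetOf(entry))); // 00000000001073741824
        System.out.println(300_000L * CQ_ENTRY_SIZE); // 6000000 bytes, about 6 MB
    }
}
```

Because every entry has the same size, the entry for queue offset n sits at byte n × 20, so the index can be addressed like an array.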

Note: this applies to ordinary hard disks. On SSDs, writing to many files concurrently makes little difference compared with writing to a single file.

In Kafka, each Partition is a separate file, so consuming messages results in sequential reads. When the OS reads a file from a physical disk, it also pre-reads data from adjacent blocks in sequence and puts it into the PageCache, so Kafka’s read performance is better.

The RocketMQ read process is as follows:

  1. Read the ConsumeQueue entry to obtain the offset, which is the message’s physical offset in the CommitLog.
  2. Read the message from the CommitLog at that offset.
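The two-step read can be modelled with a tiny in-memory store: one shared log for all topics plus a small per-topic index. This is a sketch under simplifying assumptions (one queue per topic, everything held in memory, hypothetical names), not RocketMQ’s storage code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy store: a single shared commit log and one index list per topic.
final class MiniStore {
    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    // Each index entry is {physicalOffset, size}.
    private final Map<String, List<long[]>> consumeQueues = new HashMap<>();

    // Write path: append the body to the shared log, then index it.
    int append(String topic, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long physicalOffset = commitLog.size();
        commitLog.write(bytes, 0, bytes.length);
        List<long[]> queue = consumeQueues.computeIfAbsent(topic, t -> new ArrayList<>());
        queue.add(new long[] {physicalOffset, bytes.length});
        return queue.size() - 1; // logical offset within the topic's queue
    }

    // Step 1 of the read path: logical queue offset -> physical offset.
    long physicalOffsetOf(String topic, int queueOffset) {
        return consumeQueues.get(topic).get(queueOffset)[0];
    }

    // Step 2 of the read path: read the log at the physical offset.
    String read(String topic, int queueOffset) {
        long[] entry = consumeQueues.get(topic).get(queueOffset);
        byte[] log = commitLog.toByteArray();
        return new String(log, (int) entry[0], (int) entry[1], StandardCharsets.UTF_8);
    }
}
```

Appending "a1" to topic A, "b1" to topic B, and "a2" to topic A leaves the two messages of topic A at physical offsets 0 and 4: consecutive in the queue but not contiguous in the log, which is where the random reads come from.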

The ConsumeQueue is also a separate file per Queue, and its files are small, so it benefits easily from the PageCache. The CommitLog, however, is read randomly, because consecutive messages of the same Queue are not contiguous in the CommitLog. RocketMQ applies several optimizations here:

  • Mmap-based reads, which avoid the overhead in traditional I/O of copying disk file data back and forth between the buffer in the operating system kernel’s address space and the buffer in the user application’s address space.
  • The DeadLine I/O scheduling algorithm and SSD storage.

Because Mmap mapping is limited by memory, when the requested data is no longer in the mapped portion (that is, too many messages have piled up; the default threshold is 40% of memory), requests are sent to the Slave to relieve the pressure on the Master.

5. RocketMQ high availability

Cluster pattern

We first need to choose a cluster mode that matches the level of availability we can tolerate. Generally speaking, there are four:

  • Single Master: the lowest availability but also the lowest cost. Once the Master goes down, everything is unavailable. This mode is generally suitable only for local testing.
  • Single Master with multiple Slaves: average availability. If the Master goes down, all writes are unavailable, but reads still work. If the Master’s disk is damaged, the data on the Slaves can be relied on.
  • Multi-master: average availability. If some masters go down, the messages on those masters can be neither consumed nor written. If a Topic has queues on several masters, the queues on the masters that are still up can be consumed and written normally. Messages are lost if a master’s disk is damaged.
  • Multi-master multi-slave: the highest availability but also the highest maintenance cost. When a master goes down, the queues on that master cannot be written but can still be read, and if the master’s disk is damaged, the data on its slave can be relied on.

In general, the fourth mode is chosen for production to ensure maximum availability.

Availability of messages

RocketMQ provides both synchronous and asynchronous disk flushing. With synchronous flushing, FLUSH_DISK_TIMEOUT is returned if the flush times out; with asynchronous flushing, no flush-related information is returned. Choosing synchronous flushing gives the strongest guarantee that messages are not lost.

Besides the storage choice, master/slave replication also offers synchronous and asynchronous modes. Choosing synchronous replication improves availability, but the RT of sending messages gets about 10% worse.

DLedger-RocketMQ

As the analysis of the master-slave deployment modes above shows, whenever a master fails, writes to it are unavailable until we restore the master or manually switch a slave to master. In other words, slaves are mostly used only for reads. Recent releases of RocketMQ introduced DLedger-RocketMQ, which uses the Raft protocol to replicate the CommitLog and elect a master automatically, so that writes remain available when the master goes down.

6. RocketMQ timing/delay messages

Timed messages and delayed messages are commonly used in real business scenarios, for example:

  • Automatically closing an order that has not been paid before a timeout. In many scenarios inventory is locked when the order is placed, so the order must be closed on timeout.
  • Fallback logic that needs some delay. For example, after finishing a piece of logic you can send a message delayed by, say, half an hour to run a fallback check and compensation.
  • Sending a message to a user at a certain time, which can also be done with delayed messages.

In the open-source version of RocketMQ, delayed messages do not support arbitrary delays; one of several fixed delay levels must be set. The current defaults are 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, which correspond to levels 1 to 18 respectively. The Aliyun version (paid) supports any time within 40 days, with millisecond precision. Let’s take a look at the schematic of how delayed messages work in RocketMQ:
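The level table can be turned into milliseconds with a few lines of parsing. The sketch below uses the default levels listed above; the class and method names are hypothetical and the parsing is written for this example rather than taken from RocketMQ.

```java
// Sketch: map the fixed delay levels (1..18) to delays in milliseconds.
final class DelayLevels {
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Delay in milliseconds for a 1-based level.
    static long delayMillis(int level) {
        String[] parts = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("level must be 1.." + parts.length);
        }
        String part = parts[level - 1];
        long value = Long.parseLong(part.substring(0, part.length() - 1));
        switch (part.charAt(part.length() - 1)) {
            case 's': return value * 1000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalStateException("unknown unit in " + part);
        }
    }

    // Smallest level whose delay is at least the wanted delay, or -1 if the
    // wanted delay is longer than the highest level.
    static int levelAtLeast(long wantedMillis) {
        int levels = DEFAULT_LEVELS.split(" ").length;
        for (int level = 1; level <= levels; level++) {
            if (delayMillis(level) >= wantedMillis) {
                return level;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(delayMillis(3));        // 10000
        System.out.println(levelAtLeast(45_000L)); // 5, that is 1m
    }
}
```

A wanted delay of 45 seconds has to be rounded up to the 1m level, which illustrates the limited precision of fixed levels.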

Step1: The Producer sets a delay level on the message it sends.

Step2: The Broker sees that the message is a delayed message and replaces its Topic with the delay Topic. Each delay level is stored in its own queue, and the message’s original Topic is saved as additional information.

Step3: Build the ConsumeQueue.

Step4: A scheduled task periodically scans the ConsumeQueue of each delay level.

Step5: Take the CommitLog offset from the ConsumeQueue, fetch the message, and check whether its execution time has arrived.

Step6: If it has, restore the message’s original Topic and deliver the message again. If not, postpone the task for the time that remains.
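The six steps can be traced in a small in-memory simulation. Everything here is a sketch under assumptions: the class, the DELAY_TOPIC name, and the explicit clock parameter are hypothetical, and real storage (CommitLog and ConsumeQueue) is replaced by plain Java collections.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy broker that parks delayed messages in one queue per delay level.
final class DelayBroker {
    static final String DELAY_TOPIC = "DELAY_TOPIC"; // hypothetical internal topic name

    static final class Msg {
        String topic;
        String originalTopic;
        final String body;
        long deliverAtMillis;

        Msg(String topic, String body) {
            this.topic = topic;
            this.body = body;
        }
    }

    private final long[] levelMillis; // delay of level i is levelMillis[i - 1]
    private final Map<Integer, Deque<Msg>> delayQueues = new HashMap<>();
    final List<Msg> delivered = new ArrayList<>();

    DelayBroker(long... levelMillis) {
        this.levelMillis = levelMillis;
    }

    // Steps 1-3: swap in the delay topic, remember the original one, and
    // enqueue the message under its level.
    void put(String topic, String body, int level, long nowMillis) {
        Msg msg = new Msg(topic, body);
        if (level <= 0) {
            delivered.add(msg); // not delayed: visible immediately
            return;
        }
        msg.originalTopic = topic;
        msg.topic = DELAY_TOPIC;
        msg.deliverAtMillis = nowMillis + levelMillis[level - 1];
        delayQueues.computeIfAbsent(level, k -> new ArrayDeque<>()).add(msg);
    }

    // Steps 4-6: scan each level's queue, and redeliver messages whose time
    // has arrived under their original topic.
    void scan(long nowMillis) {
        for (Deque<Msg> queue : delayQueues.values()) {
            while (!queue.isEmpty() && queue.peek().deliverAtMillis <= nowMillis) {
                Msg msg = queue.poll();
                msg.topic = msg.originalTopic;
                delivered.add(msg);
            }
        }
    }
}
```

Because all messages in one level share the same delay, each queue is already ordered by delivery time, so the scan only ever looks at the head of the queue and no sorting is needed.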

As you can see, delayed messages are implemented with a dedicated Topic and one queue per level. To support any time within 40 days at millisecond precision with this scheme, we would need 40 × 24 × 60 × 60 × 1000 queues (about 3.5 billion), which is far too expensive. So how does Aliyun support arbitrary times? A TimeWheel replaces the ConsumeQueue and stores the CommitLog offsets; the wheel keeps yielding the entries whose time has arrived, and those messages are delivered again.

7. RocketMQ transaction message

Transactional messages are another RocketMQ feature; they help achieve eventual consistency in distributed transactions:

The steps for using transaction messages are as follows:

Step1: Call sendMessageInTransaction to send a transaction message.

Step2: If the send succeeds, the local transaction is executed.

Step3: If the local transaction succeeds, send commit; if the local transaction fails, send rollback.

Step4: If one of these steps fails, for example the commit is not delivered, the RocketMQ Broker periodically checks back with the Producer for the status of the local transaction.

Transaction messages are more complex to use than the message types above. The following is the schematic of how they are implemented:

Step1: Send the transaction message, also called a halfMessage. Its Topic is replaced with the halfMessage Topic.

Step2: Send commit or rollback. On commit, the earlier message is looked up, restored to its original Topic and sent, and an OpMessage is written to record that the message can be deleted. On rollback, only the OpMessage is written to mark it for deletion.

Step3: The Broker runs a scheduled task that processes transaction messages by periodically comparing halfMessages with OpMessages. If an OpMessage exists with status deleted, the message has already been committed or rolled back, so it can be removed.

Step4: If the transaction times out (default: 6s) and there is still no OpMessage, the commit message was probably lost, so the Broker checks back with the Producer for the local transaction status.

Step5: Perform Step2 according to the result of the check.
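The interplay of halfMessage, OpMessage, and the check-back can be sketched as a small state model. This is an illustration under assumptions: the class, the LocalTxChecker callback, and the in-memory maps are hypothetical stand-ins for the Broker’s internal topics and the Producer’s check listener.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of half messages, op messages, and the timed check-back.
final class TxBroker {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Hypothetical callback through which the broker asks the producer.
    interface LocalTxChecker {
        State check(String txId);
    }

    private static final class Half {
        final String originalTopic;
        final String body;
        final long sentAtMillis;

        Half(String originalTopic, String body, long sentAtMillis) {
            this.originalTopic = originalTopic;
            this.body = body;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<String, Half> halfMessages = new LinkedHashMap<>();
    private final Set<String> opMessages = new HashSet<>(); // resolved transactions
    final List<String> visible = new ArrayList<>();         // "topic:body" seen by consumers

    // Step1: the message is parked as a half message, invisible to consumers.
    void sendHalf(String txId, String topic, String body, long nowMillis) {
        halfMessages.put(txId, new Half(topic, body, nowMillis));
    }

    // Step2: commit restores the original topic; both outcomes write an op record.
    void end(String txId, State state) {
        Half half = halfMessages.get(txId);
        if (half == null || opMessages.contains(txId) || state == State.UNKNOWN) {
            return;
        }
        if (state == State.COMMIT) {
            visible.add(half.originalTopic + ":" + half.body);
        }
        opMessages.add(txId);
    }

    // Step3 to Step5: for unresolved half messages past the timeout, ask the producer.
    void checkBack(long nowMillis, long timeoutMillis, LocalTxChecker checker) {
        for (Map.Entry<String, Half> entry : halfMessages.entrySet()) {
            boolean resolved = opMessages.contains(entry.getKey());
            boolean timedOut = nowMillis - entry.getValue().sentAtMillis >= timeoutMillis;
            if (!resolved && timedOut) {
                end(entry.getKey(), checker.check(entry.getKey()));
            }
        }
    }
}
```

If the commit for a transaction never arrives, a later checkBack call with the 6-second timeout asks the checker and, on COMMIT, makes the message visible under its original topic.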

As we can see, RocketMQ implements transactional messages the same way it implements delayed messages: it rewrites the message’s original Topic and then acts as a consumer itself, applying some special business logic. We can of course use the same approach to extend RocketMQ further.
