In the last article, we looked at some of the basic features of RocketMQ's message model. From this point on, we will work through RocketMQ's core concepts one by one.

Basic concepts

What is MQ?

MQ is usually expanded as "message queue", but a more accurate term is message middleware. RocketMQ is publish-subscribe message middleware, commonly used in the following scenarios:

  • Communication

For example, MQ can be used for cross-language communication when real-time requirements are not strict.

  • Asynchronous decoupling

For example, an order service can publish an "order created" event and return immediately, while downstream services such as inventory and notification consume it asynchronously; neither side blocks on, or needs to know about, the other. (The original article illustrates this with two diagrams.)

  • Traffic peak clipping

For example, in a flash-sale scenario, in addition to rate limiting with a token-bucket algorithm, the high-throughput, high-performance, and high-availability characteristics of MQ can be exploited: requests are written directly to MQ, and the flash-sale service then consumes them at its own pace, shaving the traffic peak.

Architecture

Here is the architecture diagram from the official website. As you can see, RocketMQ consists of four main parts:

Name server

The name server plays a role similar to ZooKeeper in Kafka and provides two main functions:

  • A simple route registration and discovery mechanism

When a broker starts, it registers its routing information with every node in the NameServer cluster. As a result, each NameServer node holds a complete routing table for the broker cluster — essentially the mapping from topics and queues (partitions) to brokers.

  • Heartbeat detection

Each broker sends a heartbeat request to every node in the NameServer cluster every 30 seconds. What does a heartbeat contain? Information about all the topics on that broker; on receipt, the NameServer updates its topic information and records the update time. Every 10 seconds the NameServer runs a reverse check: if more than 2 minutes have passed since a broker's last update — that is, no heartbeat has arrived from it within 2 minutes — the broker is considered offline and the topic-to-broker routing is adjusted accordingly.
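The liveness bookkeeping described above can be sketched as follows. The class and method names here are illustrative, not RocketMQ's actual internals; only the 30-second heartbeat / 10-second scan / 2-minute expiry numbers come from the text.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the NameServer's broker-liveness table.
class BrokerLivenessTable {
    // A broker is considered offline after 2 minutes without a heartbeat.
    static final long EXPIRE_MS = 120_000;

    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called when a broker's 30-second heartbeat arrives: record the update time.
    void onHeartbeat(String brokerAddr, long nowMs) {
        lastHeartbeat.put(brokerAddr, nowMs);
    }

    // Called by the 10-second scan task: evict brokers silent for > 2 minutes.
    // (The real NameServer would also adjust topic-to-broker routes here.)
    int scanAndEvict(long nowMs) {
        int evicted = 0;
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMs - e.getValue() > EXPIRE_MS) {
                lastHeartbeat.remove(e.getKey());
                evicted++;
            }
        }
        return evicted;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```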

Broker

The broker is responsible for storing, delivering, and querying messages, and for ensuring high availability of the service. To achieve this, the broker consists of several important sub-modules:

  • Remoting Module: The entry point of the entire broker, handling requests from clients.
  • Client Manager: Manages clients (Producer/Consumer) and maintains Topic subscription information of consumers
  • Store Service: Provides convenient and simple APIs for storing messages to physical disks and querying them.
  • HA Service: A high availability Service that provides data synchronization between Master and Slave brokers.
  • Index Service: Indexes messages delivered to the Broker based on a specific Message key to provide quick lookup of messages.

Producer

The producer is the sender of messages and supports distributed cluster deployment. When sending a message, the producer first looks up the routing information in its local cache, the TopicPublishInfoTable. On a cache miss, it pulls the routing information from the NameServer and updates the local cache. In addition, the producer refreshes its routing information from the NameServer every 30 seconds.
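This cache-then-fetch lookup can be sketched as below. `RouteCache` and the string-valued route are stand-ins for the real TopicPublishInfoTable and NameServer call, chosen here just to show the shape of the logic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of the producer's route lookup: local cache first, NameServer on a miss.
class RouteCache {
    private final Map<String, String> topicRouteTable = new ConcurrentHashMap<>();
    private final Function<String, String> fetchFromNameServer;

    RouteCache(Function<String, String> fetchFromNameServer) {
        this.fetchFromNameServer = fetchFromNameServer;
    }

    // Cache hit returns immediately; a miss pulls from the NameServer and caches it.
    String routeFor(String topic) {
        return topicRouteTable.computeIfAbsent(topic, fetchFromNameServer);
    }

    // Invoked by the producer's 30-second refresh task.
    void refresh(String topic) {
        topicRouteTable.put(topic, fetchFromNameServer.apply(topic));
    }
}
```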

Consumer

  • The message-consuming role. Supports distributed cluster deployment. Messages can be consumed in both push and pull modes.
  • The consumer sends heartbeat messages to the broker every 30 seconds. The broker checks for live consumers every 10 seconds. If a consumer has sent no heartbeat for 2 minutes, the broker disconnects from it and notifies the other instances in the consumer group, triggering a rebalance of the consumer cluster.
  • Message consumption supports clustering mode and broadcasting mode. In clustering mode, the queues of a topic are divided among the instances of a consumer group, so each message is consumed by exactly one instance; in broadcasting mode, every instance in the group consumes every message.
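The difference between the two modes can be sketched as a queue-allocation function. The averaging strategy below is an illustrative simplification in the spirit of RocketMQ's AllocateMessageQueueAveragely, not the real implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: which queues a given consumer instance ends up reading in each mode.
class ConsumeModeDemo {

    // Clustering: queues are divided among the group's instances,
    // so each message is processed by exactly one instance.
    static List<Integer> clusteringAllocation(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            if (q % consumerCount == consumerIndex) assigned.add(q);
        }
        return assigned;
    }

    // Broadcasting: every instance consumes every queue,
    // so each message is processed by all instances in the group.
    static List<Integer> broadcastingAllocation(int queueCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) assigned.add(q);
        return assigned;
    }
}
```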

Message-related concepts

Message

A message is the physical carrier of data transmission and the smallest unit of production and consumption; each message must belong to a topic. Every message in RocketMQ has a unique message ID and can carry a key with business meaning, and the system supports querying messages by message ID or key. Messages are physically stored in the CommitLog, which I'll cover in a later article, so I won't go into detail here.

Topics and queues

A topic represents a collection of messages of one class and can contain multiple queues. A queue is not the same thing as a partition in Kafka; it is a logical partition. The differences between the two are:

  • Partitions in Kafka actually store messages, and different partitions can reside on different brokers, which can be distributed and scalable
  • Queue in RocketMQ is a logical partition concept. It does not actually store the message entity. Instead, it stores the start physical offset of queue messages in the Commitlog, the message size, and the HashCode value of the message Tag
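To make the second point concrete: a ConsumeQueue entry in RocketMQ is a fixed 20 bytes — an 8-byte CommitLog offset, a 4-byte message size, and an 8-byte tag hash code — with no message body at all. The helper class below is hypothetical, written only to show that layout.

```java
import java.nio.ByteBuffer;

// Sketch of RocketMQ's fixed 20-byte ConsumeQueue entry layout.
class ConsumeQueueEntry {
    static final int ENTRY_SIZE = 20;

    // 8-byte CommitLog offset + 4-byte message size + 8-byte tag hash code.
    static byte[] encode(long commitLogOffset, int msgSize, long tagHashCode) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(msgSize);
        buf.putLong(tagHashCode);
        return buf.array();
    }

    static long commitLogOffset(byte[] entry) { return ByteBuffer.wrap(entry).getLong(0); }
    static int  msgSize(byte[] entry)         { return ByteBuffer.wrap(entry).getInt(8); }
    static long tagHashCode(byte[] entry)     { return ByteBuffer.wrap(entry).getLong(12); }
}
```

Because every entry is the same size, a consumer can locate the Nth message of a queue by simple arithmetic (N × 20), then read the actual body from the CommitLog at the stored offset.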

The two have something in common:

  • The messages in each queue (or partition) are ordered, and within a consumer group only one consumer can consume a given queue or partition, while one consumer may consume multiple queues or partitions.

Tag

Tags are flags set on messages to distinguish different types of messages under the same topic. Messages from the same business module can carry different tags under one topic for different business purposes: for example, an order tag for order messages and a goods tag for item messages.

Ordered messages

"Ordered" here usually refers to the order of message consumption: messages are consumed in the same order in which the sender sent them.

In reality, a topic can have multiple queues, different messages may be sent to different queues, and each queue may be consumed by a different consumer at a different speed, so a message sent later may be processed first. Take the order module as an example: as shown in the figure, the order-creation and payment messages are sent to Queue1 and Queue2 respectively, but consumer 1 may be consuming slowly while consumer 2 consumes quickly, so the payment message is consumed before the creation message and causes business problems. So what can be done?

As noted earlier about topics and queues, the messages within each queue are ordered. So we can route all messages for the same order to the same queue — concretely, for example, by hashing the order ID and taking it modulo the number of queues. Is that enough?
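The hash-and-modulo routing can be sketched as below. This is written in the spirit of RocketMQ's MessageQueueSelector (whose real `select` method receives a `List<MessageQueue>`); here we just compute the queue index, and the class name is illustrative.

```java
// Sketch: route every message of one order to the same queue index.
class OrderQueueSelector {
    static int select(String orderId, int queueCount) {
        // Math.abs alone is unsafe for Integer.MIN_VALUE, so mask the sign bit
        // to guarantee a non-negative hash before taking the modulo.
        int h = orderId.hashCode() & 0x7fffffff;
        return h % queueCount;
    }
}
```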

The answer is no. This only ensures that messages for the same order go to the same queue, avoiding ordering problems caused by different queues being consumed at different speeds. But we know that, to speed up consumption, a consumer may run multiple worker threads, and if those threads concurrently consume messages from one queue, ordering problems reappear.

What then? RocketMQ creates an object lock for each message queue: while a queue is being processed by a thread in the pool, other threads must wait for the current consumption to complete before taking the next message, ensuring that messages from the same queue are consumed sequentially within the current consumer.
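The per-queue locking idea can be sketched as follows. The class is hypothetical (the real logic lives in RocketMQ's ordered-consume service); the point is only that a lock keyed by queue serializes consumption per queue even when many worker threads run.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: one lock per queue, so only one worker thread at a time
// may consume from a given queue, preserving per-queue order.
class OrderedConsumer {
    private final Map<Integer, ReentrantLock> queueLocks = new ConcurrentHashMap<>();
    private final StringBuilder log = new StringBuilder(); // stand-in for business effects

    void consume(int queueId, String msg) {
        ReentrantLock lock = queueLocks.computeIfAbsent(queueId, q -> new ReentrantLock());
        lock.lock();   // serialize all consumption of this queue
        try {
            log.append(msg).append(";");   // stand-in for business processing
        } finally {
            lock.unlock();
        }
    }

    String consumedLog() { return log.toString(); }
}
```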

Idempotence

RocketMQ does not guarantee exactly-once delivery — message duplication is possible — so if the business is sensitive to duplicate consumption, deduplication must be handled at the business level. A relational database can be used for this. First determine the message's unique key, either the msgId or a unique field in the message body, such as an order ID. Before consuming, check whether the key already exists in the database; if not, insert it and consume, otherwise skip. (In practice, consider atomicity: rather than check-then-insert, attempt the insert directly, and if it fails with a primary-key conflict, skip the message.)

A msgId is a globally unique identifier, but in practice the same business message may end up with two different msgIds (for example, because of producer retries or client resends), which is why deduplicating on a business field is recommended.
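The insert-or-skip flow above can be sketched as below. The `Set` stands in for a database table with a unique or primary key on the business field; `Set.add` mirrors "insert, skip on primary-key conflict" in a single atomic step. The class name is illustrative.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of idempotent consumption keyed on a business-unique field (e.g. an order ID).
class IdempotentConsumer {
    private final Set<String> consumedKeys = ConcurrentHashMap.newKeySet();
    private int processed = 0;

    // Returns true if the message was processed, false if it was a duplicate.
    boolean consume(String uniqueKey, String body) {
        if (!consumedKeys.add(uniqueKey)) {
            return false;   // key already present: duplicate delivery, skip
        }
        processed++;        // stand-in for the real business logic
        return true;
    }

    int processedCount() { return processed; }
}
```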