This is a readme based mainly on the RocketMQ source code. It summarizes the main processes and key points, and analyzes the source code where necessary, because the documentation is not always as clear as reading the source itself. RocketMQ official documentation address: github.com/apache/rock…
Principle description
Module architecture
RocketMQ is architecturally divided into four main parts, as shown in the figure above:
Producer: the role that publishes messages, with support for distributed cluster deployment. The Producer uses MQ's load-balancing module to select the corresponding Broker cluster queue for message delivery, supporting fail-fast behavior and low latency.
Consumer: the message-consuming role, with support for distributed cluster deployment. Messages can be consumed in push or pull mode, and both clustering and broadcasting consumption modes are supported. It provides a real-time message subscription mechanism that can meet the needs of most users.
NameServer: a very simple Topic routing registry that acts like ZooKeeper in Dubbo and supports dynamic registration and discovery of Brokers. It has two main functions:
- Broker management. NameServer accepts the registration information of Broker clusters and stores it as the basic data for routing. It also provides a heartbeat mechanism to check whether a Broker is still alive.
- Routing information management. Each NameServer holds the complete routing information of the Broker cluster, plus the queue information for client queries. Producers and Consumers can therefore learn the routing information of the entire Broker cluster through NameServer in order to deliver and consume messages.
NameServer is also typically deployed as a cluster in which instances do not communicate with each other. A Broker registers its routing information with every NameServer, so each NameServer instance stores complete routing information. When a NameServer goes offline for some reason, Brokers can still synchronize their routing information with the other NameServers, and Producers and Consumers can still dynamically perceive the Broker routing information.
BrokerServer: the Broker is responsible for storing, delivering, and querying messages, and for ensuring high availability of the service. To implement these functions, the Broker contains several important sub-modules.
- Remoting Module: The entity of the entire Broker that handles requests from clients.
- Client Manager: Manages clients (Producer/Consumer) and maintains Topic subscription information of consumers
- Store Service: Provides convenient and simple APIs for storing messages to physical disks and querying messages.
- HA Service: A high availability Service that provides data synchronization between Master and Slave brokers.
- Index Service: Indexes messages delivered to the Broker based on a specific Message key to provide quick lookup of messages.
Store concept
- CommitLog: the storage body for message content and metadata, which stores the message bodies written by Producers. Message entries are variable-length. Each file is 1 GB by default, and the file name is 20 digits long, left-padded with zeros, with the name itself being the file's start offset. For example, 00000000000000000000 is the first file, with start offset 0 and size 1 GB = 1073741824 bytes. When the first file is full, the next file is 00000000001073741824, with start offset 1073741824, and so on. Messages are written sequentially to the log file; when a file is full, writing continues in the next file;
- ConsumeQueue: the message consumption queue, introduced primarily to improve message consumption performance. Since RocketMQ uses a topic-based subscription model, message consumption is topic-specific, and it would be inefficient to retrieve messages by topic from the commitlog files directly. The Consumer can instead look for messages to consume via the ConsumeQueue. The ConsumeQueue serves as the index of consumable messages, storing the message's starting physical offset in the CommitLog, the message size, and the HashCode of the message Tag. ConsumeQueue files can be regarded as topic-based views of the commitlog files. Accordingly, the ConsumeQueue folder is organized in a three-layer topic/queue/file structure, with the concrete storage path being: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}. The ConsumeQueue file likewise adopts a fixed-length design: each entry is 20 bytes in total, consisting of an 8-byte commitlog physical offset, a 4-byte message length, and an 8-byte tag hashcode. A single file holds 300,000 (30W) entries, and each entry can be accessed randomly like an array element. Each ConsumeQueue file is therefore about 5.72 MB in size;
- IndexFile: IndexFile provides a way to query messages by key or time range. Index files are stored under:
$HOME/store/index/${fileName}
A single IndexFile can hold 20 million (2000W) indexes. The underlying storage of IndexFile is designed to implement a HashMap structure on top of the file system, so RocketMQ's index file is implemented as a hash index.
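As an illustration of the fixed-size layouts described above, here is a small self-contained sketch (class and method names are mine, not RocketMQ's) of the commitlog file-naming rule and the 20-byte ConsumeQueue entry:

```java
import java.nio.ByteBuffer;

public class StoreLayoutSketch {
    // Default commitlog file size: 1 GB.
    static final long COMMITLOG_FILE_SIZE = 1024L * 1024 * 1024; // 1073741824 bytes

    // Each ConsumeQueue entry is fixed at 20 bytes:
    // 8-byte commitlog offset + 4-byte message size + 8-byte tag hashcode.
    static final int CQ_ENTRY_SIZE = 20;

    // A commitlog file is named after its start offset, zero-padded to 20 digits.
    static String commitLogFileName(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // Encode one ConsumeQueue entry.
    static byte[] cqEntry(long commitLogOffset, int msgSize, long tagHashCode) {
        return ByteBuffer.allocate(CQ_ENTRY_SIZE)
                .putLong(commitLogOffset)
                .putInt(msgSize)
                .putLong(tagHashCode)
                .array();
    }

    // Fixed-length entries allow array-like random access: the n-th entry
    // starts at byte n * CQ_ENTRY_SIZE.
    static long cqEntryPosition(long index) {
        return index * CQ_ENTRY_SIZE;
    }
}
```

Note that with 300,000 entries per file, cqEntryPosition(300000) gives 6,000,000 bytes, which is the ~5.72 MB file size mentioned above.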
Call logic
nameserv
NameServer is simple; it is basically registry logic. After a Broker starts, it registers its address information with NameServer. Then, when a Producer instance starts, it connects to NameServer and obtains the Broker information. NameServer itself does not perform load balancing.
producer
Because load balancing is done on the client side, the Producer starts a client, which starts a Netty instance, connects to NameServer, and registers the Producer with NameServer. Then, when sending a message, the Producer retrieves the stored list of Brokers for the topic from NameServer, updates its local cache, and finds the Broker corresponding to the topic.
So what the Producer ultimately does when sending is: select a Broker, then select one of that Broker's consumption queues, encapsulate the queue id, the message's topic, body, tags, and so on into a Message, and send it to the Broker through Netty.
broker
The Broker is the most complex of all the modules. On startup it creates a dozen or so thread pools and queues, and these thread pools are also the core logic that implements its message queue.
MessageStore is the message storage service. It handles both commitlog and consume-queue index storage, but these two are done on different threads. Messages sent by the Producer are written directly to the commitlog. When the Broker starts, a single thread is created to pull not-yet-dispatched messages from the commitlog, construct ConsumeQueue instances, and build the ConsumeQueue index files used for consumer consumption.
remotingServer and fastRemotingServer are both Netty servers and clients used to send and receive messages with NameServer, Producers, and Consumers.
Receiving the Producer message
The Netty Reactor model used here for receiving and sending messages is the standard Netty Reactor threading model.
SendMessageProcessor
processRequest
The subsequent processing verifies the request header, request type, and so on, assembles the data structure to be stored, and finally calls the messageStore instance created at startup to store it.
During storage, only the commitlog is written; neither the consume queue nor the consume-queue index file is built at this point. After the commitlog write, a success response is returned directly to the Producer.
ConsumeQueue build
When I first read the developer's guide I missed some details, so while reading the source I did not know exactly when the ConsumeQueue was created, which left a gap in my understanding. After reading the documentation carefully, I realized that a thread is started when the Broker starts, which dispatches messages from the commitlog to the different queues. In my mind, you could just as well throw the work into a thread pool and build the ConsumeQueue asynchronously, although you would still need a thread like this to compensate after a power failure.
Within the start() method of the broker, a messageStore is started, and within the messageStore, a single-threaded processor is started.
The start method is an endless loop that keeps reading messages from the commitlog as long as the service is running, as implemented in doReput().
doDispatch() calls putMessagePositionInfo(), which first finds the ConsumeQueue corresponding to the topic and queueId in consumeQueueTable, and then calls that ConsumeQueue's putMessagePositionInfoWrapper() method.
ConsumeQueue's putMessagePositionInfoWrapper() in turn calls putMessagePositionInfo(), which appends the entry and completes the ConsumeQueue build.
So this is how the ConsumeQueue is built.
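The dispatch flow can also be modeled in miniature. Below is an in-memory sketch with made-up names (the real doReput() works on memory-mapped commitlog files): a single thread scans the commitlog from a reput position and appends an index entry to the ConsumeQueue of each message's (topic, queueId):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReputSketch {
    // A commitlog record, reduced to the fields a ConsumeQueue entry needs.
    static class Msg {
        final String topic; final int queueId;
        final long physicalOffset; final int size; final long tagHash;
        Msg(String topic, int queueId, long physicalOffset, int size, long tagHash) {
            this.topic = topic; this.queueId = queueId;
            this.physicalOffset = physicalOffset; this.size = size; this.tagHash = tagHash;
        }
    }

    final List<Msg> commitLog = new ArrayList<>();
    // Key "topic-queueId" -> list of {offset, size, tagHash} entries.
    final Map<String, List<long[]>> consumeQueues = new HashMap<>();
    int reputIndex = 0; // position of the next message to dispatch

    // One pass of the doReput() loop body: dispatch everything not yet indexed.
    void doReputOnce() {
        while (reputIndex < commitLog.size()) {
            Msg m = commitLog.get(reputIndex++);
            consumeQueues
                .computeIfAbsent(m.topic + "-" + m.queueId, k -> new ArrayList<>())
                .add(new long[]{m.physicalOffset, m.size, m.tagHash});
        }
    }
}
```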
consumer
After a Consumer starts, it continuously sends heartbeat packets (containing information such as the consumer group name, subscription set, message communication mode, and client ID) via a scheduled task to all Broker instances in the RocketMQ cluster. When a Broker receives a Consumer heartbeat, it records it in the local cache variable consumerTable of ConsumerManager, and stores the encapsulated client network channel information in the local cache variable channelInfoTable. This provides the metadata needed for Consumer-side load balancing.
The consumer's load balancing, and its concurrency, are based on the ConsumeQueue. To explain why the ConsumeQueue is needed: RocketMQ's message model is actually request-acknowledge. Because a message may be lost during transmission due to a network or server fault, or may fail to be consumed, there is no blind push; on the consumer side, only after the consumer has received the message and completed its consumption logic (for example, saving the data to a database) does it return an acknowledgement (ACK) to the Broker, and only then does the Broker consider the message successfully consumed. But this creates a problem for consumers: to guarantee message order, the next message is not consumed until the previous one has been consumed successfully (if a message really cannot be consumed, the Broker moves it to a separate dead-letter queue for later processing); otherwise messages would be skipped and order could not be guaranteed. For example, an order produces three messages: order created, order paid, order completed. These cannot be consumed out of order; consuming them only makes sense in order. So in this case the messages of the same order must be consumed in sequence, and RocketMQ achieves this by putting the three messages in the same queue: until the message at the head of the queue is consumed successfully, the next one is not consumed. But if two unrelated orders need to be consumed at the same time, adding a queue and increasing the number of consumers achieves concurrent scaling.
So the way RocketMQ works is that messages that need to be ordered are placed in the same consumption queue. In RocketMQ's design, there can be multiple consumer groups under the same topic. The concept of a consumer group is that the messages under a topic are delivered to all consumer groups: each consumer group receives the complete set of messages in the topic, and consumption progress in different consumer groups does not affect each other. That is to say, a message consumed by Consumer Group1 will also be consumed by Consumer Group2. A consumer group contains multiple consumers, and consumers in the same group are competing consumers: each consumer is responsible for part of the messages in the group. If a message is consumed by Consumer1, no other consumer in the same group will receive it.
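A toy model of the consumer-group semantics described above (all names hypothetical): each group tracks its own offset over the same message stream, so every group sees the full stream and groups do not affect each other's progress:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupOffsetsSketch {
    final List<String> queue = new ArrayList<>();
    // One consumption offset per consumer group.
    final Map<String, Integer> groupOffset = new HashMap<>();

    void publish(String msg) { queue.add(msg); }

    // Pull the next message for a group. Within a real group, consumers
    // would compete for queues; one offset per group is enough here to
    // show that groups consume the same stream independently.
    String poll(String group) {
        int off = groupOffset.getOrDefault(group, 0);
        if (off >= queue.size()) return null; // nothing new for this group
        groupOffset.put(group, off + 1);
        return queue.get(off);
    }
}
```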
For orders as described above, when the Producer sends messages it uses a load-balancing algorithm that can be customized by the user.
```java
// RocketMQ uses the algorithm implemented in MessageQueueSelector to
// determine which queue the message is sent to.
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);
```
In this way we can decide the assignment ourselves: messages with the same order number can be routed by modulo to the same queue, while the multiple queues are consumed and managed by the consumers of the whole group. Order messages within a queue are ordered, while the consumer group, or the topic as a whole, is unordered. This both guarantees ordering and increases concurrency without skipping messages.
Actually, I personally think out-of-order consumers can tolerate skipped messages, because I have done something similar: if consumption fails, you can compensate by re-pushing, writing a timer that periodically checks for unsent messages and sends them again; you can also use a thread pool to send concurrently to improve performance. Presumably RocketMQ, as a production-grade system, has to satisfy a variety of requirements, so it uses a pattern that is compatible with both strictly ordered consumption and normal unordered consumption (in fact, with the default random queue selection, consumption is effectively out of order).
That is a lot of background on why MQ's consumption model is made up of consumer groups, consume queues, and consumers. Now let's talk about how queues are assigned to consumers in the code.
Starting the MQClientInstance during the Consumer startup process also starts the load-balancing service thread, RebalanceService (which runs every 20 seconds). A review of the source shows that the run() method of the RebalanceService thread ends up calling the rebalanceByTopic() method of the RebalanceImpl class, which is the core of Consumer-side load balancing. rebalanceByTopic() performs different logic depending on whether the consumer communication type is broadcast mode or cluster mode.
This is described in the developer documentation. We finally arrive at the rebalanceByTopic() method of RebalanceImpl, which is the implementation of the actual rebalancing, or more precisely, of how queues are allocated to consumers. Let's look at the allocation in cluster mode.
For message consumption queue load balancing among different consumers in the same consumer group, the core design concept is that a message consumption queue can only be consumed by one consumer in the same group at a time, while one consumer can consume multiple message queues at the same time.
The idea is to ensure orderliness and concurrency, allowing one consumer to consume multiple queues.
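As a sketch of this one-queue-to-one-consumer constraint, here is a simplified version of the averaging idea behind RocketMQ's default allocation strategy (the real implementation, AllocateMessageQueueAveragely, works over sorted queue and consumer-ID lists; the names below are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class AverageAllocationSketch {
    // Give consumer `consumerIndex` (0-based, over a sorted consumer list)
    // a contiguous slice of the queue indices [0, queueCount). The first
    // (queueCount % consumerCount) consumers take one extra queue each,
    // so every queue is owned by exactly one consumer.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        if (consumerIndex >= consumerCount) return result; // not a group member
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount;
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        for (int i = 0; i < size; i++) result.add(start + i);
        return result;
    }
}
```

With 8 queues and 3 consumers this yields [0, 1, 2], [3, 4, 5], and [6, 7]: each queue belongs to exactly one consumer, while a single consumer may own several queues.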
other
- Asynchronous and synchronous disk flushing
- Reactor thread model for receiving messages
- Communication protocol
- Transaction message
These topics are already covered clearly in the official documentation.
practice
For RocketMQ's distributed transaction implementation, based on its transactional messages, see the article RocketMQ Distributed Transaction - Complete Example. The idea is in fact that the service digests problems internally: for example, once the transaction initiator has finished and the transaction message has been sent, the consumer side has to handle abnormal situations during consumption, such as notifying staff to add stock when inventory is insufficient, or applying a retry mechanism with idempotence guarantees when the consumer fails, and so on; all of this falls to the consumer.