The messaging middleware used before is ActiveMQ. Due to the lack of maintenance and data documents, it is difficult to troubleshoot problems. Therefore, under this opportunity, RocketMQ is used after investigation.

RocketMQ is a pure Java, distributed, queue model open source messaging middleware, formerly known as MetaQ. It is a queue model messaging middleware developed by Ali. It was later opened to the Apache Foundation and became the top Open source project of Apache.

Taking a leaf out of Kafka’s book, RocketMQ favors stable and business-oriented operations.

1. Overall structure

The core components are Nameserver, Broker, Producer, and Consumer


Each module has a long link to communicate with each other. This architectural pattern is similar to Dubbo in that each module has a registry to maintain related information. The difference is that the Broker module plays an important role in the messaging system.

Two, advantages and characteristics


Stability, high performance, and rich message types

There are many related advantages and disadvantages on the Internet, here are two recommended:

Ali RocketMQ performance introduction

RocketMQ vs. Kafka (18 differences)

Three, brief each noun concept

Topic

A message topic is a logical classification of messages, such as what kind of operations they belong to. For example, inventory related, order related, activity related, etc. It is understood as an abstract classification specification, and the operations of all producers are classified according to topics. Different producers send messages to the specified topics, and different consumers subscribe to the specified topics, pulling message consumption from above and shielding the underlying message storage.

Tag

To further refine the Topic, there is this line annotation in the official document of Ali Cloud: “Message Tag, which can be understood as the label in Gmail, can reclassify messages, so that consumers can specify filtering conditions in the RocketMQ version of Message queue server filtering”.

See this Topic and Tag Best Practices for more details

Message

It is the carrier of message delivery in the message queue.

To send a message is to send it to a Topic, where each message consists of the following parts:

  • Message ID

Globally unique identifier of a message, automatically generated during RocketMQ delivery, that uniquely identifies a message

  • Message key

The business id of the message, set by the message producer Producre, uniquely identifies a business logic.

  • Message Body

The body of the content carried by the message. This is where you typically customize the content delivered. Remember to serialize the message content.

For details about the Message core structure, refer to the Message Detail shown in the console


MessageQueue

The Topic mentioned above is an abstraction. The actual place where messages are sent and consumed is the Message Queue, and there may be multiple Message queues under each Topic. The reason for introducing queues is to improve availability and flexibility. By the nature of queues, FIFO, messages sent first are consumed first.

For example, by default, a Topic allocates four Message queues (defaultTopicQueueNums). If there are two brokers, the two will be split. If you build a Broker locally, you should see the same as I did: There are four Message queues in one Broker

The destination to which messages are sent and the source of consumption is the Message Queue


Group

Group information. A group can subscribe to multiple topics.

There are Producer groups and Consumer groups. Each application can have multiple Producer groups and Consumer groups. However, the recommended usage is that each application specify a Producer Group to unify the information of the message sender.

Generally, an application only needs to set up a consumer group to subscribe to topics for consumption. If a Topic wants to have two processing logic in an application, it can be configured to have different consumer groups, and it can be implemented to have different Handler handlers for the same Topic message.


Offset

Displacement, used to hold the progress of message consumption.

As we have seen above, there are multiple Message queues under one Broker, and we need a subscript to record where messages are consumed. Offset can be used to locate the current consumable message location, indicating that the next Consumer will consume the message from the location after Offest.

In the code, Offset is the base type of long, which accesses messages at the location specified in the Message Queue.

Order Message ordering

A type of message that is published and consumed sequentially, divided into globally ordered messages and partitioned ordered messages.

Global order: This is easier to understand. For the same Topic, no matter how many consumers there are, the Message queue can only be one by one in order, and the next Message consumption depends on the completion of the previous consumption. Suitable for scenarios with low performance requirements, but this mode is rarely selected.

Partition order: Blocks are partitioned by a Sharding Key. Messages within the same partition are published and consumed in a strict FIFO order.

Here’s an example from the document:

In order creation of e-commerce, order ID is taken as Sharding Key, so the creation order message, order payment message, order refund message and order logistics message related to the same order will be consumed in accordance with the order of release.

Sharding Key can be used to ensure that messages of the same type and users are sent and consumed in sequence, which not only ensures high concurrent processing of messages, but also ensures business continuity.


The partitioning order is shown in the figure above. In terms of code implementation, a custom Selector needs to be set, then the arG is parsed and MQ is selected based on a strategy, such as a common hash mode. The strategy can be chosen by referring to this class: SelectMessageQueueByHash

Message consumption pattern

In RocketMQ, two consumption patterns are implemented

  • PULL mode: ConsumerActive from the message serverBrokerGet the message.
  • PUSH mode:Message serverBrokerActively push messages toConsumer.

To quote the description of [Fujiwara Tofu Shop -] :

  • MQPullConsumer: The process of message retrieval needs to be written by users themselves. First, they get the collection of MessageQueue through the Topic they intend to consume, traverse the collection of MessageQueue, and then fetch messages in batches for each MessageQueue. After one collection, The next starting offset of the queue is recorded until the queue is finished, and then another MessageQueue.

  • MQPushConsumer: The consumer encapsulates the polling process, registers the MessageListener listener, and wakes up the MessageListener’s consumeMessage() to consume the message. To the user, the message feels like it is being pushed.

Look at the @RocketmqMessagelistener annotation in SpringBoot and the team’s RocketMQ secondary wrapper. MQPushConsumer is used to encapsulate the pull polling process. RocketMQ uses the Pull mode of consumption.

The message to repeat

There are three semantics for messages:

  • At most once
  • At least once
  • Exactly once

Due to network fluctuation, the sender considers that the message fails to be sent for the first time and tries to send the message again. Therefore, the problem we need to solve can be understood as: How to ensure the correct processing of two identical messages?

There are two common approaches:

1. Keep messages idempotent

The messaging system
The consumer end

Idempotence: Use mathematical concepts to deepen understanding. For example, if there is a function f(x), x is a message, then the result of f(f(x)) is the same no matter how many times the message is repeatedly consumed. There will be no side effects due to repeated consumption to ensure the correctness of data.

Message deduplication: This is easier to understand, each Message has a globally unique Message ID that can be filtered at the Message system Broker or at the Consumer.

So far in the introduction, RocketMQ does not filter weight at the Broker, so it needs to be filtered at the consumer side. A new database table can be added to record the processed Message IDS. If duplicate messages are encountered, they will not be processed. The messages in process can be put into Redis first to avoid consuming the same messages at the same time

4. Elaborate on the core modules

As you can see from the previous architecture diagram, there are four core modules, and the process from producer sending messages to consumer consumption goes through the following:


From the perspective of the Producer, a long link is established with one OF the NS, and then a heartbeat maintenance state is periodically sent to obtain Topic Topic routing information. Then a long link is established with the Broker Master, and a heartbeat is periodically sent to determine whether it is available. Determine whether the return value of the Broker is required based on the type of message sent.

From the perspective of a Consumer, unlike Producer, it can subscribe to messages from the Broker Master as well as from the Broker Salve. There is no duplication of drawing.

Since RocketMQ is written in the pure Java language, you can download the source code at Github to see the detailed design of each module.

Nameserver

Nameserver manages message subscription, message sending, and consumption information. Each service in the cluster needs Nameserver to know its own status.

Similar to the Zookeeper registry in Dubbo, NameServer maintains the service status of Producer, Broker, and Consumer clusters. Periodically (default: 30s) sends heartbeat packets for maintenance and updates the status of each service.

Receive the Broker’s request and register its routing information

Interface Client requests to obtain routing information from a Topic to the Broker

NameServer has no state and can be scaled horizontally. Each Broker is registered with NameServer at startup. The Producer obtains routing information from Topic to NameServer before sending a message. The Consumer also periodically retrieves the Topic routing information.

With Namesrv in mind, you can see the startup process in the code:

org.apache.rocketmq.namesrv.NamesrvStartup#main

Copy the code

Broker

A Broker is positioned as a message Broker storage server that is responsible for persisting messages and managing the progress of message consumption.

To introduce its features:

① Keep long links and heartbeat with all Namesrv nodes, and register Topic information with Namesrv periodically (default 30s).

(2) In charge of message storage, support lightweight queues in terms of Topic. A single machine can support tens of thousands of queues and support the message push and pull model.

③ It has the ability to accumulate hundreds of millions of messages, and can strictly ensure the order of messages.

The specific startup code entry is:

org.apache.rocketmq.broker.BrokerStartup#main

Copy the code

Initialization process to do a bit of things, message storage, remote services, filters, etc., at a glance a little dizzy, did not continue to follow, interested in the source can go to see =-=

Producer

Message sender, the principal role that sends messages to message queues.

Here is the sequence diagram for the RocketMQTemplate call convertAndSend method:


The core steps are in the send implementation method of DefaultMQProducerImpl. The bottom layer selects the MessageQueue to be sent, executes the front hook, sends the request through NettyClinet, executes the post hook after sending, and finally returns SendResult.

You can introduce the RocketMQ-Starter dependency in SpringBoot, and then send a message to see the overall call link.

Consumer

  • Identification on the basis of

In a consumer group, multiple Listeners can be set up to consume messages on different topics.

During message consumption, the Listener needs to be uniquely identified by the ConsumerGroup + Topic + Tag.

Therefore, the Listener with the same Tag cannot appear in the same consumer group or topic. An error will be reported when the application is started.

  • Code implementation

The implementation code is in the same module as Producer: Client

Before we get to message consumption, let’s take a look at how when the application starts, it scans the RocketMQMessageListener annotated Beans and registers the container


According to my understanding, registered in the for loop scanning to bean, then assemble at createRocketMQListenerContainer DefaultRocketMQListenerContainer, and register in the Spring container, Consumption after waiting.

At the same time, a daemon thread is constantly pulling messages from the Broker, listening for messages that match the conditions and consuming them:


The process of registering a Listener and looping a message is described above. The code entry for Consumer startup is shown here:

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#start

Copy the code

The process for consumers to start registration is quite long and needs to be looked at slowly

V. Specific use (code and monitoring UI)

Download code (External and binary startup packages)

  • external: Expanded content in order to get the monitor consoleConsoleThe address is as follows:
https://github.com/apache/rocketmq-externals.git
Copy the code
  • install: Application startup package for deploymentNamesrvBroker
https://rocketmq.apache.org/dowloading/releases/
Copy the code
  • Program implementation code: Already open source, contributed toGithub, can download their own packaging and reference to achieve ideas, learning ~
https://github.com/apache/rocketmq
Copy the code

Start nameserver and Broker

Before starting, make sure that the JAVA_HOME variable is included in the local global variables, for example:

$ echo ${JAVA_HOME}

/ Library/Java/JavaVirtualMachines jdk1.8.0 _221. JDK/Contents/Home

Copy the code

Go to the downloaded Releases binary installation package directory

$ cdRocketmq - all - 4.7.0 - bin - release/bin

$sh paly.sh

Copy the code

Click on the play.sh script to see that it will start the Namesrv and Broker services. Once started, messages will be sent and consumed to the Broker

Springboot integration rocker – the starter

The Springboot integrated starter module is used here. For details, please refer to this article:

www.baeldung.com/apache-rock…

After personal integration, put it into the Rocket directory of Demo:

Github.com/Vip-Augus/s…

The Listener message Consumer processing process is described here. The application continuously gets the Topic messages listened to by the broker and then finds the corresponding Consumer to consume:


You can trace the invocation link to the left of the figure above to see the overall link of consumer consumption messages.

Start monitoring UI usage

* RocketMq-console * RocketMQ-Console * RocketMQ-Console * RocketMQ-Console * RocketMQ-Console * RocketMQ-Console

Port =10010 # Namesrv address, if there are more than one, please use a semi-olon; Separated # if this value is empty, the use value rocketmq env. Config. NamesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 rocketmq.config.namesrvAddr=localhost:9876Copy the code

Rocket-console is a Springboot project that needs to be packaged and deployed after modifying the configuration

$mvn clean install -DskipTests

$Java - jar target/rocketmq - the console - ng - 1.0.1. Jar

Copy the code

Then access the previously set port, you can see the monitoring platform:


The top navigation bar identifies the functions it has, such as which Broker a Topic is sent to and which consumer groups are subscribed to, which can be monitored:


6. Follow-up study plan

In daily use, the services of the Nameserver and Broker modules are basically not modified. Instead, they are used by Producer and Consumer, and they can be encapsulated twice. Replace DefaultMQProducer and MQConsumer implementation classes to create senders and consumers suitable for their own business.

The RocketMQ module is the core module of the RocketMQ module. The RocketMQ module is the core module of the RocketMQ module. The RocketMQTemplate module is the core module of the RocketMQ module. However, the detailed design of displacement, message storage format, synchronous and asynchronous flush mode and message repetition and so on has not been understood. If there is an opportunity, we will understand and share ~

The resources

1. Official documents of Aliyun products

Ali Cloud RocketMQ noun explanation

3. In-depth Understanding of RocketMQ – MQ message delivery mechanism

4. Message middleware Series (9) : Detailed description of RocketMQ architecture design, key features, and application scenarios

5. Shallow In, Shallow Out — RocketMQ

6. RocketMQ shares full documentation

7. Alibaba RocketMQ performance introduction

RocketMQ vs. Kafka (18 differences)

9. RocketMQ message consumption mode push and pull mode