Introduction
RocketMQ is Alibaba's open source messaging middleware, which has since been donated to the Apache Software Foundation. It is written in Java, offers high throughput and high availability, and is well suited to large-scale distributed systems. It has carried the traffic of Alibaba's Double 11 shopping festival, so its capability is well proven.
- Quick start:…
- Alibaba Cloud help documentation:…
This article is divided into three chapters covering concepts and principles, cluster setup, and hands-on Java integration, including ordered messages and transaction messages. The content draws on the official documentation (Alibaba, Apache). The original documents are already written in plain language, so they are referenced here rather than repeated.
Basic concepts and advantages
RocketMQ is message middleware based on a publish/subscribe queue model. It offers high availability, high performance, low latency, and a natively distributed design (it has supported several Alibaba Double 11 events). It can guarantee strict message ordering and can accumulate hundreds of millions of messages. Its design model is the same as that of traditional message middleware, as follows (excluding the protocol):
Physical Deployment Structure
As shown above, RocketMQ in cluster mode consists of four major parts: the NameServer cluster, the Broker cluster, the Producer cluster, and the Consumer cluster. Each of them can scale horizontally without a single point of failure.
NameServer
The NameServer provides lightweight service discovery and routing. Each NameServer records complete routing information, provides the corresponding read and write services, and supports fast storage expansion.
The NameServer is a nearly stateless, fully functional server with two main functions:
1. Broker management. The NameServer accepts registrations from the Broker cluster and uses a heartbeat mechanism to check whether each Broker is available.
2. Routing management. Each NameServer holds the complete routing information about the Broker cluster and its queues, and answers client queries.
RocketMQ clients (producers and consumers) query queue routing information from the NameServer. There are four ways to give a client the NameServer address:
(1) Programmatically, for example producer.setNamesrvAddr("ip:port") (the most commonly used).
(2) Java option, using rocketmq.namesrv.addr.
(3) Environment variable NAMESRV_ADDR.
(4) HTTP endpoint.
Priority: programmatic > Java option > environment variable > HTTP endpoint.
Broker
The Broker handles message storage by providing lightweight topic and queue mechanisms. Brokers support both the Push and Pull models and include fault tolerance (2 or 3 replicas). They can absorb traffic peaks and accumulate millions of messages in time order. In addition, the Broker provides disaster recovery, rich metrics, and alerting mechanisms, all of which traditional messaging systems lack.
The Broker Server is responsible for storing and delivering messages, querying messages, ensuring high availability, and more. As shown in the figure below, the Broker Server has several important sub-modules:
(1) Remoting module: the entry point of the Broker, which handles requests from clients.
(2) Client Manager: manages the clients (producers and consumers) and maintains consumers' topic subscriptions.
(3) Store service: provides simple APIs to persist or query messages on disk.
(4) HA service: provides data synchronization between master and slave Brokers.
(5) Index service: builds indexes for messages and provides quick message queries.
Producer
Producers support distributed deployment. Distributed producers send messages to the Broker cluster through a choice of load-balancing policies. The sending process supports fail-fast behavior and has low latency.
Consumer
Consumers also support distributed deployment, in both push and pull modes, as well as cluster consumption and message broadcasting. They provide a real-time message subscription mechanism that meets the needs of most consumers.
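To make the NameServer address priority described earlier concrete, here is a minimal sketch in Python. It is only a model of the lookup order (programmatic, then Java option, then environment variable, then HTTP endpoint). The function and parameter names are hypothetical and are not part of the RocketMQ client, and the addresses are made-up sample values.

```python
def resolve_namesrv_addr(programmatic=None, java_option=None, env=None, http_lookup=None):
    """Return the first NameServer address found, in priority order.

    All parameter names are hypothetical; they only model the four sources.
    """
    if programmatic:                      # 1. e.g. producer.setNamesrvAddr("ip:port")
        return programmatic
    if java_option:                       # 2. the rocketmq.namesrv.addr Java option
        return java_option
    if env and env.get("NAMESRV_ADDR"):   # 3. the NAMESRV_ADDR environment variable
        return env["NAMESRV_ADDR"]
    if http_lookup is not None:           # 4. HTTP endpoint, consulted last
        return http_lookup()
    return None


# The Java option wins here because no programmatic address was given.
addr = resolve_namesrv_addr(
    java_option="10.0.0.2:9876",
    env={"NAMESRV_ADDR": "10.0.0.3:9876"},
    http_lookup=lambda: "10.0.0.4:9876",
)
print(addr)
```

The HTTP lookup is passed as a callable so that it is only invoked when every higher-priority source is empty.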
Glossary
Topic
The message topic, a first-level message type. Messages are classified by Topic.
Tag
The message tag, a second-level message type, used to further differentiate message categories under a Topic.
Producer
The message producer, also known as the message publisher, responsible for producing and sending messages.
Producer ID
The identifier of a class of producers that generally produce and send a class of messages that are logically consistent.
Producer instance
An object instance of Producer. Different Producer instances can run in different processes or on different machines. The Producer instance is thread-safe and can be shared between multiple threads in the same process.
Consumer
The message consumer, also known as the message subscriber, responsible for receiving and consuming messages.
Consumer ID
The identity of a class of consumers that typically receive and consume a class of messages with consistent consumption logic.
Consumer instance
An object instance of Consumer. Different Consumer instances can run in different processes or on different machines. A thread pool is configured inside each Consumer instance to consume messages.
Cluster consumption
All consumers identified by one Consumer ID share the messages evenly. For example, if a Topic has nine messages and a Consumer ID has three Consumer instances, then in cluster consumption mode each instance consumes only three of the messages.
Broadcast consumption
All consumers identified by one Consumer ID each consume every message once. For example, if a Topic has nine messages and a Consumer ID has three Consumer instances, then in broadcast consumption mode each instance consumes all nine messages.
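The nine-message, three-instance example can be sketched as a small simulation. This is only an illustration of the two modes: the round-robin split is an assumption made for simplicity, and the function names and instance names are hypothetical rather than anything from the RocketMQ client.

```python
def cluster_consume(messages, instances):
    """Cluster mode: each message goes to exactly one instance.

    Round-robin assignment is an assumption of this sketch.
    """
    assignment = {name: [] for name in instances}
    for i, msg in enumerate(messages):
        assignment[instances[i % len(instances)]].append(msg)
    return assignment


def broadcast_consume(messages, instances):
    """Broadcast mode: every instance receives every message."""
    return {name: list(messages) for name in instances}


messages = [f"msg-{i}" for i in range(1, 10)]           # nine messages
instances = ["consumer-a", "consumer-b", "consumer-c"]  # three instances

cluster = cluster_consume(messages, instances)
broadcast = broadcast_consume(messages, instances)

print({name: len(msgs) for name, msgs in cluster.items()})
print({name: len(msgs) for name, msgs in broadcast.items()})
```

In cluster mode the per-instance counts add up to the number of messages; in broadcast mode each instance's count equals it.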
Timed message
The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, delivery to the Consumer is postponed until a specified point in time after the current time. Such a message is a timed message.
Delayed message
The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, the message is delivered to the Consumer after a specified delay. Such a message is a delayed message.
Transaction message
MQ provides distributed transaction functionality similar to X/Open XA. Eventual consistency of distributed transactions can be achieved through MQ transaction messages.
Ordered message
A type of message provided by MQ that is published and consumed in order. Ordered messages are divided into globally ordered and partitionally ordered messages.
Ordered publishing
For a given Topic, the client sends messages in a definite order.
Ordered consumption
For a given Topic, messages are received in a definite order: the message sent first is received by the client first.
Globally ordered message
For a given Topic, all messages are published and consumed in strict first-in, first-out (FIFO) order.
Partitionally ordered message
For a given Topic, all messages are partitioned according to a Sharding key. Messages within the same partition are published and consumed in strict FIFO order. The Sharding key is a key field used to distinguish partitions in ordered messages. It is an entirely different concept from the key of a normal message.
Message accumulation
The Producer has sent messages to the MQ server, but the Consumer, because of its limited consumption capacity, cannot consume them all within a short time. The unconsumed messages are kept on the MQ server. This state is called message accumulation.
Message filtering
Subscribers can filter messages by message tag so that they receive only the message types they are interested in. Message filtering is performed on the MQ server.
Message trace
The complete link information for a message on its way from publisher to subscriber, aggregated from the time, location, and other data recorded at each node along the way. With message traces, users can clearly see the full path a message takes from the publisher through the MQ server to the subscriber, which makes fault locating and troubleshooting easier.
Resetting the consumption offset
On the timeline, within the period for which messages are persisted (3 days by default), reset a subscriber's consumption progress for a subscribed Topic. After the reset, the subscriber receives the messages that publishers sent to the MQ server after the chosen point in time.
Message types
Normal messages
A normal message is a message with no special features, as distinguished from timed and delayed messages, ordered messages, and transaction messages. There are three ways to send a normal message:
- Reliable synchronous sending
Principle: In synchronous sending, the sender sends one message and sends the next only after receiving the receiver's response.
Application scenarios: This mode is used very widely, for example for important notification emails, registration SMS notifications, and marketing SMS systems.
- Reliable asynchronous sending
Principle: In asynchronous sending, the sender sends one message and then sends the next without waiting for the receiver's response. Asynchronous sending in MQ requires the user to implement the asynchronous callback interface (SendCallback). After sending a message, the sender returns immediately and can send a second message without waiting for the server's response. The sender receives the server's response through the callback interface and processes the result there.
Application scenarios: Asynchronous sending suits business scenarios with long call chains that are sensitive to response time (RT). For example, after a user uploads a video, a notification starts the transcoding service, and after transcoding completes, a notification pushes the transcoding result.
- One-way sending
Principle: In one-way sending, the sender only sends the message. It does not wait for the server's response, and no callback is triggered. Sending a message this way takes very little time, usually on the order of microseconds.
Application scenarios: This mode suits scenarios that need very short send times but do not have high reliability requirements, such as log collection.
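The three sending styles can be contrasted with a small, self-contained sketch. It does not use the RocketMQ client: `fake_broker_store` is a hypothetical stand-in for the broker, `SketchProducer` and its methods are invented for illustration, and the callback pair only loosely mirrors the idea behind SendCallback.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_broker_store(message):
    """Hypothetical stand-in for the broker: accepts a message and acknowledges it."""
    return {"status": "SEND_OK", "body": message}


class SketchProducer:
    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=2)

    def send_sync(self, message):
        # Synchronous: block until the broker's response arrives.
        return self._pool.submit(fake_broker_store, message).result()

    def send_async(self, message, on_success, on_exception):
        # Asynchronous: return at once; the response is handled by a callback.
        future = self._pool.submit(fake_broker_store, message)

        def _done(finished):
            error = finished.exception()
            if error is None:
                on_success(finished.result())
            else:
                on_exception(error)

        future.add_done_callback(_done)
        return future

    def send_oneway(self, message):
        # One-way: fire and forget, no response and no callback.
        self._pool.submit(fake_broker_store, message)

    def shutdown(self):
        # Wait for in-flight sends so pending callbacks have run.
        self._pool.shutdown(wait=True)


producer = SketchProducer()
sync_result = producer.send_sync("registration-sms")

async_results = []
pending = producer.send_async("video-uploaded", async_results.append, async_results.append)

oneway_result = producer.send_oneway("log-line")
producer.shutdown()

print(sync_result["status"], len(async_results), oneway_result)
```

The caller gets a result directly only in the synchronous case. In the asynchronous case the result shows up in the callback, and in the one-way case the caller never learns the outcome, which is why that mode fits low-reliability workloads such as log collection.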
Timed messages and delayed messages
Timed messages: The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, delivery to the Consumer is postponed until a specified point in time after the current time.
Delayed messages: The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, the message is delivered to the Consumer after a specified delay.
Application scenarios:
- Message production and consumption have time-window requirements. For example, in e-commerce an order that is not paid in time must be closed, so a delayed message is sent when the order is created. The message is delivered to the consumer 30 minutes later, and the consumer checks whether the order has been paid. If payment has not been completed, the order is closed. If payment has been completed, the message is ignored.
- A message triggers a scheduled task, such as sending a reminder to a user at a fixed point in time.
Usage: Timed and delayed messages differ slightly in code. To send a timed message, specify a point in time later than the send time as the delivery time. To send a delayed message, set a delay. The message is delivered that fixed amount of time after it is sent.
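The difference between the two, and the order-timeout scenario, can be sketched as follows. This is a plain-Python model under stated assumptions: the function names, the order dictionaries, and the sample dates are all hypothetical, and no MQ client API is involved.

```python
from datetime import datetime, timedelta


def timed_delivery(sent_at, deliver_at):
    """Timed message: the caller names an absolute delivery time after the send time."""
    if deliver_at <= sent_at:
        raise ValueError("delivery time must be later than the send time")
    return deliver_at


def delayed_delivery(sent_at, delay):
    """Delayed message: the caller names a delay relative to the send time."""
    return sent_at + delay


def on_order_timeout(order):
    """Consumer-side check for the order-timeout scenario."""
    if order["paid"]:
        return "ignored"          # payment completed: nothing to do
    order["status"] = "closed"    # still unpaid: close the order
    return "closed"


created = datetime(2019, 1, 1, 10, 0, 0)
due = delayed_delivery(created, timedelta(minutes=30))
reminder = timed_delivery(created, datetime(2019, 1, 1, 18, 0, 0))

unpaid = {"id": "order-1", "paid": False, "status": "open"}
paid = {"id": "order-2", "paid": True, "status": "open"}
results = [on_order_timeout(unpaid), on_order_timeout(paid)]

print(due, reminder, results)
```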
Ordered messages
Ordered messages (FIFO messages) are a type of message provided by MQ that is published and consumed in strict order. Ordering covers both publishing and consumption.
Ordered publishing: For a given Topic, the client sends messages in a definite order.
Ordered consumption: For a given Topic, messages are received in a definite order: the message sent first is received by the client first.
Global order: For a given Topic, all messages are published and consumed in strict first-in, first-out (FIFO) order.
Application scenario: Performance requirements are low, and all messages must be published and consumed strictly according to the FIFO principle.
Partition order: For a given Topic, all messages are partitioned according to a Sharding key. Messages within the same partition are published and consumed in strict FIFO order. The Sharding key is a key field used to distinguish partitions in ordered messages. It is an entirely different concept from the key of a normal message.
Application scenario: Performance requirements are high. The Sharding key is used as the partitioning field, and messages within the same partition are published and consumed strictly according to the FIFO principle.
[Example 1] During user registration, a verification code must be sent. With the user ID as the Sharding key, messages sent for the same user are published and consumed in order.
[Example 2] In e-commerce order creation, the order ID is used as the Sharding key, so the order creation, order payment, order refund, and order logistics messages for the same order are published and consumed in order.
Alibaba Group's internal e-commerce systems all use partitionally ordered messages, which preserve business ordering while keeping performance high.
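As a rough illustration of partition ordering, the sketch below maps a Sharding key (the order ID, as in Example 2) to a queue and shows that events for one order stay in FIFO order even when several orders are interleaved. The CRC32-modulo mapping, the queue count of 4, and all names are assumptions of this sketch, not the selection strategy RocketMQ actually uses.

```python
import zlib
from collections import defaultdict

QUEUE_COUNT = 4  # assumed number of queues (partitions)


def select_queue(sharding_key, queue_count=QUEUE_COUNT):
    """Map a sharding key to a queue index; the same key always maps to the same queue."""
    return zlib.crc32(sharding_key.encode("utf-8")) % queue_count


def publish(events):
    """Append each event to the queue chosen by its sharding key (the order ID)."""
    queues = defaultdict(list)
    for order_id, event in events:
        queues[select_queue(order_id)].append((order_id, event))
    return queues


events = [
    ("order-1", "created"), ("order-2", "created"),
    ("order-1", "paid"), ("order-2", "paid"),
    ("order-1", "shipped"), ("order-2", "refunded"),
]
queues = publish(events)


def events_for(order_id):
    """Read one order's events back from its queue, in queue (FIFO) order."""
    queue = queues[select_queue(order_id)]
    return [event for oid, event in queue if oid == order_id]


print(events_for("order-1"))
print(events_for("order-2"))
```

Different orders may land in different queues and be consumed in parallel, which is where the performance advantage over a single globally ordered queue comes from.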
Transaction messages
Common distributed transaction solutions include eventual consistency, two-phase/three-phase commit, TCC, and local message tables. Of these, eventual consistency gives the best performance, and it can be achieved with RocketMQ. The related RocketMQ concepts are:
Transaction message: MQ provides distributed transaction functionality similar to X/Open XA. Eventual consistency of distributed transactions can be achieved through transaction messages.
Half message: A message that temporarily cannot be delivered. The sender has successfully sent the message to the MQ server, but the server has not yet received the producer's second confirmation of it, so the message is marked as temporarily undeliverable. A message in this state is a half message.
Message check: The second confirmation of a transaction message may be lost because of a network interruption or a restart of the producer application. When the MQ server finds that a message has remained a half message for a long time, it proactively asks the producer for the final state of the message (Commit or Rollback). This process is called message check.
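The half message and message check flow can be modeled in a few lines. This is a toy state model, not the RocketMQ implementation: `SketchBroker`, its methods, the state strings, and the `local_db` dictionary standing in for the producer's local transaction records are all hypothetical.

```python
class SketchBroker:
    """Toy model of half messages and the check-back step."""

    def __init__(self):
        self.half = {}          # msg_id -> body: stored but not yet deliverable
        self.deliverable = []   # bodies that consumers are allowed to see

    def send_half(self, msg_id, body):
        self.half[msg_id] = body

    def end_transaction(self, msg_id, state):
        # The producer's second confirmation: COMMIT or ROLLBACK.
        if state == "COMMIT":
            self.deliverable.append(self.half.pop(msg_id))
        elif state == "ROLLBACK":
            self.half.pop(msg_id)
        # Any other state leaves the message as a half message.

    def check_half_messages(self, checker):
        # Message check: ask the producer for the final state of each half message.
        for msg_id in list(self.half):
            self.end_transaction(msg_id, checker(msg_id))


broker = SketchBroker()
local_db = {}  # stand-in for the producer's local transaction records

# tx-1: local transaction succeeds and the confirmation arrives normally.
broker.send_half("tx-1", "deduct 100")
local_db["tx-1"] = "done"
broker.end_transaction("tx-1", "COMMIT")

# tx-2: local transaction succeeds but the confirmation is lost.
broker.send_half("tx-2", "deduct 50")
local_db["tx-2"] = "done"

# tx-3: local transaction never completes and no confirmation is sent.
broker.send_half("tx-3", "deduct 70")


def checker(msg_id):
    """Producer-side answer to a message check, based on the local records."""
    return "COMMIT" if local_db.get(msg_id) == "done" else "ROLLBACK"


pending_before_check = sorted(broker.half)
broker.check_half_messages(checker)

print(pending_before_check, broker.deliverable, broker.half)
```

After the check, the message whose local transaction succeeded becomes deliverable and the other is discarded, so the message side ends up consistent with the local transaction outcome.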
For a very detailed treatment, see the article "Distributed Open Messaging System (RocketMQ): Principles and Practice".
Message models
Cluster consumption
Cluster: By MQ convention, subscribers that use the same Consumer ID belong to the same cluster. Subscribers in the same cluster must have exactly the same consumption logic (including their use of tags). Logically they can be treated as a single consumption node.
Cluster consumption: In cluster consumption mode, MQ considers any given message to need processing by only one consumer in the cluster.
Application scenarios and precautions:
- Consumers are deployed as a cluster, and each message needs to be processed only once.
- Consumption progress is maintained on the server, so reliability is higher.
- Each message is delivered to only one machine for processing.
- There is no guarantee that a failed message is redelivered to the same machine each time, so message handling should make no assumptions about which machine receives a message.
Broadcast consumption
In broadcast consumption mode, MQ pushes each message to all registered clients in the cluster, ensuring that every machine consumes the message at least once.
Application scenarios and precautions:
- Ordered messages are not supported in broadcast consumption mode.
- Each message needs to be processed by multiple machines running the same logic.
- Consumption progress is maintained on the client, so the probability of duplicate messages is slightly higher than in cluster mode.
- MQ ensures that each message is consumed at least once by each client, but it does not redeliver messages whose consumption failed, so the business side must handle consumption failures.
- By default, a client starting for the first time begins consuming from the latest message. The client's consumption progress is persisted in a hidden local file on the client, so do not delete that file, or some messages will be lost.
- Each message is processed repeatedly by many clients, so cluster mode is recommended whenever possible.
- Currently, only Java clients support broadcast mode.
- The server does not maintain consumption progress in broadcast mode, so the MQ console does not support message accumulation queries or accumulation alerts.
Point-to-point (P2P)
Point-to-point (P2P) is, as the name implies, a one-to-one messaging mode: a message has exactly one sender and one receiver. Publish/subscribe (Pub/Sub), by contrast, is typically used for one-to-many or many-to-many group messaging, with one or more senders and multiple receivers.
In the P2P model, the sender already knows the intended recipient when it sends a message, and knows that the message needs to be consumed by only one specific client. When sending, the sender specifies the recipient directly through the Topic information, and the recipient can get the message without subscribing in advance. P2P mode saves recipients the cost of registering subscriptions, and the send/receive link is optimized separately for lower push latency.
The differences between P2P and Pub/Sub are as follows:
- Sending: In Pub/Sub mode, messages must be sent to the Topic agreed with the receiver. In P2P mode, there is no need to agree on a Topic in advance, and the sender can send messages directly to the target client according to the specification.
- Receiving: In Pub/Sub mode, the receiver must subscribe in advance to the Topic agreed with the sender. In P2P mode, no advance subscription is needed, which simplifies the receiver's program logic and saves the cost of subscription.
Message filtering
Message filtering can be done on either the Broker or the consumer side. It is generally done on the consumer side, because filtering on the Broker adds load and hurts throughput. Before filtering, ask why such messages are produced in the first place. The answer determines how topics and tags should be designed.
Topic: the message topic, used to classify messages of different businesses. Tag: the message tag, used to further classify messages under a Topic. MQ lets consumers filter by Tag so that they consume only the message types they care about. A Topic is the first-level classification and a Tag is the second-level classification.
Criteria for choosing between them:
1. Whether the message types are the same (transaction messages, ordered messages, and so on).
2. Business relevance: messages with no direct relationship, such as orders, logistics, and payments, should use separate topics, whereas men's and women's orders can be distinguished by tag.
3. Whether the message volumes are comparable: if message type A runs to trillions of messages while type B is low-volume but latency-sensitive, then even if A and B meet criteria 1 and 2 they should use different topics, so that B is not held up by A.
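Tag-based filtering can be sketched as a simple match against a subscription expression. The sketch assumes the common "TagA || TagB" form with "*" meaning all tags. The `matches` helper and the sample topic and tags are hypothetical, and real filtering happens inside the MQ client and server rather than in user code like this.

```python
def matches(subscription, tag):
    """Return True if tag satisfies a 'TagA || TagB' style expression, or '*' for all."""
    expression = subscription.strip()
    if expression == "*":
        return True
    wanted = {part.strip() for part in expression.split("||")}
    return tag in wanted


# (topic, tag) pairs: one topic for orders, tags for the order category.
messages = [("order", "men"), ("order", "women"), ("order", "kids")]

selected = [tag for topic, tag in messages if matches("men || women", tag)]
print(selected)
```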
Flush strategy
Let's look at the situations that affect message reliability:
1. The Broker shuts down normally.
2. The Broker crashes abnormally.
3. The operating system crashes.
4. The machine loses power, but power is restored immediately.
5. The machine cannot be started (critical components such as the CPU, mainboard, or memory may be damaged).
6. The disk device is damaged.
In cases 1 to 4, hardware resources can be recovered immediately. Asynchronous replication ensures that 99% of messages are not lost. Cases 5 and 6 are single points of failure.
Asynchronous replication (Asynchronous flush)
When the cluster is configured with multiple masters and multiple slaves, the producer sends a message to the master and the master returns a response immediately. The slave then pulls the message from the master according to the configured policy and synchronizes it. If the master fails, the slave is not promoted to master, but consumers can still subscribe from it.
Synchronous double write
The producer sends a message to the master, and a response is returned only after both master and slave have written it successfully. Performance is slightly lower than with asynchronous replication. This mode suits services with strict message reliability requirements, such as those involving money.
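The trade-off between the two modes can be shown with a toy model: with asynchronous replication a message may be acknowledged before the slave has it, while with synchronous double write it cannot. `SketchMaster` and its methods are hypothetical and greatly simplified. Real replication is continuous and runs over the network.

```python
class SketchMaster:
    """Toy master with one slave, in either replication mode."""

    def __init__(self, sync_double_write):
        self.sync_double_write = sync_double_write
        self.log = []        # messages stored on the master
        self.slave_log = []  # messages the slave has copied so far

    def replicate(self):
        # The slave catches up with everything the master has stored.
        self.slave_log = list(self.log)

    def send(self, message):
        self.log.append(message)
        if self.sync_double_write:
            self.replicate()  # acknowledge only after the slave has the message too
        return "SEND_OK"


def survivors_after_master_loss(master):
    """Messages still readable from the slave if the master is lost for good."""
    return list(master.slave_log)


async_master = SketchMaster(sync_double_write=False)
async_master.send("m1")
async_master.replicate()   # the slave pulls in the background
async_master.send("m2")    # acknowledged, but not yet replicated

sync_master = SketchMaster(sync_double_write=True)
sync_master.send("m1")
sync_master.send("m2")

print(survivors_after_master_loss(async_master))
print(survivors_after_master_loss(sync_master))
```

The extra replication step inside `send` is the source of the slightly lower performance of synchronous double write.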
Installation and use
- Prerequisites:
64-bit OS (Linux/Unix/Mac recommended); 64-bit JDK 1.8+; Maven 3.2.x; Git; 4 GB+ of free disk space for the Broker server
- Download the 4.4.0 source release, then unpack and build it:
> unzip rocketmq-all-4.4.0-source-release.zip
> cd rocketmq-all-4.4.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq
- Start the Name Server
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
Note: If the following error occurs during the startup process:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)!
Open the startup script files and find the following three lines:
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
The default Java installation path is $HOME/jdk/java. The second line is the Java directory on Alibaba Group's internal servers. If the error above is reported, comment out that line and change JAVA_HOME in the first line to the Java installation directory on your machine. Then start mqnamesrv and mqbroker again and check the logs to confirm that startup succeeded. This comes up especially on macOS.
macOS reference: MAC OS Start
- Start the Broker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s,] boot success...
Start the Broker with automatic topic creation enabled
> nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
Send and consume messages
Shut down the Servers
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
RocketMQ plug-in
rocketmq-console is an extension of the RocketMQ project: a graphical management console for viewing Broker cluster status, managing topics, displaying Producer and Consumer status, and querying messages. It must be installed and run separately after RocketMQ itself is installed.
- Go to the GitHub page of the rocketmq-externals project. As shown below, it contains many extensions to RocketMQ, including the rocketmq-console that we need to download.
- Use git to download the project source. Since we only need rocketmq-console, we can download just the corresponding branch.
$ git clone -b release-rocketmq-console-1.0.0 …
- Go to the project folder and modify the configuration file as required
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
#rocketmq-console's data path:dashboard/monitor
#set it false if you don't want use dashboard.default true
The Name Server address is empty by default. The comment says it can be configured from the console's ops page after the project starts, but in testing that switch failed with an error. So specify the Name Server address in the configuration file before packaging, or pass the rocketmq.config.namesrvAddr parameter when starting the service.
- Package the project as a JAR and run it.
$ mvn clean package -Dmaven.test.skip=true
$ java -jar target/rocketmq-console-ng-1.0.0.jar
# If the Name Server address is not specified in the configuration file
$ java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr=''
- After a successful start, visit http://localhost:8080/rocketmq to open the management console.
RocketMQ CLI Admin Tool
The CLI Admin Tool provides finer-grained management commands for RocketMQ clusters. The command line demands more of the operator, but once mastered it is simpler and more efficient. The tool needs no separate installation. It is already included in the ${RocketMQ_HOME}/bin folder.
Go to the bin folder of the project and run bash mqadmin:
The most commonly used mqadmin commands are:
updateTopic Update or create topic
deleteTopic Delete topic from broker and NameServer.
updateSubGroup Update or create subscription group
deleteSubGroup Delete subscription group from broker.
updateBrokerConfig Update broker's config
updateTopicPerm Update topic perm
topicRoute Examine topic route info
topicStatus Examine topic Status info
topicClusterList get cluster info for topic
brokerStatus Fetch broker runtime status data
queryMsgById Query Message by Id
queryMsgByKey Query Message by Key
queryMsgByUniqueKey Query Message by Unique key
queryMsgByOffset Query Message by offset
printMsg Print Message Detail
printMsgByQueue Print Message Detail
sendMsgStatus send msg to broker.
brokerConsumeStats Fetch broker consume stats data
producerConnection Query producer's socket connection and client version
consumerConnection Query consumer's socket connection, client version and subscription
consumerProgress Query consumers's progress, speed
consumerStatus Query consumer's internal data structure
cloneGroupOffset clone offset from other group.
clusterList List all of clusters
topicList Fetch all topic list from name server
updateKvConfig Create or update KV config.
deleteKvConfig Delete KV config.
wipeWritePerm Wipe write perm of broker in all name server
resetOffsetByTime Reset consumer offset by timestamp(without client restart).
updateOrderConf Create or update or delete order conf
cleanExpiredCQ Clean expired ConsumeQueue on broker.
cleanUnusedTopic Clean unused topic on broker.
startMonitoring Start Monitoring
statsAll Topic and Consumer tps stats
allocateMQ Allocate MQ
checkMsgSendRT check message send response time
clusterRT List All clusters Message Send RT
getNamesrvConfig Get configs of name server.
updateNamesrvConfig Update configs of name server.
getBrokerConfig Get broker config by cluster or special broker!
queryCq Query cq command.
sendMessage Send a message
consumeMessage Consume message
In the above list, the command name is on the left and the command meaning is on the right. You can see that most of the commonly used functions are included in the list. You can run the bash mqadmin help command to learn more details about how to use these commands. Run the bash mqadmin help updateTopic command to print the following information:
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
-t <arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg> create topic to which broker
-c,--clusterName <arg> create topic to which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg:;
-o,--order <arg> set topic's order(true|false)
-p,--perm <arg> set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums <arg> set read queue nums
-s,--hasUnitSub <arg> has unit sub (true|false)
-t,--topic <arg> topic name
-u,--unit <arg> is unit topic (true|false)
-w,--writeQueueNums <arg> set write queue nums
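The -p,--perm values in the help text above (2:W, 4:R, 6:RW) behave like a small bitmask, where 6 is simply 2 and 4 combined. A minimal sketch of decoding them, with hypothetical constant and function names:

```python
PERM_WRITE = 2  # "2:W" in the help text
PERM_READ = 4   # "4:R" in the help text


def describe_perm(perm):
    """Decode a topic permission value into the letters used by the help text."""
    flags = ""
    if perm & PERM_READ:
        flags += "R"
    if perm & PERM_WRITE:
        flags += "W"
    return flags or "none"


for value in (2, 4, 6):
    print(value, describe_perm(value))
```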
You can see the new TopicTest and some of the system’s default topics. If you want to learn about the source code implementation of these commands, click here.
- xiajunhust.github.io/2016/11/12/…