Introduction to MQ
1.1 Why use MQ
Message queues are a first-in, first-out data structure
The application scenarios include the following three aspects
1) Apply decoupling
The higher the coupling of the system, the lower the fault tolerance. Take the e-commerce application as an example. After a user creates an order, if the inventory system, logistics system and payment system are coupled, and any one of the subsystems is faulty or temporarily unavailable due to upgrade or other reasons, the ordering operation will be abnormal and the user experience will be affected.
With message queue decoupling, the coupling of the system is improved. For example, when the logistics system fails, it takes several minutes to repair. During this time, the data to be processed by the logistics system is cached in the message queue and the user’s order operation is completed normally. When the logistics system replies, the order messages in the message queue can be processed, and the terminal system does not perceive the failure of the logistics system for several minutes.
2) Flow peaking
Application systems may be overwhelmed by a sudden surge in system request traffic. With message queues, large numbers of requests can be cached and processed over long periods of time, which can greatly improve system stability and user experience.
In general, in order to ensure the stability of the system, if the system load exceeds the threshold, it will prevent user requests, which will affect the user experience, rather than using message queues to cache requests and wait for the system to finish processing to notify the user of the completion of the order, which is not a good experience.
For economic purposes:
If the QPS of the service system is 1000 during normal hours, the traffic peak is 10000. It is not cost-effective to configure a high-performance server to cope with the peak traffic. In this case, you can use message queues to peak the peak traffic
3) Data distribution
Message queues allow data to flow more widely across multiple systems. The producer of the data does not need to care about who uses the data, but simply sends the data to the message queue, where the consumer gets the data directly from the message queue
1.2 The advantages and disadvantages of MQ
Advantages: decoupling, peaking, data distribution
Disadvantages include the following:
-
The system availability decreases
The more external dependencies introduced into the system, the worse the stability of the system. When MQ goes down, there is an impact on business.
How to ensure high availability of MQ?
-
System complexity increases
The addition of MQ has greatly increased the complexity of the system, which used to be synchronous remote calls between systems, and now asynchronous calls are made through MQ.
How do YOU ensure that messages are not consumed repeatedly? How do I handle message loss? What about sequentiality of message delivery?
-
Consistency problem
After processing services, system A sends message data to systems B, C, and D through MQ. If systems B and C process data successfully, system D fails to process data.
How to ensure the consistency of message data processing?
1.3 Comparison of various MQ products
Common MQ products include Kafka, ActiveMQ, RabbitMQ, and RocketMQ.
A quick start to RocketMQ
RocketMQ is the MQ middleware of Alibaba in 2016, which is developed with Java language. Within Ali, RocketMQ has undertaken the message flow of high concurrency scenarios such as “Double 11” and can process the messages of trillion level.
2.1 Preparations
2.1.1 download RocketMQ
RocketMQ version 4.6.0 is chosen here
Download address: Download address
The official document: rocketmq.apache.org/docs/quick-…
2.2.2 Environment Requirements
-
On a system
-
JDK1.8(64位)
2.2 installation RocketMQ
2.2.1 Installation Procedure
Here I install it as a binary package:
- Decompress the Installation Package
- Going to the installation directory
2.2.2 Introduction to Directories
bin
: Startup scripts, including shell scripts and CMD scriptsconf
: Instance profile, including broker profile, logback profile, etclib
: depends on JAR packages, including Netty, Commons-lang, and FastJSON
2.3 start RocketMQ
-
The default RocketMQ VIRTUAL machine has a large memory. Starting Broker or NameServer may fail due to insufficient memory, so you need to edit the following two configuration files to change the JVM memory size
#Edit runbroker. Sh and runServer. sh to change the default JVM size $ vi bin/runbroker.sh #The reference set JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" $ vi bin/runserver.sh #The reference set JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" Copy the code
-
Start the NameServer
#1. Start the NameServer nohup sh bin/mqnamesrv & #2. View startup logs tail -f ~/logs/rocketmqlogs/namesrv.log Copy the code
-
Start the Broker
#1. Start the Broker nohup sh bin/mqbroker -n localhost:9876 & #2. View startup logs tail -f ~/logs/rocketmqlogs/broker.log Copy the code
Some optional parameters for bin/ mqBroker:
-c
: Specifies the path of the configuration file-n
: Indicates the address of NameServer
2.4 test RocketMQ
2.4.1 Sending messages
#1. Set environment variables
export NAMESRV_ADDR=localhost:9876
#2. Use the Demo of the installation package to send messages
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Copy the code
2.4.2 Receiving messages
#1. Set environment variables
export NAMESRV_ADDR=localhost:9876
#2. Receive the message
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Copy the code
2.5 close RocketMQ
#1. Close the NameServer
sh bin/mqshutdown namesrv
#2. Close the Broker
sh bin/mqshutdown broker
Copy the code
2.6 Roles
Producer
: The sender of the message; Example: senderConsumer
: Message receiver; Example: RecipientConsumer Group
: Consumer group; Each consumer instance belongs to a consumer group, and each message is consumed by only one consumer instance in the same consumer group. (Different consumer groups can consume the same message simultaneously)Broker
: Staging and transmitting messages; Example: express delivery companiesNameServer
: Manages Broker; Example: the management of express delivery companiesTopic
: Distinguish the types of messages; A sender can send messages to one or more topics; The recipient of a message can subscribe to one or more Topic messagesMessage Queue
: is a Topic partition; Used to send and receive messages in parallel
2.7 Detailed description of broker Configuration Files
The default broker configuration file location is: conf/broker
# Name of the cluster
brokerClusterName=rocketmq-cluster
Broker name. Note that the broker name is different for different configuration files
brokerName=broker-a
#0 indicates Master, >0 indicates Slave
brokerId=0
#nameServer address, semicolon split
namesrvAddr=rocketmq-nameserver1:9876; rocketmq-nameserver2:9876
# When sending a message, automatically create a topic that does not exist on the server, the default number of queues created
defaultTopicQueueNums=4
Whether to allow the Broker to create Topic automatically. It is recommended to enable it offline and disable it online
autoCreateTopicEnable=true
Whether to allow brokers to automatically create subscription groups. It is recommended to enable offline and disable online
autoCreateSubscriptionGroup=true
#Broker listening port for external services
listenPort=10911
Delete file time point, default 4:00 a.m
deleteWhen=04
The default file retention time is 48 hours
fileReservedTime=120
CommitLog default size of each file is 1G
mapedFileSizeCommitLog=1073741824
ConsumeQueue Saves 30W entries per file by default, which can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Check the physical file disk space
diskMaxUsedSpaceRatio=88
# Storage path
storePathRootDir=/usr/local/rocketmq/store
#commitLog Storage path
storePathCommitLog=/usr/local/rocketmq/store/commitlog
Consumption queue storage path Storage path
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# Message index storage path
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint File storage path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# Abort File storage path
abortFile=/usr/local/rocketmq/store/abort
# limit message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
The role of the Broker
# -async_master Asynchronous replication Master
# -sync_master Synchronous dual-write Master
#- SLAVE
brokerRole=SYNC_MASTER
# Brushing mode
# -async_flush Flush the disk asynchronously
# -sync_flush Flush disk synchronously
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# Number of thread pools sending messages
#sendMessageThreadPoolNums=128
# Number of pull message thread pools
#pullMessageThreadPoolNums=128
Copy the code
2.8 Building a visual monitoring platform
2.8.1 overview
RocketMQ has an open source extension to the incubator- RocketmQ-Externals. There is a submodule in this project called RocketmQ-Console. This is the management console project. Pull the incubator- RocketmQ-externals local first, because we need to compile and package rocketmQ-Console ourselves.
2.8.2 Download and compile the package
- Cloning project
git clone https://github.com/apache/rocketmq-externals
Copy the code
- in
rocketmq-console
In the configurationnamesrv
Cluster address:
$ cdRocketmq - the console $vim SRC/main/resources/application properties rocketmq. Config. NamesrvAddr = 10.211.55.4:9876Copy the code
-
The configuration is compiled and packaged
mvn clean package -Dmaven.test.skip=true Copy the code
-
Start the rocketmq – the console:
Nohup Java -jar rocketmq-console-ng-2.0.0.jar > tmp.log &Copy the code
After successful startup, we can access http://IP address :8080 through the browser to enter the console interface, as shown below:
3. Message Sending and Consumption Examples (Maven)
-
Import MQ client dependencies
== Note == : RocketMQ-client must be the same as RocketMQ
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.6.0</version> </dependency> Copy the code
-
Message sender step analysis:
- Creating a message producer
producer
And specify the producer group name - The specified
Nameserver
address - Start the
producer
- Creates the message object, specifying the topic
Topic
,Tag
And the message body - Send a message
- Shut down the producer
producer
- Creating a message producer
-
Analysis of message consumer steps:
- Create a consumer
Consumer
, formulate the consumer group name - The specified
Nameserver
address - Subscribe to the topic
Topic
和Tag
- Set the callback function to process the message
- Start consumer
consumer
- Create a consumer
3.1 Basic Examples
3.1.1 Sending messages
1) Send the synchronization message
This reliable synchronous sending method is widely used, such as: important message notification, SMS notification.
public class SyncProducer {
public static void main(String[] args) throws Exception {
// Instantiate the message Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Set the NameServer address
producer.setNamesrvAddr("localhost:9876");
// Set the number of retries when a message fails to be sent synchronously. The default is 2
producer.setRetryTimesWhenSendFailed(2);
// set the timeout period for sending messages, which is 3000ms by default
producer.setSendMsgTimeout(3000);
// Start the Producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create the message and specify the Topic, Tag, and message body
Message msg = new Message("TopicTest" /* Topic */."TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send a message to a Broker
SendResult sendResult = producer.send(msg);
// Returns whether the message arrived successfully via sendResult
System.out.printf("%s%n", sendResult);
}
// If no more messages are sent, close the Producer instance.producer.shutdown(); }}Copy the code
2) Send asynchronous messages
Asynchronous messages are often used in response time sensitive business scenarios where the sender cannot tolerate long waits for a response from the Broker.
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// Instantiate the message Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Set the NameServer address
producer.setNamesrvAddr("localhost:9876");
// Set the number of retries when asynchronous message sending fails. The default value is 2
producer.setRetryTimesWhenSendAsyncFailed(2);
// set the timeout period for sending messages, which is 3000ms by default
producer.setSendMsgTimeout(3000);
// Start the Producer instance
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// Create the message and specify the Topic, Tag, and message body
Message msg = new Message("TopicTest"."TagA"."OrderID188"."Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback Receives the result callback asynchronously
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); }}); }// If no more messages are sent, close the Producer instance.producer.shutdown(); }}Copy the code
3) Sending messages in one direction
This approach is mainly used in scenarios where the result of sending is not particularly concerned, such as log sending.
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// Instantiate the message Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Set the NameServer address
producer.setNamesrvAddr("localhost:9876");
// Start the Producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create the message and specify the Topic, Tag, and message body
Message msg = new Message("TopicTest" /* Topic */."TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send a one-way message with no results returned
producer.sendOneway(msg);
}
// If no more messages are sent, close the Producer instance.producer.shutdown(); }}Copy the code
3.1.2 Consuming messages
1) Cluster mode (load balancing)
Consumers consume messages in cluster mode. == Only one consumer in the same consumer group will consume == for a message
public static void main(String[] args) throws Exception {
// Instantiate the message producer and specify the group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// Specify the Namesrv address.
consumer.setNamesrvAddr("localhost:9876");
/ / subscribe to the Topic
consumer.subscribe("Test"."*");
// Load balancing mode consumption
consumer.setMessageModel(MessageModel.CLUSTERING);
// Register the callback function to process the message
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context)
{
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});// Start the messager
consumer.start();
System.out.printf("Consumer Started.%n");
}
Copy the code
2) Broadcast mode
Consumers consume messages by broadcasting, == a message is consumed by every consumer in the same consumer group ==
public static void main(String[] args) throws Exception {
// Instantiate the message producer and specify the group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// Specify the Namesrv address.
consumer.setNamesrvAddr("localhost:9876");
/ / subscribe to the Topic
consumer.subscribe("Test"."*");
// Broadcast mode consumption
consumer.setMessageModel(MessageModel.BROADCASTING);
// Register the callback function to process the message
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context)
{
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});// Start the messager
consumer.start();
System.out.printf("Consumer Started.%n");
}
Copy the code
3.2 Sequential Messages
Message order means that messages can be consumed in the order in which they were sent (FIFO). RocketMQ can strictly ensure that messages are ordered, which can be divided into partition order or global order.
By default, messages are sent in Round Robin mode to different queues (partitioned queues). When consuming a message, it is pulled from multiple queues, in which case there is no guarantee of the order in which the message is sent and consumed. But if the sequential messages that control sending are only sent to the same queue, and are only pulled from that queue when consumed, order is guaranteed. If there is only one queue for sending and consuming, it is globally ordered. If more than one queue is involved, the partition is ordered, that is, messages are ordered relative to each queue.
Here is an example of partition ordering with an order. The sequential process of an order is: create, pay, push, complete. Messages with the same order number will be sent to the same queue successively. When consumed, the messages with the same OrderId must be obtained from the same queue.
3.2.1 Sequential message production
/** * Send sequential messages */
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA"."TagC"."TagD"};
// Order list
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// Add a time prefix
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; // Select a queue based on the order ID
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());/ / order id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/** * Order steps */
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId(a) {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc(a) {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString(a) {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\' ' +
'} '; }}/** * generate simulated order data */
private List<OrderStep> buildOrders(a) {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("Create");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("Create");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("Payment");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("Create");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("Payment");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("Payment");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("Complete");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("Push");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("Complete");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("Complete");
orderList.add(orderDemo);
returnorderList; }}Copy the code
3.2.2 Sequential consumption of messages
/** * Sequential message consumption, with transaction mode (the application can control when Offset is committed) */
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/** * set whether the Consumer starts to consume at the head of the queue or at the end of the queue for the first time
* If not for the first time, then continue to consume at the same position as the last time */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest"."TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// We can see that each queue has a unique consume thread to consume, and the order is ordered for each queue(partition)
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
// Simulate business logic processing...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
returnConsumeOrderlyStatus.SUCCESS; }}); consumer.start(); System.out.println("Consumer Started."); }}Copy the code
3.3 Delayed Message
For example, in e-commerce, you can send a delayed message after submitting an order, check the status of the order one hour later, and cancel the order to release the inventory if there is still no payment.
3.3.1 Starting the message Consumer
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// Instantiate the consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
/ / subscribe switchable viewer
consumer.subscribe("TestTopic"."*");
// Register a message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "]" + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});// Start the consumerconsumer.start(); }}Copy the code
3.3.2 Sending delayed Messages
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to generate delayed messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Start the producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// Set latency level 3. This message will be sent after 10s. (Now only fixed times are supported, see delayTimeLevel)
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shut down the producerproducer.shutdown(); }}Copy the code
# # # 4.3.3 validation
You will see that the message is consumed 10 seconds later than it is stored
3.3.4 Restrictions
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Copy the code
Right now, RocketMq does not support any time delay. You need to set several fixed delay levels, 1s through 2h for levels 1 through 18
3.4 Batch Messages
Batching messages can significantly improve the performance of delivering small messages. The limitations are that these batch messages should have the same topic, the same waitStoreMsgOK, and cannot be delayed messages. In addition, the total size of this batch of messages should not exceed 4MB.
3.4.1 Sending Batch Messages
If you only send messages of less than 4MB at a time, it is easy to use batch processing, such as the following:
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA"."OrderID001"."Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA"."OrderID002"."Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA"."OrderID003"."Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
/ / handle the error
}
Copy the code
If the total length of the message is likely to be greater than 4MB, it is best to split the message
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext(a) {
return currIndex < messages.size();
}
@Override
public List<Message> next(a) {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // Adds 20 bytes to the log overhead
if (tmpSize > SIZE_LIMIT) {
// A single message exceeds the maximum limit
// Ignore, otherwise the split process will be blocked
if (nextIndex - currIndex == 0) {
// If the next sublist has no elements, add the sublist and exit the loop, otherwise just exit the loop
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
returnsubList; }}// Split a large message into smaller messages
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
/ / handle the error}}Copy the code
3.5 Filtering Messages
In most cases, the TAG is a simple and useful design for selecting the messages you want. Such as:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC"."TAGA || TAGB || TAGC");
Copy the code
The consumer will receive a message containing either TAGA or TAGB or TAGC. But the limitation is that a message can only have one tag, which may not work for complex scenarios. In this case, you can use SQL expressions to filter the messages. SQL features can be computed from the properties at the time the message is sent. Some simple logic can be implemented under the syntax defined by RocketMQ. Here’s an example:
------------ | message | |----------| a > 5 AND b = 'abc' | a = 10 | --------------------> Gotten | b = 'abc'| | c = true | ------------ ------------ | message | |----------| a > 5 AND b = 'abc' | a = 1 | --------------------> Missed | b = 'abc'| | c = true | ------------Copy the code
3.5.1 Basic SQL syntax
RocketMQ defines only some basic syntax to support this feature. You can also easily extend it.
- Numerical comparison, such as:
>
.> =
.<
.< =
.BETWEEN
.=
- Character comparisons, such as:
=
.<>
.IN
IS NULL
orIS NOT NULL
- Logic symbol
AND
.OR
.NOT
The supported types of constants are:
- Numerical values, such as:
123
.3.1415
- Characters such as:
'abc'
Must be wrapped in single quotation marks NULL
, special constants- Boolean value,
TRUE
或FALSE
Only consumers using push mode can use SQL statements using SQL92 standard. The interface is as follows:
public void subscribe(finalString topic, final MessageSelector messageSelector)
Copy the code
3.5.2 Message Producer
When you send a message, you can set the properties of the message with putUserProperty
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
Copy the code
3.5.3 Message Consumer
Use MessagesElector.bysQL to filter messages using SQL
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
A, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); consumer.start();Copy the code
3.6 Transaction Message
3.6.1 Process Analysis
The figure above illustrates the general scheme of the transaction message, which is divided into two processes: the sending and committing of the normal transaction message, and the compensation process of the transaction message.
####1) Sending and submitting transaction messages
(1) Sending half messages.
(2) Write result of the server response message.
(3) Execute the local transaction according to the sending result (if the writing fails, the half message is not visible to the service and the local logic is not executed).
(4) Commit or Rollback based on the local transaction state (the Commit operation generates the message index and makes the message visible to the consumer)
2) Transaction compensation
(1) Initiate a “callback” from the server for the pending transaction messages without Commit/Rollback
(2) After receiving the backcheck message, the Producer checks the status of local transactions corresponding to the backcheck message
(3) Commit or Rollback based on the local transaction status
The compensation phase is used to resolve situations where the Commit or Rollback messages time out or fail.
3) Status of the transaction message
A transaction message has three states: commit state, rollback state, and intermediate state:
- TransactionStatus.Com mitTransaction: submit a transaction, it allows consumer spending this message.
- TransactionStatus. RollbackTransaction: roll back a transaction, it represents the message will be deleted, is not permitted to consume.
- TransactionStatus. Unknown: intermediate state, it means we need to check the message queue to determine the state.
3.6.2 Sending a Transaction Message
1) Create transactional producers
By creating a producer using the TransactionMQProducer class and specifying a unique ProducerGroup, you can set up a custom thread pool to handle these check requests. After the local transaction is executed, the message queue needs to be replied based on the execution result. Refer to the previous section for the returned transaction state.
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// Create transaction listener
TransactionListener transactionListener = new TransactionListenerImpl();
// Create a message producer
TransactionMQProducer producer = new TransactionMQProducer("group6");
producer.setNamesrvAddr("192.168.25.135:9876; 192.168.25.138:9876");
// The producer is the listener
producer.setTransactionListener(transactionListener);
// Start the message producer
producer.start();
String[] tags = new String[]{"TagA"."TagB"."TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
TimeUnit.SECONDS.sleep(1);
} catch(MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); }}//producer.shutdown();}}Copy the code
2) Implement the transaction listening interface
When sending the semi-message is successful, we use the executeLocalTransaction method to execute the local transaction. It returns one of the three transaction states mentioned in the previous section. The checkLocalTranscation method is used to check the status of a local transaction and to respond to a request to check the message queue. It also returns one of the three transaction states mentioned in the previous section.
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("Perform local transactions");
if (StringUtils.equals("TagA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
returnLocalTransactionState.UNKNOW; }}@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("MQ check message Tag ["+msg.getTags()+Local transaction execution result of "]");
returnLocalTransactionState.COMMIT_MESSAGE; }}Copy the code
3.6.3 Restrictions
- Delayed and batch messages are not supported for transactional messages.
- To prevent a single message from being checked too many times and causing a semi-queue of messages to accumulate, we limit the number of checks for a single message to 15 by default, but users can use the Broker profile
transactionCheckMax
Parameter to modify this restriction. If a message has been checked more than N times (N =transactionCheckMax
) the Broker dismisses the message and, by default, prints an error log. Users can overrideAbstractTransactionCheckListener
Class to modify the behavior. - Transaction messages are checked after a specific length of time such as transactionMsgTimeout in the Broker profile parameter. Users can also set user properties when sending a transaction message
CHECK_IMMUNITY_TIME_IN_SECONDS
To change this limitation, this parameter takes precedence overtransactionMsgTimeout
Parameters. - Transactional messages may be examined or consumed more than once.
- The target topic message submitted to the user may fail, currently depending on the logging. Its high availability passes
RocketMQ
If you want to ensure that transactional messages are not lost and that transactional integrity is guaranteed, it is recommended to use a synchronous dual write mechanism. - The producer ID of a transaction message cannot be shared with the producer ID of another type of message. Unlike other types of messages, transaction messages allow a reverse lookup, where the MQ server can query to the consumer by their producer ID.
3.7 Configure AK and Secret when connecting RocketMQ of Aliyun
If you are calling RocketMQ from Aliyun, you also need to specify AK and Secret. Ali Cloud Demo: poke here
3.7.1 producers
The AK and Secert operations of the Producer are the same. You only need to specify the AK and Secert when creating the Producer. In this example, sending a common message:
public class SyncAKProducer {
private static RPCHook getAclRPCHook(a) {
return new AclClientRPCHook(new SessionCredentials("Set your own ACCESS_KEY"."Set your own SECRET_KEY."));
}
public static void main(String[] args) throws Exception {
/** * Create a Producer and enable message tracks * If you do not want to enable message tracks, create the following: * DefaultMQProducer = new DefaultMQProducer(M" set my GroupName (unique) ", getAclRPCHook()); * /
DefaultMQProducer producer = new DefaultMQProducer(Set your own GroupName (unique), getAclRPCHook(), true.null);
/** * Set the access mode to Aliyun. This parameter needs to be set when using the message trace function on the cloud. If the message trace function is not enabled, this parameter is not set. */
producer.setAccessChannel(AccessChannel.CLOUD);
// Set the NameServer address
producer.setNamesrvAddr("localhost:9876");
// Set the number of retries when a message fails to be sent synchronously. The default is 2
producer.setRetryTimesWhenSendFailed(2);
// set the timeout period for sending messages, which is 3000ms by default
producer.setSendMsgTimeout(3000);
// Start the Producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create the message and specify the Topic, Tag, and message body
Message msg = new Message("TopicTest" /* Topic */."TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send a message to a Broker
SendResult sendResult = producer.send(msg);
// Returns whether the message arrived successfully via sendResult
System.out.printf("%s%n", sendResult);
}
// If no more messages are sent, close the Producer instance.producer.shutdown(); }}Copy the code
3.7.2 consumers
AK and Secert consumers set operation are all the same, just need to specify when creating Consummer, here will receive regular news, for example:
private static RPCHook getAclRPCHook(a) {
return new AclClientRPCHook(new SessionCredentials("Set your own ACCESS_KEY"."Set your own SECRET_KEY."));
}
public static void main(String[] args) throws Exception {
/** * Create a Consumer and enable the message trace * If you do not want to enable the message trace, you can create it as follows: * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConfig.GROUP_ID, getAclRPCHook(), null); * /
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1", getAclRPCHook(), new AllocateMessageQueueAveragely(), true.null);
/** * Set the access mode to Aliyun. This parameter needs to be set when using the message trace function on the cloud. If the message trace function is not enabled, this parameter is not set. */
consumer.setAccessChannel(AccessChannel.CLOUD);
// Specify the Namesrv address.
consumer.setNamesrvAddr("localhost:9876");
/ / subscribe to the Topic
consumer.subscribe("Test"."*");
// Load balancing mode consumption
consumer.setMessageModel(MessageModel.CLUSTERING);
// Register the callback function to process the message
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context)
{
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});// Start the messager
consumer.start();
System.out.printf("Consumer Started.%n");
}
Copy the code
4. Message Sending and Consumption Example (Spring Boot)
4.1 Importing Dependencies
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
Copy the code
4.2 producers
4.2.1 Application. Yaml configuration file
# application.yaml
rocketmq:
name-server: 10.124128.200.: 9876
producer:
group: test-group
# Number of retries when sending a synchronization message fails. Default is 2
retry-times-when-send-failed: 2
# Number of retries when sending an asynchronous message fails. Default is 2
retry-times-when-send-async-failed: 2
The default value is 3s
send-message-timeout: 3000
When connecting RocketMQ to Aliyun, you need to configure AK and SK
access-key:
secret-key:
Copy the code
4.2.2 producers
@RestController
@RequestMapping("/test")
public class ProducerTest {
// Automatic injection
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/sendSyncMessage")
public void sendSyncMessage(@RequestBody Map<String, Object> msgMap){
// Build the message
Message message = new Message("TopicName"."Tag", hash, JSON.toJSONBytes(msgData));
// Send a synchronization message
/ / method 1: use the same as the third chapter, the method of calling getProducer () method will return when DefaultMQProducer object, then call the method as the third chapter.
SendResult sendResult = rocketMQTemplate.getProducer().send(message);
// Method 2: message sending method encapsulated with rocketMQTemplate
// The first argument specifies Topic and Tag in the format 'topicName:tags'
// The second argument, the Message object
sendResult = rocketMQTemplate.syncSend("TopicName:Tag", message); }}Copy the code
4.2 consumers
4.2.1 Application. Yaml configuration file
rocketmq:
name-server: 10.124128.200.: 9876
# The following configuration can only be configured when RocketMQ is used in Ali Cloud
consumer:
access-key:
secret-key:
access-channel: CLOUD
Copy the code
4.2.2 Consumer message listener
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq", consumerGroup = "springboot-mq-consumer-1", selectorExpression = "*")
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("The Receive message:" + message);
// If the consumption fails, a RuntimeException is thrown and RocketMQ will automatically retry
// You can throw a RuntimeException either manually or using Lombok's @sneakyThrows annotation
throw new RuntimeException("Consumption failure"); }}Copy the code
Common configuration parameters for the @RocketmqMessagelistener annotation:
parameter | type | The default value | instructions |
---|---|---|---|
consumerGroup | String | Consumer groups | |
topic | String | Topic | |
selectorType | SelectorType | SelectorType.TAG | Select the message using either TAG or SQL92. The default is TAG |
selectorExpression | String | “*” | Controls which messages can be selected |
consumeMode | ConsumeMode | ConsumeMode.CONCURRENTLY | Consumption mode, concurrent or sequential receive, default concurrent mode |
messageModel | MessageModel | MessageModel.CLUSTERING | Consumption mode, broadcast mode or cluster mode, default cluster mode |
consumeThreadMax | int | 64 | Maximum number of consuming threads |
consumeTimeout | long | 15L | Consumption timeout (the maximum time (in minutes) a message can block a using thread) |
nameServer | String | ${rocketmq.name-server:} | NameServer address |
accessKey | String | ${rocketmq.consumer.access-key:} | AK |
secretKey | String | ${rocketmq.consumer.secret-key:} | SK |
accessChannel | String | ${rocketmq.access-channel:} | |
# 5. Message storage |
Because of the high reliability requirement of distributed queue, data should be stored persistently.
- The message generator sends the message
- MQ receives the message, persists the message, and adds a new record to the store
- Returns an ACK to the producer
- MQ push message to the corresponding consumer and wait for the consumer to return ACK
- If the message consumer returns ACK successfully within the specified time, MQ considers the message consumption to be successful and deletes the message from the store, that is, step 6 is executed. If MQ does not receive ACK within the specified time, it considers that message consumption fails and tries to push the message again, repeating steps 4, 5 and 6
- MQ Delete message
5.1 Storage Media
Several commonly used products in the industry, such as RocketMQ, Kafka, and RabbitMQ, use == message flushing == to the == file system == of the deployed VM or physical machine for persistence (generally, there are asynchronous and synchronous disk flushing modes).
Message flushing provides a high efficiency, high reliability and high performance data persistence method for message storage. Unless the MQ machine itself is deployed or the local disk is down, the failure to persist is usually not an issue.
5.2 Message storage structure
RocketMQ messages are stored by CommitLog and ConsumeQueue (CommitLog). The real physical storage file of messages is CommitLog. ConsumeQueue (CommitLog) is a logical queue of messages. Each Message Queue under each Topic has a corresponding ConsumeQueue file.
CommitLog
: Stores metadata of messagesConsumerQueue
: Stores messages inCommitLog
The index of theIndexFile
: provides a way through for message querieskey
Or time interval to query the message byIndexFile
To find the message without affecting the main process of sending and consuming the message
5.3 order to write
RocketMQ’s messages are written sequentially, ensuring the speed of message storage.
If the disk is used properly, the speed of the disk can match the data transfer speed of the network. The sequential write speed of current high-performance disks can reach 600MB/s, which exceeds the transmission speed of ordinary network adapters. But the speed of random disk write is only about 100KB/s, which is 6000 times better than sequential write performance! Because of this huge speed difference, a good message queuing system can be orders of magnitude faster than a normal message queuing system.
5.4 Disk Flushing Mechanism
RocketMQ’s messages are stored on disk, both to ensure recovery after a power failure and to allow the number of messages stored to exceed memory limits. To improve performance, RocketMQ ensures sequential disk writes as much as possible. When messages are written to RocketMQ through Producer, there are two types of disk writing: distributed synchronous disk flushing and asynchronous disk flushing.
1) Flush disks synchronously
When the write success status is returned, the message has been written to disk. The specific process is that the message is written to the PAGECACHE of the memory, immediately notify the flush thread flush disk, and then wait for the completion of the flush disk, flush the completion of the thread to wake up the waiting thread, the message is written back to the state of success.
2) Asynchronous disk flushing
When the write success status is returned, the message may just be written to the PAGECACHE of the memory. The return of the write operation is fast and the throughput is large. When the amount of messages in the memory accumulates to a certain extent, the disk is written quickly.
3) configuration
Both synchronous and asynchronous flush are set using the flushDiskType parameter in the Broker profile, which is configured to be either SYNC_FLUSH or ASYNC_FLUSH.
5.5 zero copy
The Linux operating system is divided into user mode and kernel mode. File operations and network operations need to switch between the two modes, which inevitably leads to data replication.
A server sends the contents of a local disk file to a client in two steps:
-
Read: reads the contents of local files.
-
Write: Sends the read content over the network.
These two seemingly simple operations actually replicated data four times, as follows:
- Copy data from disk to kernel mode memory;
- Copy from kernel mode memory to user mode memory;
- Then copy from user-mode memory to network-driven kernel-mode memory;
- Finally, it is copied from the kernel mode memory of the network driver to the network adapter for transmission.
By using MMAP, you can save the memory replication to the user mode and improve the speed. This mechanism is implemented in Java through MappedByteBuffer
RocketMQ takes advantage of these features, known as the “== zero copy ==” technology, to increase the speed at which messages are saved to the disk and sent over the network.
Note that MappedByteBuffer has several limitations. One of them is that no more than 1.5 files can be mapped to user mode virtual memory at a time. This is why RocketMQ sets a single CommitLog data file to 1G by default
Vi. High availability mechanism
The RocketMQ distributed cluster achieves high availability through the combination of Master and Slave.
The difference between Master and Slave: In the Broker configuration file, the brokerId parameter has a value of 0 to indicate that the Broker is Master, greater than 0 to indicate that the Broker is Slave, and the brokerRole parameter also indicates whether the Broker is Master or Slave.
The Master Broker can read and write messages. The Slave Broker can only read messages. That is, the Producer can only connect to the Master Broker and write messages. A Consumer can connect to either a Master Broker or a Slave Broker to read messages.
6.1 Message consumption High availability
In the Consumer configuration file, there is no need to set whether to read from the Master or Slave. When the Master is unavailable or busy, the Consumer is automatically switched to the Slave. With the automatic Consumer switching mechanism, if a Master machine fails, the Consumer can still read messages from the Slave without affecting the Consumer program. This is high availability on the consumer side.
6.2 Sending messages Is highly available
When creating a Topic, create multiple Message queues for a Topic on multiple Broker groups (machines with the same Broker name and different brokerId to form one Broker group), so that when the Master of one Broker group is unavailable, The masters of other groups are still available, and the producers can still send messages. RocketMQ does not currently support automatic conversion of Slave to Master. If you need to convert a Slave to Master, manually stop the Slave Broker, change the configuration file, and start the Broker with the new configuration file.
6.3 Primary/secondary Replication
If a Broker group has a Master and Slave, messages need to be replicated from the Master to the Slave, both synchronously and asynchronously.
1) Synchronous replication
In synchronous replication mode, the write success status is reported to the client after both Master and Slave are written successfully.
In synchronous replication mode, if the Master fails, the Slave has all backup data, which is easy to recover. However, synchronous replication increases the data write delay and reduces the system throughput.
2) Asynchronous replication
In asynchronous replication, the write success status is reported to the client as long as the Master is successfully written.
In asynchronous replication, the system has low latency and high throughput, but if the Master fails, some data may be lost because it is not written to the Slave.
3) configuration
Synchronous and asynchronous replication is set using the brokerRole parameter in the Broker configuration file, which has the following values:
ASYNC_MASTER
: replicates the primary node asynchronouslySYNC_MASTER
: synchronously replicates the primary nodeSLAVE
From the node:
4) summary
In actual applications, set the flush mode and the primary/secondary replication mode, especially the SYNC_FLUSH (synchronous flush) mode, based on service scenarios. As disk write actions are frequently triggered, performance is significantly reduced.
In general, it is a good choice to configure ASYNC_FLUSH for Master and Slave, and SYNC_MASTER for replication between Master and Slave. In this way, even if one machine fails, data will still be kept.
7. Load balancing
7.1 Producer Load Balancing
On the Producer side, when each instance sends a message, == by default, polls all message queues to send ==, so that messages are evenly placed on different queues. Since queues can be scattered across different brokers, messages are sent to different brokers, as shown below:
The labels on the arrow lines in the figure represent the order, with publishers sending the first message to Queue 0, then the second message to Queue 1, and so on.
7.2 Consumer Load Balancing
1) Cluster mode
In cluster consumption, each consumer group that subscribes to a topic receives a message, and each message is consumed by only one instance of a consumer group. RocketMQ uses an active pull to pull and consume messages, specifying which message queue to pull.
Every time the number of instances changes, load balancing is triggered, and the queue is evenly distributed to each instance according to the number of queues and the number of instances.
The default allocation algorithm is AllocateMessageQueueAveragely, the diagram below:
There’s another average algorithm is AllocateMessageQueueAveragelyByCircle, evenly between each queue, is just in the form of circular queue in turn points, the following figure:
In cluster mode, only one instance of a queue can be allocated. This is because if multiple instances of a queue consume messages at the same time, the same message will be consumed multiple times by different instances. So the algorithm is that a queue is assigned to only one consumer instance, and a consumer instance can be assigned to different queues at the same time.
By adding consumer instances to the queue, the consumption power of the queue can be expanded horizontally. When an instance goes offline, load balancing is triggered again, and the queue that was allocated to the queue will be allocated to another instance for consumption.
However, if the number of consumer instances is greater than the total number of message queues, the additional consumer instances will not be assigned to the queue and will not consume messages, thus not distributing the load. == So you need to control that the total number of queues is greater than or equal to the number of consumers. = =
2) Broadcast mode
Since broadcast mode requires that a message be delivered to all consumer instances under a consumer group, there is no such thing as a message being distributed.
One of the differences in implementation is that when consumers are assigned queues, all consumers are assigned to all queues.
Message retry
8.1 Sequential message retry
For sequential messages, when the consumer fails to consume the message, the message queue RocketMQ automatically retries the message repeatedly (at an interval of 1 second), at which point the application will be blocked from consuming the message. Therefore, when using sequential messages, it is important to ensure that the application can monitor and handle consumption failures in a timely manner to avoid blocking.
8.2 Unordered Message retry
For unordered messages (normal, scheduled, delayed, transactional), you can achieve message retry results by setting the return status when the consumer fails to consume the message.
The retry of unordered messages takes effect only for cluster consumption. The broadcast mode does not provide the failure retry feature. That is, after a failure is consumed, the failed message is not retried and new messages are consumed.
1) Retry times
Message queue RocketMQ allows a maximum of 16 retries per message by default, with the following interval for each retry:
The number of retries | The interval since the last retry | The number of retries | The interval since the last retry |
---|---|---|---|
1 | 10 seconds | 9 | 7 minutes |
2 | 30 seconds | 10 | Eight minutes |
3 | 1 minute | 11 | 9 minutes |
4 | 2 minutes | 12 | Ten minutes |
5 | 3 minutes | 13 | Twenty minutes |
6 | 4 minutes | 14 | 30 minutes |
7 | 5 minutes | 15 | 1 hour |
8 | 6 minutes | 16 | 2 hours |
If the message fails after 16 retries, the message will not be delivered. If a message fails to be consumed, 16 retries will be performed within the next 4 hours and 46 minutes. The message will not be delivered again after the retry period.
Note: No matter how many times a Message is retried, the Message ID of those retried messages does not change.
2) Configuration mode
If the consumption fails, configure the mode again
In cluster consumption mode, if message consumption fails, message retries are expected. You need to explicitly configure the message listener interface in any of the following ways:
- return
Action.ReconsumeLater
(recommended) - Returns Null
- An exception is thrown
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
// Process the message
doConsumeMessage(message);
// Method 1: return action.reconsumelater and the message will be retried
return Action.ReconsumeLater;
// Mode 2: returns NULL, and the message will be retried
return null;
// Mode 3: throw the exception directly, and the message will be retried
throw new RuntimeException("Consumer Message exceotion"); }}Copy the code
If the consumption fails, the configuration mode is not retried
In cluster consumption mode, if the message fails, the message is not expected to be retried. You need to catch the exception that may be thrown in the consumption logic and finally return Action.CommitMessage. After that, the message will not be retried.
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
// Catch all exceptions in the consumption logic and return Action.CommitMessage;
return Action.CommitMessage;
}
// Return Action.CommitMessage;
returnAction.CommitMessage; }}Copy the code
User-defined maximum number of message retries
Message queue RocketMQ allows a Consumer to set a maximum number of retries when started. The retry interval will be as follows:
- If the maximum number of retries is less than or equal to 16, the retry interval is the same as the preceding table.
- The maximum number of retries is greater than 16, and the retry interval is 2 hours.
Properties properties = new Properties();
// Set the maximum number of message retries for the corresponding Group ID to 20
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
Copy the code
Note:
- The maximum number of message retries is set for all Consumer instances with the same Group ID.
- If MaxReconsumeTimes is set for only one of two Consumer instances with the same Group ID, the configuration takes effect for both Consumer instances.
- The configuration takes effect in overwrite mode, that is, the last started Consumer instance overwrites the configuration of the previous started instance
Gets the number of message retries
After receiving the message, the consumer can obtain the retry times of the message as follows:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
// Get the number of retries for the message
System.out.println(message.getReconsumeTimes());
returnAction.CommitMessage; }}Copy the code
3) Retry for multiple consumer groups
Suppose there are consumer group A and consumer group B. When A and B are listening to the same topic, both A and B get the same message, but A fails to return action.reconsumelater, while B succeeds in consuming. When retrying, rocketMQ will only send the message to consumer group B, not to consumer group A.
Dead letter queues
When an initial consumption of a message fails, the message queue RocketMQ automatically retries the message. If the consumption fails after the maximum number of retries is reached, it indicates that the consumer was not able to consume the message correctly under normal circumstances. In this case, the message queue RocketMQ does not immediately discard the message, but instead sends it to a special queue for the consumer.
In Message queuing RocketMQ, such messages that can’t normally be consumed are called dead-letter messages, and the special queues that store dead-letter messages are called dead-letter queues.
9.1 Dead letter Feature
Dead-letter messages have the following features:
- It’s not going to be consumed by consumers.
- The validity period is the same as the normal message, which is 3 days. After 3 days, the message will be automatically deleted. Therefore, please process the dead letter message within 3 days after it is generated.
Dead letter queues have the following features:
- A dead letter queue corresponds to a Group ID, not to a single consumer instance.
- If a Group ID does not generate a dead letter message, message queue RocketMQ will not create a corresponding dead letter queue for it.
- A dead letter queue contains all the dead letter messages generated for the corresponding Group ID, regardless of the Topic to which the message belongs.
9.2 Viewing Dead Letter Information
-
Query the console for information about the topic where the dead letter queue appears
-
Query dead letter messages by subject on the message page
-
Choose to resend the message
When a message goes to a dead letter queue, it means that some factor prevents the consumer from consuming the message properly, so you usually need to do special processing on it. Once the suspect is identified and the problem resolved, the message can be resend on the RocketMQ console of the message queue, allowing the consumer to re-consume.
Consumption is idempotent
Message queue After the RocketMQ consumer receives the message, it is necessary to idempotent the message according to the unique Key on the business.
10.1 Necessity of idempotent consumption
In Internet applications, especially when the network is unstable, the message queue RocketMQ may be repeated, which can be summarized as follows:
-
The message was repeated when sent
When a message has been successfully sent to the server and persistence has been completed, the server fails to respond to the client due to intermittent network disconnection or client breakdown. If at this point the producer realizes that the Message failed and tries to send the Message again, the consumer will then receive two messages with the same content and the same Message ID.
-
Messages duplicate when delivered
In the message consumption scenario, the message has been delivered to the consumer and the service processing is complete. When the client sends a reply to the server, the network is intermittently disconnected. To ensure that the Message is consumed at least once, the RocketMQ server in the Message queue will try to deliver the previously processed Message again after the network is restored, and the consumer will then receive two messages with the same content and the same Message ID.
-
Message duplication during load balancing (including but not limited to network jitter, Broker restart, and subscriber application restart)
Rebalance is triggered when a RocketMQ Broker or client in the message queue restarts, expands or shrinks, and consumers may receive repeated messages.
10.2 Processing Methods
Because Message ids have the potential to conflict (duplicate), it is not recommended that Message ids be used as the basis for truly secure idempotent processing. The best approach is to use the business unique identity as the Key basis for idempotent processing, which can be set with the message Key:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
Copy the code
When a subscriber receives a message, it can idempotent according to the Key of the message:
consumer.subscribe("ons_test"."*".new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// Idempotent processing is performed according to the key uniquely identified by the service}});Copy the code
11. Precautions for RocketMQ
- With the same consumer group,The consumer logic should be the same(listening
topic
.tag
All the same) - By default, messages are shared between different consumer groups (all consumer groups get the same message), and consumers within a consumer group are load balanced (only one consumer gets the message).