Chapter 4 RocketMQ applications
I. Ordinary messages
1 Message sending classification
A Producer can send messages in several modes, and each mode has a different effect on reliability and throughput.
Synchronous message sending
Synchronous sending means that after the Producer sends a message, it sends the next message only after receiving an ACK from MQ. This mode has the highest message reliability, but the sending efficiency is low.
Asynchronous message sending
Asynchronous sending means that the Producer sends the next message without waiting for the ACK from MQ; the ACK is processed later in a callback. This mode keeps message reliability while still achieving good sending efficiency.
One-way message sending
One-way sending means that the Producer only sends the message and neither waits for nor processes an ACK from MQ. In this mode MQ does not return an ACK at all. This mode has the highest sending efficiency, but the message reliability is poor.
2 Code Examples
Create a project
Create a Maven Java project rocketMq-test.
Import dependencies
Import the RocketMQ client dependency.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
</dependencies>
Define a synchronous message sending producer
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Create a producer; the argument is the Producer Group name
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // Specify the NameServer address
        producer.setNamesrvAddr("rocketmqOS:9876");
        // Set the number of retries when a send fails. The default is 2
        producer.setRetryTimesWhenSendFailed(3);
        // Set the send timeout to 5s. The default is 3s
        producer.setSendMsgTimeout(5000);
        // Start the producer
        producer.start();
        // Produce and send 100 messages
        for (int i = 0; i < 100; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("someTopic", "someTag", body);
            // Specify a key for the message
            msg.setKeys("key-" + i);
            // Send the message synchronously; send() blocks until the ACK arrives
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // Close the producer
        producer.shutdown();
    }
}
// Message sending status
public enum SendStatus {
    SEND_OK, // The message was sent successfully
    FLUSH_DISK_TIMEOUT, // Disk flush timed out. Occurs only when the Broker's flush policy is synchronous flush; it does not occur with asynchronous flush
    FLUSH_SLAVE_TIMEOUT, // Synchronization to the Slave timed out. Occurs only when the Broker cluster uses synchronous master-slave replication; it does not occur with asynchronous replication
    SLAVE_NOT_AVAILABLE, // No Slave is available. Occurs only when the Broker cluster uses synchronous master-slave replication; it does not occur with asynchronous replication
}
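A producer will usually look at the status before treating a send as finished. The following is a minimal sketch of such a check. It mirrors the four constants in a local enum so that it compiles without the client library; SendStatusSketch, SendStatusCheck and both methods are hypothetical names, and the descriptions assume that every status other than SEND_OK means the Master accepted the message but the configured flush or replication guarantee was not confirmed.

```java
/** Local stand-in for the client's SendStatus enum (assumption: same four constants). */
enum SendStatusSketch {
    SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE
}

final class SendStatusCheck {
    /** True only when the Broker confirmed every configured guarantee. */
    static boolean fullyConfirmed(SendStatusSketch status) {
        return status == SendStatusSketch.SEND_OK;
    }

    /** Short hint about what each status points to. */
    static String describe(SendStatusSketch status) {
        switch (status) {
            case SEND_OK:
                return "sent successfully";
            case FLUSH_DISK_TIMEOUT:
                return "synchronous disk flush timed out";
            case FLUSH_SLAVE_TIMEOUT:
                return "synchronous replication to the slave timed out";
            case SLAVE_NOT_AVAILABLE:
                return "no slave available for synchronous replication";
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    public static void main(String[] args) {
        for (SendStatusSketch s : SendStatusSketch.values()) {
            System.out.println(s + " -> confirmed=" + fullyConfirmed(s) + ", " + describe(s));
        }
    }
}
```

Whether a non-SEND_OK status should trigger a resend or only a log entry depends on how much duplication the consumer side can tolerate.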
Define an asynchronous message sending producer
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        // Do not retry when an asynchronous send fails
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // Set the number of queues for a newly created Topic to 2. The default is 4
        producer.setDefaultTopicQueueNums(2);
        producer.start();
        for (int i = 0; i < 100; i++) {
            byte[] body = ("Hi," + i).getBytes();
            try {
                Message msg = new Message("myTopicA", "myTag", body);
                // Send asynchronously and specify the callback
                producer.send(msg, new SendCallback() {
                    // Triggered when the producer receives the ACK from MQ
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        } // end-for
        // Sleep for a while. Without the sleep, the producer would be
        // shut down before the asynchronous sends have completed
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }
}
Define a one-way message sending producer
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("single", "someTag", body);
            // One-way send: no ACK is returned or processed
            producer.sendOneway(msg);
        }
        producer.shutdown();
        System.out.println("producer shutdown");
    }
}
Define a message consumer
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class SomeConsumer {
    public static void main(String[] args) throws MQClientException {
        // Define a pull consumer
        // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
        // Define a push consumer; the argument is the Consumer Group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        // Specify the NameServer address
        consumer.setNamesrvAddr("rocketmqOS:9876");
        // Start consuming from the first message
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Specify the topic and tag to consume
        consumer.subscribe("someTopic", "*");
        // Consume in broadcast mode. The default is cluster mode
        // consumer.setMessageModel(MessageModel.BROADCASTING);
        // Register a message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // Triggered as soon as the Broker has messages for the subscription.
            // The return value is the consumption status of this batch
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Consume the messages one by one
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                // Return the consumption status: consumption succeeded
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.println("Consumer Started");
    }
}
II. Sequential messages
1 What are sequential messages
Sequential messages are messages that are consumed strictly in the order in which they were sent (FIFO).
By default, a producer distributes messages across the queues (partitions) of a Topic in round-robin fashion, and a consumer pulls messages from several queues, so the order of consumption is not guaranteed to match the order of sending. If the messages are sent to a single Queue and consumed only from that Queue, their order is strictly preserved.
2 Why are sequential messages needed
For example, suppose there is a Topic ORDER_STATUS with four queues, and the messages in this Topic describe the successive states of an order. Assume that an order can be in the following states: unpaid, paid, shipping, shipping successful, shipping failed.
Based on these states, the producer may generate the following messages in sequence:
Order T0000001: Unpaid -> Order T0000001: Paid -> Order T0000001: Shipping -> Order T0000001: Shipping failed
If the Queue is chosen by round-robin when these messages are sent to MQ, the four messages may be stored in four different queues of the Topic.
In this case we want the Consumer to consume the messages in the order in which they were sent, but the way MQ delivers and consumes them cannot guarantee that order. Even if the Consumer is built with some tolerance for out-of-order states, it cannot handle every possible combination.
A scheme can therefore be designed as follows: messages with the same order number are placed, by some policy, in the same Queue, and the consumer then applies its own policy (for example, one thread processes one Queue exclusively) to keep the consumption order.
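The difference between the two placement policies can be sketched in plain Java. This is an illustration only: OrderRoutingSketch and its methods are hypothetical names, queues are represented just by their ids, and the key-based policy is assumed to be a hash of the order number modulo the number of queues.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OrderRoutingSketch {
    /** Round-robin: the n-th message sent goes to queue n % queueCount. */
    static List<Integer> roundRobin(int messageCount, int queueCount) {
        List<Integer> queueIds = new ArrayList<>();
        for (int n = 0; n < messageCount; n++) {
            queueIds.add(n % queueCount);
        }
        return queueIds;
    }

    /** Key-based: every message of one order maps to the same queue. */
    static List<Integer> byOrderId(String orderId, int messageCount, int queueCount) {
        int queueId = Math.floorMod(orderId.hashCode(), queueCount);
        List<Integer> queueIds = new ArrayList<>();
        for (int n = 0; n < messageCount; n++) {
            queueIds.add(queueId);
        }
        return queueIds;
    }

    public static void main(String[] args) {
        List<String> states = Arrays.asList("unpaid", "paid", "shipping", "shipping failed");
        // The four state messages are spread over all four queues
        System.out.println("round robin: " + roundRobin(states.size(), 4)); // [0, 1, 2, 3]
        // The four state messages share one queue, so FIFO order is kept
        System.out.println("by order id: " + byOrderId("T0000001", states.size(), 4));
    }
}
```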
3 Classification of ordering
Depending on the scope of the ordering, RocketMQ can strictly guarantee two kinds of message order: partition order and global order.
Global order
When only one Queue is used for sending and consuming, the guaranteed order is the order of all messages in the Topic. This is called global order.
The number of queues is specified when the Topic is created, in one of three ways:
1) In code, when creating the Producer, specify the number of queues for Topics that the Producer creates automatically
2) In the RocketMQ visual console, specify the number of queues when creating the Topic manually
3) With the mqadmin command, specify the number of queues when creating the Topic manually
Partition order
If several Queues take part and only the order of messages within each Queue can be guaranteed, the ordering is called partition order.
How is the Queue selected? When defining the Producer, we can supply a message queue selector, which is our own implementation of the MessageQueueSelector interface.
The selection algorithm of a selector usually works on a selection key. The selection key can be the message key or some other data, but whatever is chosen must be unique: two different business entities (two orders, for example) must never share a selection key.
A common selection algorithm takes the selection key (or its hash value) modulo the number of queues in the Topic; the result is the QueueId of the selected Queue.
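The modulo step can be sketched as a small helper. ModuloSelectorSketch and selectQueueId are hypothetical names, not part of the RocketMQ API. The sketch uses Math.floorMod rather than the % operator because hashCode() may be negative, and a negative remainder would not be a valid QueueId.

```java
final class ModuloSelectorSketch {
    /**
     * Maps a selection key to a queue id in the range [0, queueCount).
     * Math.floorMod keeps the result non-negative even for negative hash codes.
     */
    static int selectQueueId(Object selectionKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(selectionKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        System.out.println(selectQueueId(7, 4));  // prints 3
        System.out.println(selectQueueId(-5, 4)); // prints 3, whereas -5 % 4 would give -1
        System.out.println(selectQueueId("T0000001", 4));
    }
}
```

Inside a MessageQueueSelector the same computation would index into the list of queues passed to select().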
The modulo algorithm has a problem: different selection keys may give the same result modulo the number of queues, so messages with different selection keys may end up in the same Queue, and the same Consumer may receive messages with different selection keys. How can this be solved? The usual approach is to read the selection key from the message and check it. If the current Consumer is responsible for that key, it consumes the message; otherwise it does nothing. This requires the selection key to reach the Consumer together with the message, so using the message key as the selection key is a good choice.
Does this approach create a new problem? Once a Consumer has pulled a message that does not belong to it, can the Consumer that should handle the message still consume it? Messages in the same Queue cannot be consumed by different consumers in the same Group, so consumers that handle different selection keys from one Queue must belong to different Groups. Consumption in different Groups is isolated, so the consumers do not affect one another.
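The consumer-side check described above can be sketched without the client library. Everything here is hypothetical: Msg stands in for a received message that carries its selection key, and each KeyFilteringConsumerSketch instance plays the role of a consumer in its own Group, so each one sees every message of the Queue and keeps only the keys it owns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class KeyFilteringConsumerSketch {
    /** Minimal stand-in for a message: the selection key travels with the body. */
    static final class Msg {
        final String key;
        final String body;

        Msg(String key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    private final Set<String> ownedKeys;
    private final List<String> processed = new ArrayList<>();

    KeyFilteringConsumerSketch(String... ownedKeys) {
        this.ownedKeys = new HashSet<>(Arrays.asList(ownedKeys));
    }

    /** Processes messages whose key this consumer owns, in arrival order; skips the rest. */
    void consume(List<Msg> batch) {
        for (Msg m : batch) {
            if (ownedKeys.contains(m.key)) {
                processed.add(m.key + ":" + m.body);
            }
        }
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        // Two orders whose keys happened to map to the same Queue
        List<Msg> queue = Arrays.asList(
                new Msg("T0000001", "unpaid"),
                new Msg("T0000005", "unpaid"),
                new Msg("T0000001", "paid"));
        KeyFilteringConsumerSketch groupA = new KeyFilteringConsumerSketch("T0000001");
        KeyFilteringConsumerSketch groupB = new KeyFilteringConsumerSketch("T0000005");
        groupA.consume(queue);
        groupB.consume(queue);
        System.out.println(groupA.processed()); // [T0000001:unpaid, T0000001:paid]
        System.out.println(groupB.processed()); // [T0000005:unpaid]
    }
}
```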
4 Code Examples
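import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Integer orderId = i;
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TopicA", "TagA", body);
            // orderId is the selection key; it reaches select() as arg
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}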
III. Delayed messages
1 What is a delayed message
Delayed messages are messages that can be consumed only after a specified amount of time has passed since they were written to the Broker.
RocketMQ delayed messages make it possible to implement timed tasks without a timer. Typical scenarios are closing an unpaid order after a timeout in e-commerce, and cancelling an unpaid ticket booking after a timeout on the 12306 platform.
On an e-commerce platform, a delayed message is sent when an order is created. The message is delivered to the Consumer 30 minutes later, and the Consumer checks whether the corresponding order has been paid. If not, it cancels the order and puts the item back in stock. If the payment has been completed, the message is ignored.
On 12306, a delayed message is sent when a ticket is booked. The message is delivered to the Consumer 45 minutes later, and the Consumer checks whether the corresponding order has been paid. If not, the booking is cancelled and the ticket is returned to the ticket pool. If the payment has been completed, the message is ignored.
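The consumer-side decision in both scenarios has the same shape, which the following sketch illustrates with an in-memory order table. All names (OrderTimeoutSketch, onTimeoutMessage and so on) are hypothetical; a real system would keep orders and stock in a database, and the delayed message would be sent at the point marked in createOrder.

```java
import java.util.HashMap;
import java.util.Map;

final class OrderTimeoutSketch {
    enum State { UNPAID, PAID, CANCELLED }

    private final Map<String, State> orders = new HashMap<>();
    private int stock;

    OrderTimeoutSketch(int initialStock) {
        this.stock = initialStock;
    }

    /** Reserves one item. The delayed message for orderId would be sent here. */
    void createOrder(String orderId) {
        orders.put(orderId, State.UNPAID);
        stock--;
    }

    void pay(String orderId) {
        orders.put(orderId, State.PAID);
    }

    /** Called when the delayed message for orderId is finally delivered. */
    void onTimeoutMessage(String orderId) {
        if (orders.get(orderId) == State.UNPAID) {
            orders.put(orderId, State.CANCELLED);
            stock++; // put the item back in stock
        }
        // Paid orders are left untouched: the message is simply ignored
    }

    State stateOf(String orderId) {
        return orders.get(orderId);
    }

    int stock() {
        return stock;
    }

    public static void main(String[] args) {
        OrderTimeoutSketch shop = new OrderTimeoutSketch(10);
        shop.createOrder("A");
        shop.createOrder("B");
        shop.pay("A");
        shop.onTimeoutMessage("A");
        shop.onTimeoutMessage("B");
        System.out.println(shop.stateOf("A") + " " + shop.stateOf("B") + " stock=" + shop.stock());
        // prints: PAID CANCELLED stock=9
    }
}
```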
2 Delay Level
The delay duration of a delayed message cannot be arbitrary; it is specified through a delay level. The delay levels are defined by the following variable in the MessageStoreConfig class on the RocketMQ server:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Delay levels are numbered from 1, so a delay level of 3 corresponds to a delay of 10s.
If you need a custom delay level, add a configuration such as the following to the configuration file loaded by the Broker (this example adds a one-day level, 1d). The configuration files are in the conf directory under the RocketMQ installation directory.
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
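How such a string maps to levels can be illustrated with a small parser. This is a sketch written for this chapter, not the Broker's own code: DelayLevelSketch and its methods are hypothetical, and the only assumptions are that entries are separated by spaces, that the units are s, m, h and d, and that levels are numbered from 1 in the order written.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayLevelSketch {
    /** Parses e.g. "1s 5s 10s" into a table of level -> delay in milliseconds. */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new HashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            table.put(i + 1, amount * unitMillis(unit)); // levels start at 1
        }
        return table;
    }

    private static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1_000L;
            case 'm': return 60_000L;
            case 'h': return 3_600_000L;
            case 'd': return 86_400_000L;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> table =
                parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d");
        System.out.println(table.get(3));  // prints 10000
        System.out.println(table.get(19)); // prints 86400000
    }
}
```

With the custom configuration above, the one-day delay is level 19, so a producer would request it with setDelayTimeLevel(19).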
3 Implementation principle of delayed message
The concrete implementation scheme is as follows:
Modify the message
After the Producer sends a message to the Broker, the Broker first writes it to the Commitlog file, and the message then has to be dispatched to the corresponding ConsumeQueue. Before dispatching, however, the system checks whether the message carries a delay level. If it does not, the message is dispatched as usual. If it does, a more involved process follows:
- Change the Topic of the message to SCHEDULE_TOPIC_XXXX.
- According to the delay level, create the corresponding queueId directory and consumequeue file under the SCHEDULE_TOPIC_XXXX topic in the consumequeue directory, if they do not exist yet. The mapping between delayLevel and queueId is queueId = delayLevel - 1. Note that the queueId directories are created on demand; the directories for all delay levels are not created at once.
- Modify the content of the message index unit. The Message Tag HashCode field of the index unit originally stores the hash value of the message's Tag; it now stores the delivery time of the message. The delivery time is the time at which the message is restored to its original Topic and written to the Commitlog again: delivery time = message store time + delay of the level. The message store time is the timestamp at which the message was sent to the Broker.
- Write the message index to the corresponding consumequeue under the SCHEDULE_TOPIC_XXXX topic.
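The rewriting steps above can be summarised in a few lines of plain Java. ScheduleEntrySketch is a hypothetical model of the resulting index entry, not the Broker's data structure; the originalTopic field is an assumption added so that the sketch shows what must be remembered in order to restore the message later.

```java
final class ScheduleEntrySketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    final String topic;         // rewritten topic
    final String originalTopic; // assumed: kept so the message can be restored later
    final int queueId;          // delayLevel - 1
    final long deliveryTime;    // stored where the tag hash normally goes

    private ScheduleEntrySketch(String topic, String originalTopic, int queueId, long deliveryTime) {
        this.topic = topic;
        this.originalTopic = originalTopic;
        this.queueId = queueId;
        this.deliveryTime = deliveryTime;
    }

    /** delayMillis is the duration the level maps to, e.g. 10_000 for level 3. */
    static ScheduleEntrySketch forDelayed(String originalTopic, long storeTimestamp,
                                          int delayLevel, long delayMillis) {
        if (delayLevel < 1) {
            throw new IllegalArgumentException("delay levels start at 1");
        }
        return new ScheduleEntrySketch(SCHEDULE_TOPIC, originalTopic,
                delayLevel - 1, storeTimestamp + delayMillis);
    }

    public static void main(String[] args) {
        ScheduleEntrySketch e = forDelayed("TopicB", 1_000L, 3, 10_000L);
        System.out.println(e.topic + " queueId=" + e.queueId + " deliveryTime=" + e.deliveryTime);
        // prints: SCHEDULE_TOPIC_XXXX queueId=2 deliveryTime=11000
    }
}
```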
How are the messages in each delay-level Queue under SCHEDULE_TOPIC_XXXX ordered?
They are ordered by delivery time. Within a Broker, all delayed messages of the same level are written to the same Queue under SCHEDULE_TOPIC_XXXX in the consumequeue directory, so every message in a Queue has the same delay. The delivery time therefore depends only on the message store time. In other words, the messages are ordered by the time they were sent to the Broker.
Deliver the delayed message
The Broker has a delayed message service class, ScheduleMessageService, which consumes the messages in SCHEDULE_TOPIC_XXXX and delivers each delayed message to its target Topic according to its delivery time. Before delivery, the originally written message is read again from the Commitlog and its delay level is set to 0, so that it becomes an ordinary message without delay. The message is then delivered again to the target Topic.
When the Broker starts, ScheduleMessageService creates and starts a Timer to run the scheduled tasks. The system defines one TimerTask per delay level, and each TimerTask is responsible for consuming and delivering the messages of its level. Each TimerTask checks whether the first message in its Queue is due. If the first message is not due, none of the following messages are due either, because the messages are ordered by delivery time. If the first message is due, it is delivered to the target Topic, that is, consumed.
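The head-of-queue check is what makes this cheap: one pass never has to look past the first message that is not yet due. A minimal sketch of one such pass follows. LevelQueueSketch and its methods are hypothetical, and an in-memory deque stands in for the consumequeue file of one delay level.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class LevelQueueSketch {
    private static final class Entry {
        final String body;
        final long deliveryTime;

        Entry(String body, long deliveryTime) {
            this.body = body;
            this.deliveryTime = deliveryTime;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    /** Entries of one level arrive in store-time order, so delivery times never decrease. */
    void add(String body, long deliveryTime) {
        queue.addLast(new Entry(body, deliveryTime));
    }

    /** One pass of the periodic task: deliver due entries from the head, stop at the first that is not due. */
    List<String> deliverDue(long now) {
        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().deliveryTime <= now) {
            delivered.add(queue.pollFirst().body);
        }
        return delivered;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        LevelQueueSketch level3 = new LevelQueueSketch();
        level3.add("a", 1_000L);
        level3.add("b", 2_000L);
        level3.add("c", 3_000L);
        System.out.println(level3.deliverDue(2_500L)); // [a, b]
        System.out.println(level3.pending());          // 1
    }
}
```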
Write the message back to the Commitlog
The delayed message service class ScheduleMessageService sends the delayed message to the Commitlog again, where a new message index entry is built and dispatched to the corresponding Queue.
This is an ordinary message send, except that the message Producer is the delayed message service class ScheduleMessageService.
4 Code Examples
Define DelayProducer class
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TopicB", "someTag", body);
            // Set the delay level of the message to 3, that is, a delay of 10s
            msg.setDelayTimeLevel(3);
            SendResult sendResult = producer.send(msg);
            // Output the time at which the message was sent
            System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
            System.out.println("," + sendResult);
        }
        producer.shutdown();
    }
}
Define OtherConsumer class
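import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class OtherConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicB", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Output the time at which the message was consumed
                    System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
                    System.out.println("," + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}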