RocketMQ tutorial series


Section 1: Project preparation

rocketmq-client dependency

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>

Section 2: Normal messages – producer and consumer steps explained

Steps to implement a normal message producer

  • 1. Create the message producer and specify the producer group name
  • 2. Specify the NameServer address
  • 3. Start the producer
  • 4. Create the Message object and specify the topic, tag, and message body
  • 5. Send the message
  • 6. Shut down the producer

Steps to implement a normal message consumer

  • 1. Create the consumer and specify the consumer group name
  • 2. Specify the NameServer address
  • 3. Subscribe to the topic and tags
  • 4. Register the callback that processes messages
  • 5. Start the consumer

Note: The topic and tag the consumer subscribes to must match the ones the producer sends to.
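
To make the matching rule concrete, here is a minimal sketch in plain Java with no RocketMQ dependency. `TagMatchSketch` and `matches` are hypothetical names, not part of rocketmq-client, and the logic is a simplified illustration of a tag subscription expression in which `*` accepts every tag and `||` separates alternatives. It is not the library's actual filtering code.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of rocketmq-client.
class TagMatchSketch {

    // Returns true when a message tag satisfies a subscription expression
    // such as "*", "TagA", or "TagA || TagB".
    static boolean matches(String subExpression, String messageTag) {
        if (subExpression == null || subExpression.trim().isEmpty()
                || "*".equals(subExpression.trim())) {
            return true; // wildcard: every tag under the topic is accepted
        }
        if (messageTag == null) {
            return false; // assumption: an untagged message matches no specific tag
        }
        return Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "Tag_demo_sync"));              // true
        System.out.println(matches("TagA || TagB", "TagB"));            // true
        System.out.println(matches("Tag_demo_async", "Tag_demo_sync")); // false
    }
}
```

The consumers in this tutorial subscribe with `*`, so only the topic name has to agree with the producer. With a specific tag expression, a mismatch means the consumer starts without errors but receives nothing.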

Section 3: Normal messages – synchronous messages

Synchronous sending is reliable: the producer blocks until the broker returns a send result. It is widely used for scenarios such as important notifications and SMS notifications.

Producer

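    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ProducerSync {
        public static void main(String[] args) throws Exception {
            // 1. Create the message producer and specify the producer group name
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            // 2. Specify the NameServer address
            producer.setNamesrvAddr("192.168.88.131:9876");
            // 3. Start the producer
            System.out.println("producer start");
            producer.start();
            for (int i = 0; i < 3; i++) {
                // 4. Create the message object and specify the topic, tag, and body
                Message msg = new Message("Topic_demo_sync", "Tag_demo_sync",
                        (" " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5. Send the message; this call blocks until the broker responds
                SendResult sendResult = producer.send(msg);
                System.out.println("sendResult: " + sendResult);
            }
            // 6. Shut down the producer
            producer.shutdown();
            System.out.println("producer closed");
        }
    }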

Consumer

    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ConsumerSync {
        public static void main(String[] args) throws Exception {
            // 1. Create the consumer and specify the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            // 2. Specify the NameServer address
            consumer.setNamesrvAddr("192.168.88.131:9876");
            // Maximum number of messages handed to the listener in one call
            consumer.setConsumeMessageBatchMaxSize(2);
            // 3. Subscribe to the topic; "*" accepts every tag
            consumer.subscribe("Topic_demo_sync", "*");
            // 4. Register the callback that processes received messages
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            String topic = msg.getTopic();   // topic
                            String tags = msg.getTags();     // tag
                            byte[] body = msg.getBody();     // message body
                            String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("Consumer info: topic:" + topic
                                    + ",tags:" + tags + ",result:" + result);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5. Start the consumer
            consumer.start();
        }
    }

Section 4: Normal messages – asynchronous messages

Asynchronous messages are typically used in response-time-sensitive business scenarios, where the sender cannot afford to wait a long time for the broker to respond.

Producer

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ProducerASync {
        public static void main(String[] args) throws Exception {
            // 1. Create the message producer and specify the producer group name
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            // 2. Specify the NameServer address
            producer.setNamesrvAddr("192.168.88.131:9876");
            // 3. Start the producer
            System.out.println("producer start");
            producer.start();
            for (int i = 0; i < 3; i++) {
                // 4. Create the message object and specify the topic, tag, and body
                Message msg = new Message("Topic_demo_async", "Tag_demo_async",
                        ("Hello " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5. Send asynchronously. The call returns nothing; the result
                //    is delivered to the SendCallback.
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("sent successfully: " + sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("send exception: " + throwable.getMessage());
                    }
                });
            }
            // Give the pending callbacks time to run before shutting down
            Thread.sleep(2000);
            // 6. Shut down the producer
            producer.shutdown();
            System.out.println("producer closed");
        }
    }

Consumer

    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ConsumerASync {
        public static void main(String[] args) throws Exception {
            // 1. Create the consumer and specify the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            // 2. Specify the NameServer address
            consumer.setNamesrvAddr("192.168.88.131:9876");
            // Maximum number of messages handed to the listener in one call
            consumer.setConsumeMessageBatchMaxSize(2);
            // 3. Subscribe to the topic; "*" accepts every tag
            consumer.subscribe("Topic_demo_async", "*");
            // 4. Register the callback that processes received messages
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            String topic = msg.getTopic();   // topic
                            String tags = msg.getTags();     // tag
                            byte[] body = msg.getBody();     // message body
                            String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("Consumer info: topic:" + topic
                                    + ",tags:" + tags + ",result:" + result);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5. Start the consumer
            consumer.start();
        }
    }

Analysis:

As the consumer code shows, the consumer for asynchronous messages is implemented exactly like the consumer for synchronous messages.

The consumer output shows that the messages are not received in the order in which they were sent.

Section 5: Normal messages – one-way messages

One-way sending is mainly used in scenarios where the send result is not particularly important, such as sending logs.

Producer

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Producer {
        public static void main(String[] args) throws Exception {
            // 1. Create the message producer and specify the producer group name
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            // 2. Specify the NameServer address
            producer.setNamesrvAddr("192.168.88.131:9876");
            // 3. Start the producer
            producer.start();
            System.out.println("producer start");
            for (int i = 0; i < 20; i++) {
                // 4. Create the message object and specify the topic, tag, and body
                Message msg = new Message("Topic_demo", "Tag_demo",
                        ("Hello " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5. Send a one-way message; nothing is returned and no callback fires
                producer.sendOneway(msg);
                System.out.println("sent: " + msg);
                // The thread sleeps for 1 second
                Thread.sleep(1000);
            }
            // 6. Shut down the producer
            producer.shutdown();
            System.out.println("producer closed");
        }
    }

Consumer

    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Consumer {
        public static void main(String[] args) throws Exception {
            // 1. Create the consumer and specify the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            // 2. Specify the NameServer address
            consumer.setNamesrvAddr("192.168.88.131:9876");
            // Maximum number of messages handed to the listener in one call
            consumer.setConsumeMessageBatchMaxSize(2);
            // 3. Subscribe to the topic; "*" accepts every tag
            consumer.subscribe("Topic_demo", "*");
            // 4. Register the callback that processes received messages
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            String topic = msg.getTopic();   // topic
                            String tags = msg.getTags();     // tag
                            byte[] body = msg.getBody();     // message body
                            String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("Consumer info: topic:" + topic
                                    + ",tags:" + tags + ",result:" + result);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5. Start the consumer
            consumer.start();
        }
    }

Analysis:

The consumer output shows that the messages are consumed out of order.
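
One likely reason for the disorder, offered as background rather than something the run above proves: a topic normally has several queues, the producer spreads messages across them, and a MessageListenerConcurrently processes batches on multiple threads. The sketch below is plain Java, not RocketMQ code. `QueueSpreadSketch`, `spread`, the queue count of 4, and starting at queue 0 are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; it only models round-robin placement of messages.
class QueueSpreadSketch {

    // Distributes message indexes 0..messageCount-1 over queueCount queues in turn.
    static List<List<Integer>> spread(int messageCount, int queueCount) {
        List<List<Integer>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            queues.get(i % queueCount).add(i);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<List<Integer>> queues = spread(6, 4);
        for (int q = 0; q < queues.size(); q++) {
            System.out.println("queue " + q + ": " + queues.get(q));
        }
    }
}
```

Order is kept inside each queue but not across queues, so a consumer that reads the queues in parallel can see, for example, message 1 before message 0.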

Section 6: Summary

The consumer is implemented the same way for synchronous, asynchronous, and one-way messages.

Synchronous, asynchronous, and one-way messages differ only on the sending side.
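
The three call shapes can be compared side by side in a small plain-Java sketch. Everything here is hypothetical: `SendModesSketch`, `deliver`, and the `SEND_OK:` string stand in for the broker round trip and are not RocketMQ APIs. Unlike the real asynchronous `send`, which returns nothing, `sendAsync` below returns a future so that the demo can wait for the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical in-memory model of the three send styles.
class SendModesSketch {

    // Stand-in for the broker round trip.
    static String deliver(String body) {
        return "SEND_OK:" + body;
    }

    // Synchronous: the caller blocks and receives the result as the return value.
    static String sendSync(String body) {
        return deliver(body);
    }

    // Asynchronous: the call returns at once; the result (or the error)
    // is handed to the callback later, on another thread.
    static CompletableFuture<String> sendAsync(String body,
                                               BiConsumer<String, Throwable> callback) {
        return CompletableFuture.supplyAsync(() -> deliver(body)).whenComplete(callback);
    }

    // One-way: fire and forget; the caller never learns the outcome.
    static void sendOneway(String body) {
        CompletableFuture.runAsync(() -> deliver(body));
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + sendSync("Hello 0"));
        sendAsync("Hello 1", (result, error) ->
                System.out.println("async callback: " + result)).join();
        sendOneway("Hello 2");
        System.out.println("one-way: nothing to print, no result is available");
    }
}
```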

The asynchronous send call has no return value; the producer passes a SendCallback to receive the result of each send.

An asynchronous producer should wait (for example, sleep) before shutting down the instance, so that pending callbacks have time to complete.
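
A fixed sleep only guesses how long the callbacks need. One possible alternative, sketched here in plain Java with no RocketMQ dependency, is to count the callbacks down with a CountDownLatch and wait on it with a timeout. `AwaitCallbacksSketch`, `Callback`, and `fakeAsyncSend` are hypothetical names; `fakeAsyncSend` merely stands in for an asynchronous send that takes a callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: wait for every callback instead of sleeping a fixed time.
class AwaitCallbacksSketch {

    interface Callback {
        void onSuccess(String result);
    }

    // Stand-in for an asynchronous send; the callback runs on a pool thread.
    static void fakeAsyncSend(ExecutorService pool, String body, Callback callback) {
        pool.submit(() -> callback.onSuccess("SEND_OK:" + body));
    }

    // Sends count messages and returns how many callbacks fired,
    // or -1 if they did not all arrive within the timeout.
    static int sendAllAndWait(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger acknowledged = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                fakeAsyncSend(pool, "Hello " + i, result -> {
                    acknowledged.incrementAndGet();
                    latch.countDown();
                });
            }
            // Block until every callback has fired, or give up after 5 seconds.
            boolean allDone = latch.await(5, TimeUnit.SECONDS);
            return allDone ? acknowledged.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown(); // the point where the producer would be shut down
        }
    }

    public static void main(String[] args) {
        System.out.println("callbacks received: " + sendAllAndWait(3));
    }
}
```

In a real producer, the latch would be counted down in both `onSuccess` and `onException`, so that a failed send does not keep the main thread waiting until the timeout.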

One-way sends also have no return value, and in the test above the messages were consumed out of order.

The difference between one-way and asynchronous sending is that a one-way send takes no SendCallback, so the producer never learns the result of the send.