Section 1: Project preparation
rocketmq-client dependency

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
Section 2: Normal messages – producer and consumer steps explained
Normal message producer: implementation steps
- 1. Create the message producer and specify the producer group name
- 2. Specify the NameServer address
- 3. Start the producer
- 4. Create the Message object and specify the Topic, Tag, and message body
- 5. Send the message
- 6. Shut down the producer
Normal message consumer: implementation steps
- 1. Create the consumer and specify the consumer group name
- 2. Specify the NameServer address
- 3. Subscribe to the Topic and Tag
- 4. Set the callback function that processes messages
- 5. Start the consumer
Note: The consumer's Topic and Tag must match the ones the producer sends to.
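The note about matching Topic and Tag can be made concrete. In the consumer's subscribe call, the second argument is a tag expression: `*` accepts every tag, and several tags can be joined with `||`. The sketch below is a simplified, stand-alone model of that rule, not RocketMQ's own filtering code; the class `TagExpression` and its `matches` method are hypothetical names used only for illustration.

```java
import java.util.Arrays;

public class TagExpression {
    // Returns true if a message tag is accepted by a subscription expression.
    static boolean matches(String expression, String tag) {
        if (expression == null || expression.trim().equals("*")) {
            return true; // "*" subscribes to every tag
        }
        if (tag == null) {
            return false; // an untagged message cannot match a specific tag
        }
        // "TagA || TagB" accepts a message carrying either tag
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "Tag_demo_sync"));
        System.out.println(matches("Tag_demo_sync || Tag_other", "Tag_demo_sync"));
        System.out.println(matches("Tag_other", "Tag_demo_sync"));
    }
}
```

A consumer that subscribes with a tag the producer never uses simply receives nothing, which is why a mismatch is easy to mistake for a connection problem.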
Section 3: Normal messages – synchronous messages
Synchronous sending is reliable: the send call blocks until the Broker returns a result. It is widely used, for example for important notifications and SMS notifications.
Producer:
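    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ProducerSync {
        public static void main(String[] args) throws Exception {
            // 1. Create the message producer and specify the producer group name
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            // 2. Specify the NameServer address
            producer.setNamesrvAddr("192.168.88.131:9876");
            // 3. Start the producer
            System.out.println("producer start");
            producer.start();
            for (int i = 0; i < 3; i++) {
                // 4. Create the message object and specify the Topic, Tag, and message body
                Message msg = new Message("Topic_demo_sync", "Tag_demo_sync",
                        (" " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5. Send the message synchronously; the call blocks until the Broker replies
                SendResult sendResult = producer.send(msg);
                System.out.println("sendResult: " + sendResult);
            }
            // 6. Shut down the producer
            producer.shutdown();
            System.out.println("producer closed");
        }
    }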
Consumer:

    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ConsumerSync {
        public static void main(String[] args) throws Exception {
            // 1. Create the consumer and specify the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            // 2. Specify the NameServer address
            consumer.setNamesrvAddr("192.168.88.131:9876");
            // Maximum number of messages handed to the listener per call
            consumer.setConsumeMessageBatchMaxSize(2);
            // 3. Subscribe to the Topic; "*" accepts every Tag
            consumer.subscribe("Topic_demo_sync", "*");
            // 4. Set the callback function that processes messages
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            String topic = msg.getTopic(); // topic
                            String tags = msg.getTags();   // tag
                            byte[] body = msg.getBody();   // message body
                            String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("Consumer info: topic:" + topic
                                    + ",tags:" + tags + ",result:" + result);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5. Start the consumer
            consumer.start();
        }
    }
Section 4: Normal messages – asynchronous messages
Asynchronous messages are typically used in response-time-sensitive business scenarios, where the sender cannot afford to wait long for the Broker to respond.
Producer:
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ProducerASync {
        public static void main(String[] args) throws Exception {
            // 1. Create the message producer and specify the producer group name
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            // 2. Specify the NameServer address
            producer.setNamesrvAddr("192.168.88.131:9876");
            // 3. Start the producer
            System.out.println("producer start");
            producer.start();
            for (int i = 0; i < 3; i++) {
                // 4. Create the message object and specify the Topic, Tag, and message body
                Message msg = new Message("Topic_demo_async", "Tag_demo_async",
                        ("Hello " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5. Send asynchronously. The call returns no value;
                //    the result is delivered to the SendCallback.
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("sent successfully: " + sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("send exception: " + throwable.getMessage());
                    }
                });
            }
            // Give the callbacks time to run before shutting down
            Thread.sleep(2000);
            // 6. Shut down the producer
            producer.shutdown();
            System.out.println("producer closed");
        }
    }
Consumer:
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class ConsumerASync {
        public static void main(String[] args) throws Exception {
            // 1. Create the consumer and specify the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            // 2. Specify the NameServer address
            consumer.setNamesrvAddr("192.168.88.131:9876");
            // Maximum number of messages handed to the listener per call
            consumer.setConsumeMessageBatchMaxSize(2);
            // 3. Subscribe to the Topic; "*" accepts every Tag
            consumer.subscribe("Topic_demo_async", "*");
            // 4. Set the callback function that processes messages
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            String topic = msg.getTopic(); // topic
                            String tags = msg.getTags();   // tag
                            byte[] body = msg.getBody();   // message body
                            String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("Consumer info: topic:" + topic
                                    + ",tags:" + tags + ",result:" + result);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5. Start the consumer
            consumer.start();
        }
    }
Analysis:
The consumer code for asynchronous messages is no different from the consumer code for synchronous messages.
The consumer output shows that the asynchronously sent messages are received out of order.
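The asynchronous producer in this section sleeps for a fixed two seconds so that the callbacks can arrive before shutdown. A fixed sleep is either longer than needed or too short under load; one common alternative is a CountDownLatch that is counted down in both onSuccess and onException. The sketch below models the idea without RocketMQ so that it runs without a broker: `fakeAsyncSend` and `Callback` are hypothetical stand-ins for `producer.send(msg, sendCallback)` and `SendCallback`, and the 50 ms delay is an invented figure.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncSendLatchDemo {
    // Hypothetical stand-in for SendCallback.
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Hypothetical stand-in for producer.send(msg, callback): returns at once
    // and reports the outcome later on another thread.
    static void fakeAsyncSend(ExecutorService pool, String body, Callback cb) {
        pool.execute(() -> {
            try {
                Thread.sleep(50); // simulated network round trip
                cb.onSuccess("SEND_OK:" + body);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                cb.onException(e);
            }
        });
    }

    // Sends `count` messages and blocks until every callback has fired
    // or the timeout expires. Returns the number of successful sends.
    static int sendAndAwait(int count, long timeoutMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            fakeAsyncSend(pool, "Hello " + i, new Callback() {
                @Override
                public void onSuccess(String result) {
                    succeeded.incrementAndGet();
                    latch.countDown();
                }

                @Override
                public void onException(Throwable e) {
                    latch.countDown(); // count down on failure too, or await() hangs
                }
            });
        }
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        pool.shutdown(); // the point where producer.shutdown() would go
        return succeeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("callbacks completed: " + sendAndAwait(3, 2000));
    }
}
```

With the real client, the same latch would be created before the send loop and awaited just before `producer.shutdown()`, replacing the `Thread.sleep(2000)` call.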
Section 5: Normal messages – one-way messages
One-way sending is mainly used in scenarios where the send result is not particularly important, such as log sending.
Producer:
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Producer {
        public static void main(String[] args) throws Exception {
            // 1. Create the message producer and specify the producer group name
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
            // 2. Specify the NameServer address
            producer.setNamesrvAddr("192.168.88.131:9876");
            // 3. Start the producer
            producer.start();
            System.out.println("producer start");
            for (int i = 0; i < 20; i++) {
                // 4. Create the message object and specify the Topic, Tag, and message body
                Message msg = new Message("Topic_demo", "Tag_demo",
                        ("Hello " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5. Send a one-way message; nothing is returned,
                //    so the message itself is printed
                producer.sendOneway(msg);
                System.out.println("send result: " + msg);
                // The thread sleeps for 1 second
                Thread.sleep(1000);
            }
            // 6. Shut down the producer
            producer.shutdown();
            System.out.println("producer closed");
        }
    }
Consumer:

    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Consumer {
        public static void main(String[] args) throws Exception {
            // 1. Create the consumer and specify the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
            // 2. Specify the NameServer address
            consumer.setNamesrvAddr("192.168.88.131:9876");
            // Maximum number of messages handed to the listener per call
            consumer.setConsumeMessageBatchMaxSize(2);
            // 3. Subscribe to the Topic; "*" accepts every Tag
            consumer.subscribe("Topic_demo", "*");
            // 4. Set the callback function that processes messages
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            String topic = msg.getTopic(); // topic
                            String tags = msg.getTags();   // tag
                            byte[] body = msg.getBody();   // message body
                            String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("Consumer info: topic:" + topic
                                    + ",tags:" + tags + ",result:" + result);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5. Start the consumer
            consumer.start();
        }
    }
Analysis:
The consumer output shows that the messages are consumed out of order.
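A likely reason for the out-of-order output is that a topic is normally backed by several message queues: a producer that does not choose a queue spreads its messages across them, and MessageListenerConcurrently processes messages on multiple threads. Order is kept only inside a single queue. The sketch below is a toy model of round-robin distribution, not client code; the queue count of 4 and the class `RoundRobinDemo` are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinDemo {
    // Distributes message ids 0..messageCount-1 over queueCount queues in turn.
    static List<List<Integer>> distribute(int messageCount, int queueCount) {
        List<List<Integer>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayList<>());
        }
        for (int id = 0; id < messageCount; id++) {
            queues.get(id % queueCount).add(id); // next message goes to the next queue
        }
        return queues;
    }

    public static void main(String[] args) {
        List<List<Integer>> queues = distribute(8, 4);
        for (int q = 0; q < queues.size(); q++) {
            System.out.println("queue " + q + " -> " + queues.get(q));
        }
    }
}
```

Each queue keeps its own order, but a consumer that reads the queues in parallel may handle message 1 before message 0. When strict ordering matters, RocketMQ's ordered messages (related messages sent to one queue and consumed with MessageListenerOrderly) are the usual route.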
Section 6: Summary
The consumer is implemented the same way for synchronous, asynchronous, and one-way messages.
The three kinds of message differ only on the sending side.
An asynchronous send returns no value; the producer passes a SendCallback to receive the result.
An asynchronous producer should wait (for example, sleep) before shutting down so that pending callbacks can complete.
A one-way send also returns no value, and its messages are consumed in no guaranteed order.
Unlike an asynchronous send, a one-way send takes no SendCallback, so the producer never learns the result of the send.
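The differences on the sending side come down to what the caller gets back. The following sketch contrasts the three call shapes against a toy in-memory store. `ToyProducer` and its methods are hypothetical and only mirror the shape of `send(msg)`, `send(msg, sendCallback)`, and `sendOneway(msg)`; nothing here touches a real broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class SendModesDemo {
    // Hypothetical in-memory stand-in for a producer; not the RocketMQ API.
    static class ToyProducer {
        private final List<String> stored = new ArrayList<>();

        // Synchronous shape: the caller blocks and receives the result directly.
        synchronized String send(String msg) {
            stored.add(msg);
            return "SEND_OK " + msg;
        }

        // Asynchronous shape: the result reaches the caller only through the callback.
        // The future is returned only so this demo can wait for the callback.
        CompletableFuture<Void> sendAsync(String msg, Consumer<String> callback) {
            return CompletableFuture.runAsync(() -> callback.accept(send(msg)));
        }

        // One-way shape: nothing is returned and nothing is called back.
        void sendOneway(String msg) {
            send(msg); // result deliberately discarded
        }

        synchronized int storedCount() {
            return stored.size();
        }
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();

        String syncResult = producer.send("sync message");
        System.out.println("sync result: " + syncResult);

        producer.sendAsync("async message",
                result -> System.out.println("async callback: " + result)).join();

        producer.sendOneway("one-way message");
        System.out.println("stored messages: " + producer.storedCount());
    }
}
```

The trade-off follows from the shapes: the synchronous call gives the strongest confirmation at the cost of blocking, the callback keeps the caller free while still reporting the outcome, and the one-way call is cheapest because it reports nothing.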