Abstract

My earlier write-ups of RabbitMQ's core messaging patterns were all based on the Java API, but a recent look at the official documentation shows that the same patterns can be implemented with Spring AMQP. This article covers installing RabbitMQ on Windows and Linux and implementing the five core messaging patterns with Spring AMQP. It should be helpful for anyone who wants to learn or review RabbitMQ.

Introduction

RabbitMQ is one of the most popular open-source message brokers and is widely used around the world. It is lightweight, easy to deploy, and supports multiple messaging protocols. It can also be deployed as a distributed system to meet large-scale and high-availability requirements.

Relevant concepts

Let’s take a look at some of the concepts in RabbitMQ, using the routing pattern as an example of one of the five message patterns.

 

 

Installation and configuration

Next, we install and configure RabbitMQ. Instructions are given for both Windows and Linux.

Installation under Windows

  • Install Erlang, download address: erlang.org/download/ot…

 

  • Install RabbitMQ from dl.bintray.com/rabbit…

 

  • After the RabbitMQ installation is complete, go to the sbin directory in the RabbitMQ installation directory.

 

  • Type cmd in the address bar of that directory and press Enter to open a command prompt, then run the following command to enable the management plugin.
rabbitmq-plugins enable rabbitmq_management

 

Installation under Linux

  • Pull the RabbitMQ 3.7.15 Docker image.
docker pull rabbitmq:3.7.15
  • Start the service with the docker run command.
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq \
-d rabbitmq:3.7.15
  • Enter the container and enable the management plugin.
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_management

 

  • Open the ports in the firewall so that the service can be reached from other hosts.
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload

Access and Configuration

  • Open the RabbitMQ management page to check that the installation succeeded (on Linux, use the server's IP address instead of localhost): http://localhost:15672/

 

  • Log in with the default account: username guest, password guest
  • Create an account with the administrator role: username mall, password mall

 

  • Create a new virtual host: /mall

 

  • Click the mall user to enter the user configuration page;

 

  • Grant the mall user permissions on the /mall virtual host.

 

  • The RabbitMQ configuration is complete.

Five message patterns

These five message patterns are the basis for building RabbitMQ-based messaging applications, so be sure to master them. If you have learned RabbitMQ before, you are probably familiar with the plain Java implementation of these patterns. Here we implement them with Spring AMQP.

Simple mode

The simple pattern is the simplest message pattern, consisting of a producer, a consumer, and a queue. The producer sends a message to the queue, and the consumer retrieves the message from the queue and consumes it.
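
The flow described above can be sketched without a broker. The following is a minimal in-memory illustration in plain Java, not RabbitMQ or Spring AMQP code: the class name SimplePatternSketch and its methods are hypothetical, and a LinkedBlockingQueue merely stands in for the broker's queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the simple pattern; no RabbitMQ broker is involved.
final class SimplePatternSketch {

    // Producer side: put `count` copies of the message on the queue.
    static void produce(BlockingQueue<String> queue, String message, int count) {
        for (int i = 0; i < count; i++) {
            queue.add(message);
        }
    }

    // Consumer side: take messages off the queue in arrival order until it is empty.
    static List<String> consumeAll(BlockingQueue<String> queue) {
        List<String> received = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        produce(queue, "Hello World!", 3);
        System.out.println(consumeAll(queue));
    }
}
```

A real broker adds persistence, acknowledgements, and network transport. The sketch only shows the first-in, first-out hand-off between one producer and one consumer.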

Pattern diagram

 

Spring AMQP implementation

  • First, add the Spring AMQP dependency to pom.xml;
<!-- Message queue dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • Then add the RabbitMQ configuration to application.yml;
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /mall
    username: mall
    password: mall
    publisher-confirms: true # confirm that the message reached the exchange
    publisher-returns: true # confirm that the message reached the queue
  • Add the Java configuration for the simple pattern, which creates a queue named simple.hello, a producer, and a consumer;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class SimpleRabbitConfig {

    // The queue that the producer sends to and the consumer listens on
    @Bean
    public Queue hello() {
        return new Queue("simple.hello");
    }

    @Bean
    public SimpleSender simpleSender() {
        return new SimpleSender();
    }

    @Bean
    public SimpleReceiver simpleReceiver() {
        return new SimpleReceiver();
    }

}
  • The producer sends a message to queue simple.hello using the send method;
/**
 * Created by macro on 2020/5/19.
 */
public class SimpleSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName="simple.hello";

    public void send() {
        String message = "Hello World!";
        this.template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • The consumer gets the message from queue simple.hello;
/**
 * Created by macro on 2020/5/19.
 */
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

    @RabbitHandler
    public void receive(String in) {
        LOGGER.info(" [x] Received '{}'", in);
    }

}
  • Add a test endpoint to the controller and call it to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private SimpleSender simpleSender;

    @ApiOperation("simple mode")
    @RequestMapping(value = "/simple", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult simpleTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            simpleSender.send();
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • When it runs, the producer sends messages to the queue, and the consumer gets them from the queue and consumes them.

 

 

Working mode

Working mode distributes messages among multiple competing consumers. It consists of one producer, two consumers, and one queue. Both consumers listen on the same queue. While one consumer is busy with a time-consuming message, the idle consumer takes the next message from the queue and consumes it.
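
As a rough illustration of competing consumers, the sketch below hands each message to exactly one of several consumers in turn. It is plain Java with hypothetical names (WorkQueueSketch, dispatchRoundRobin) and it assumes strict round-robin delivery. With a real broker, the split depends on prefetch settings and on how quickly each consumer acknowledges its messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory illustration of one queue shared by several consumers.
final class WorkQueueSketch {

    // Deliver each message to exactly one consumer, cycling through the consumers.
    // The result holds one list of messages per consumer.
    static List<List<String>> dispatchRoundRobin(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        System.out.println(dispatchRoundRobin(messages, 2));
    }
}
```

The point to notice is that no message appears in both lists, which is what distinguishes this mode from publish/subscribe.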

Pattern diagram

 

Spring AMQP implementation

  • Add the Java configuration for working mode, which creates a queue named work.hello, a producer, and two consumers;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class WorkRabbitConfig {

    @Bean
    public Queue workQueue() {
        return new Queue("work.hello");
    }

    // Two consumers listening on the same queue
    @Bean
    public WorkReceiver workReceiver1() {
        return new WorkReceiver(1);
    }

    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    @Bean
    public WorkSender workSender() {
        return new WorkSender();
    }

}
  • The producer sends a message to queue work.hello using the send method. Each message contains a certain number of '.' characters;
/**
 * Created by macro on 2020/5/19.
 */
public class WorkSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName = "work.hello";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        // Append one to three dots, depending on the index
        int limitIndex = index % 3 + 1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        String message = builder.toString();
        template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • Two consumers, named instance 1 and instance 2, get messages from queue work.hello. The more '.' characters a message contains, the longer it takes to process;
/**
 * Created by macro on 2020/5/19.
 */
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String in) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
    }

    // Simulate a time-consuming task: sleep one second per '.' in the message
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test endpoint to the controller and call it to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private WorkSender workSender;

    @ApiOperation("work mode")
    @RequestMapping(value = "/work", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult workTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            workSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • When it runs, the producer sends messages containing different numbers of '.' characters to the queue. Instance 1 and instance 2 compete with each other, and each consumes part of the messages.
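
To make the message format concrete, here is a small stand-alone sketch of the string-building logic in WorkSender.send and the dot-counting logic in WorkReceiver.doWork. The class name WorkMessageSketch and the method secondsOfWork are hypothetical. The arithmetic is copied from the code above, with the one-second sleep replaced by a counter.

```java
// Stand-alone copy of the message arithmetic used by the work-mode sender and receiver.
final class WorkMessageSketch {

    // Same construction as WorkSender.send: "Hello", then 1 to 3 dots, then a 1-based counter.
    static String buildMessage(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int dots = index % 3 + 1;
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        return builder.toString();
    }

    // Same loop as WorkReceiver.doWork, counting seconds instead of sleeping.
    static int secondsOfWork(String message) {
        int seconds = 0;
        for (char ch : message.toCharArray()) {
            if (ch == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            String message = buildMessage(i);
            System.out.println(message + " takes " + secondsOfWork(message) + "s");
        }
    }
}
```

The processing cost therefore cycles through one, two, and three seconds, which is why the two instances finish their messages at different times.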

 

 

 

Publish/subscribe

The publish/subscribe pattern sends each message to multiple consumers at the same time, much like a broadcast. It consists of one producer, two consumers, two queues, and one exchange. The two consumers are bound to different queues, and both queues are bound to the exchange. The producer sends messages to the exchange, and every consumer receives and consumes every message.
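
The broadcast behaviour can be sketched in a few lines of plain Java. This is an in-memory illustration only, not RabbitMQ or Spring AMQP code: the class FanoutSketch, its methods, and the queue names queue1 and queue2 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a fanout exchange; no broker is involved.
final class FanoutSketch {

    // Queue name -> messages delivered to that queue, in arrival order.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Bind a queue to the exchange. A fanout binding needs no routing key.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copy the message into every bound queue.
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Messages currently held by the named queue (empty if the queue is not bound).
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue1");
        exchange.bind("queue2");
        exchange.publish("Hello.1");
        System.out.println(exchange.messagesIn("queue1"));
        System.out.println(exchange.messagesIn("queue2"));
    }
}
```

Each bound queue ends up with its own copy of every message, so consumers on different queues do not compete with each other.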

Pattern diagram

 

Spring AMQP implementation

  • Add the Java configuration for publish/subscribe, which creates an exchange named exchange.fanout, a producer, a consumer, and two anonymous queues, and binds both anonymous queues to the exchange;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("exchange.fanout");
    }

    @Bean
    public Queue fanoutQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue fanoutQueue2() {
        return new AnonymousQueue();
    }

    // Bind both anonymous queues to the fanout exchange
    @Bean
    public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanout);
    }

    @Bean
    public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanout);
    }

    @Bean
    public FanoutReceiver fanoutReceiver() {
        return new FanoutReceiver();
    }

    @Bean
    public FanoutSender fanoutSender() {
        return new FanoutSender();
    }

}
  • The producer sends a message to the exchange named exchange.fanout using the send method. Each message contains a certain number of '.' characters;
/**
 * Created by macro on 2020/5/19.
 */
public class FanoutSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.fanout";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3 + 1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        String message = builder.toString();
        // A fanout exchange ignores the routing key, so an empty string is passed
        template.convertAndSend(exchangeName, "", message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • The consumer gets messages from the anonymous queues bound to the exchange. The more '.' characters a message contains, the longer it takes to process. Because the one consumer class listens on both queues, it can be regarded as two consumers, named instance 1 and instance 2;
/**
 * Created by macro on 2020/5/19.
 */
public class FanoutReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);

    @RabbitListener(queues = "#{fanoutQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{fanoutQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    private void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    // Sleep one second per '.' in the message
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test endpoint to the controller and call it to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private FanoutSender fanoutSender;

    @ApiOperation("publish/subscribe")
    @RequestMapping(value = "/fanout", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult fanoutTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            fanoutSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • When it runs, the producer sends messages containing different numbers of '.' characters to the exchange. Instance 1 and instance 2 both receive and consume every message.

 

 

 

Routing mode

The routing pattern selectively sends messages to consumers based on routing keys. It consists of one producer, two consumers, two queues, and one exchange. The two consumers are bound to different queues, and each queue is bound to the exchange with one or more routing keys. The producer sends messages to the exchange, and the exchange forwards each message to the queues whose binding key equals the message's routing key.
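
The exact-match rule can be sketched in plain Java as follows. This is an in-memory illustration, not RabbitMQ or Spring AMQP code: the class DirectExchangeSketch, its methods, and the queue names queue1 and queue2 are hypothetical. The routing keys orange, black, and green are the ones used in the configuration later in this section.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// In-memory stand-in for a direct exchange; no broker is involved.
final class DirectExchangeSketch {

    // Routing key -> names of the queues bound with exactly that key.
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Bind a queue to the exchange under one routing key. A queue may be bound several times.
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new TreeSet<>()).add(queueName);
    }

    // Queues that would receive a message published with this routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queue1", "orange");
        exchange.bind("queue1", "black");
        exchange.bind("queue2", "green");
        exchange.bind("queue2", "black");
        for (String key : new String[] {"orange", "black", "green"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

A key bound by both queues (black here) behaves like a broadcast, while a key that no queue is bound with routes to nothing.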

Pattern diagram

 

Spring AMQP implementation

  • Add the Java configuration for routing mode, which creates an exchange named exchange.direct, a producer, a consumer, and two anonymous queues. The queues are bound to the exchange with routing keys: queue 1 with orange and black, and queue 2 with green and black;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class DirectRabbitConfig {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("exchange.direct");
    }

    @Bean
    public Queue directQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue directQueue2() {
        return new AnonymousQueue();
    }

    // Queue 1 receives messages with routing key orange or black
    @Bean
    public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("orange");
    }

    @Bean
    public Binding directBinding1b(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("black");
    }

    // Queue 2 receives messages with routing key green or black
    @Bean
    public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("green");
    }

    @Bean
    public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("black");
    }

    @Bean
    public DirectReceiver receiver() {
        return new DirectReceiver();
    }

    @Bean
    public DirectSender directSender() {
        return new DirectSender();
    }

}
  • The producer sends messages to the exchange named exchange.direct using different routing keys. The exchange forwards each message to the matching queues based on its routing key;
/**
 * Created by macro on 2020/5/19.
 */
public class DirectSender {

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.direct";

    private final String[] keys = {"orange", "black", "green"};

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello to ");
        // Cycle through the three routing keys
        int limitIndex = index % 3;
        String key = keys[limitIndex];
        builder.append(key).append(' ');
        builder.append(index + 1);
        String message = builder.toString();
        template.convertAndSend(exchangeName, key, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }

}
  • The consumer gets messages from the anonymous queues bound to the exchange. Because the one consumer class listens on both queues, it can be regarded as two consumers, named instance 1 and instance 2;
/**
 * Created by macro on 2020/5/19.
 */
public class DirectReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);

    @RabbitListener(queues = "#{directQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{directQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    private void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    // Sleep one second per '.' in the message
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test endpoint to the controller and call it to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private DirectSender directSender;

    @ApiOperation("routing mode")
    @RequestMapping(value = "/direct", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult directTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            directSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • When it runs, the producer sends messages with different routing keys to the exchange. Instance 1 gets the orange and black messages, and instance 2 gets the green and black messages.

 

 

 

Wildcard mode

The wildcard pattern selectively sends messages to consumers based on routing key matching rules. It consists of one producer, two consumers, two queues, and one exchange. The two consumers are bound to different queues, and each queue is bound to the exchange with one or more routing key patterns. The producer sends messages to the exchange, the exchange forwards each message to the queues whose pattern matches the message's routing key, and the consumers bound to those queues receive and consume the messages.

Special matching symbols

  • *: matches exactly one word;
  • #: matches zero or more words.
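
The two symbols can be sketched as a small word-by-word matcher. This is an illustrative re-implementation in plain Java, not RabbitMQ's own routing code, and the class name TopicMatcherSketch is hypothetical. Routing keys and binding patterns are assumed to be words separated by dots.

```java
// Illustrative matcher for topic-style binding patterns; not RabbitMQ's implementation.
final class TopicMatcherSketch {

    // Returns true if the dot-separated routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    // Compare pattern words p[pi..] against key words k[ki..].
    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: the key must be exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' either absorbs zero words, or absorbs one word and is tried again.
            if (matches(p, pi + 1, k, ki)) {
                return true;
            }
            return ki < k.length && matches(p, pi, k, ki + 1);
        }
        if (ki == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            // '*' or a literal word consumes exactly one word of the key.
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));
        System.out.println(matches("*.*.rabbit", "quick.brown.fox"));
        System.out.println(matches("lazy.#", "lazy.pink.rabbit"));
    }
}
```

Note the difference between the symbols at the end of a pattern: under this rule lazy.# accepts the bare key lazy, while lazy.* would require exactly one more word.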

Pattern diagram

 

Spring AMQP implementation

  • Add the Java configuration for wildcard mode, which creates an exchange named exchange.topic, a producer, a consumer, and two anonymous queues. Queue 1 is bound with the patterns *.orange.* and *.*.rabbit, and queue 2 is bound with lazy.#;
/**
 * Created by macro on 2020/5/19.
 */
@Configuration
public class TopicRabbitConfig {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("exchange.topic");
    }

    @Bean
    public Queue topicQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue topicQueue2() {
        return new AnonymousQueue();
    }

    // Queue 1 matches *.orange.* and *.*.rabbit
    @Bean
    public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
    }

    @Bean
    public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
    }

    // Queue 2 matches lazy.#
    @Bean
    public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
        return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
    }

    @Bean
    public TopicReceiver topicReceiver() {
        return new TopicReceiver();
    }

    @Bean
    public TopicSender topicSender() {
        return new TopicSender();
    }

}
  • The producer sends messages to the exchange named exchange.topic using the send method. The messages carry different routing keys;
/**
 * Created by macro on 2020/5/19.
 */
public class TopicSender {

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.topic";

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class);

    private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello to ");
        // Cycle through the six routing keys
        int limitIndex = index % keys.length;
        String key = keys[limitIndex];
        builder.append(key).append(' ');
        builder.append(index + 1);
        String message = builder.toString();
        template.convertAndSend(exchangeName, key, message);
        LOGGER.info(" [x] Sent '{}'", message);
        System.out.println(" [x] Sent '" + message + "'");
    }

}
  • The consumer gets messages from the anonymous queues bound to the exchange. Because the one consumer class listens on both queues, it can be regarded as two consumers, named instance 1 and instance 2;
/**
 * Created by macro on 2020/5/19.
 */
public class TopicReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);

    @RabbitListener(queues = "#{topicQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{topicQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    public void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    // Sleep one second per '.' in the message
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }

}
  • Add a test endpoint to the controller and call it to start sending messages;
/**
 * Created by macro on 2020/5/19.
 */
@Api(tags = "RabbitController")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private TopicSender topicSender;

    @ApiOperation("wildcard mode")
    @RequestMapping(value = "/topic", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult topicTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            topicSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}
  • When it runs, the producer sends messages with different routing keys to the exchange, and instance 1 and instance 2 each get the messages that match their binding patterns.

 

 

 

References

RabbitMQ Tutorials: www.rabbitmq.com/gets…

Project source code address

github.com/macrozheng…