Spring Boot e-commerce project mall (35k+ stars): github.com/macrozheng/…

Abstract

My earlier write-ups of RabbitMQ's core messaging patterns were all based on the Java API, but a recent look at the official documentation shows that the same patterns can be implemented with Spring AMQP. This article covers installing RabbitMQ on Windows and Linux and implementing the five core message patterns with Spring AMQP. It should be helpful for anyone who wants to learn or review RabbitMQ.

Introduction

RabbitMQ is one of the most popular open-source message brokers and is widely used around the world. It is lightweight, easy to deploy, and supports multiple messaging protocols. It can also be deployed in a distributed setup to meet large-scale and high-availability requirements.

Relevant concepts

Let’s take a look at some of the concepts in RabbitMQ, using the routing pattern as an example of one of the five message patterns.

Symbol | Name | Description
P | Producer | The sender of a message; it sends messages to the exchange
C | Consumer | The receiver of a message; it takes messages from a queue and consumes them
X | Exchange | Receives messages sent by the producer and forwards them to the specified queues according to the routing key
Q | Queue | Stores the messages forwarded by the exchange
type | Exchange type | Different types of exchanges forward messages in different ways
fanout | Publish/subscribe | Broadcasts messages to all queues bound to the exchange
direct | Routing mode | Forwards messages according to the routing key
topic | Wildcard mode | Forwards messages according to the matching rules of the routing key

Installation and configuration

Next we walk through installing and configuring RabbitMQ, with instructions for both Windows and Linux.

Installation under Windows

  • Install Erlang; download address: erlang.org/download/ot…

  • Install RabbitMQ; download address: dl.bintray.com/rabbitmq/al…

  • After the RabbitMQ installation is complete, go to the sbin directory in the RabbitMQ installation directory.

  • Type cmd in the address bar of that folder and press Enter to open a command prompt, then run the following command to enable the management plugin.
rabbitmq-plugins enable rabbitmq_management

Installation under Linux

  • Download the rabbitmq 3.7.15 Docker image;
docker pull rabbitmq:3.7.15
  • Use the Docker command to start the service.
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq \
-d rabbitmq:3.7.15
  • Enter the container and enable the management function.
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_management

  • Open the required ports in the firewall so that the service can be reached from other machines.
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload

Access and Configuration

  • Open the RabbitMQ management page to check that the installation succeeded (on Linux, use the server's IP address instead of localhost): http://localhost:15672/

  • Enter the username and password to log in; the default account is guest / guest.

  • Create an account and set its role to administrator; here the username and password are mall / mall.

  • Create a new virtual host: /mall

  • Click the mall user to open its configuration page;

  • Grant the mall user permission on the /mall virtual host.

  • The RabbitMQ configuration is complete.

Five message patterns

These five message patterns are the basis for building RabbitMQ-based messaging applications, so be sure to master them. Readers who have already studied RabbitMQ will have seen Java API implementations of these patterns; here we implement them with Spring AMQP.

Simple mode

The simple pattern is the simplest message pattern, consisting of a producer, a consumer, and a queue. The producer sends a message to the queue, and the consumer retrieves the message from the queue and consumes it.
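The flow described above can be sketched without a broker. The following is only an in-memory illustration, assuming a java.util.concurrent.BlockingQueue as a stand-in for the RabbitMQ queue; the class name SimplePatternSketch and the numbered message text are hypothetical and not part of the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration of the simple pattern; it does not talk to RabbitMQ.
final class SimplePatternSketch {

    // Publishes `count` messages to one queue and returns what the single
    // consumer received, in arrival order.
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stands in for the queue
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= count; i++) {
                queue.put("Hello World! #" + i); // producer side
            }
            consumer.join(); // wait until every message has been consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        for (String message : run(3)) {
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
```

With a single queue and a single consumer, messages are consumed in the order in which they were sent.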

Pattern diagram

Spring AMQP implementation

  • First, add the Spring AMQP dependency to pom.xml;
<!-- Spring AMQP dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • Then add the RabbitMQ configuration to application.yml.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /mall
    username: mall
    password: mall
    publisher-confirms: true # confirm that messages reach the exchange
    publisher-returns: true # confirm that messages reach the queue
  • Add the Java configuration for simple mode: create a queue named simple.hello, a producer, and a consumer;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Created by macro on 2020/5/19. */
@Configuration
public class SimpleRabbitConfig {

    // The queue used by the simple mode
    @Bean
    public Queue hello() {
        return new Queue("simple.hello");
    }

    @Bean
    public SimpleSender simpleSender() {
        return new SimpleSender();
    }

    @Bean
    public SimpleReceiver simpleReceiver() {
        return new SimpleReceiver();
    }
}
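  • The producer sends messages to the queue simple.hello through its send method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/** Created by macro on 2020/5/19. */
public class SimpleSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName = "simple.hello";

    public void send() {
        String message = "Hello World!";
        // With no exchange given, the queue name is used as the routing key
        this.template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}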
  • The consumer receives messages from the queue simple.hello;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

/** Created by macro on 2020/5/19. */
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

    @RabbitHandler
    public void receive(String in) {
        LOGGER.info(" [x] Received '{}'", in);
    }
}
  • Add a test interface to the Controller and call it to start sending messages;
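import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
// CommonResult is the project's own response wrapper; ThreadUtil is assumed
// to be Hutool's cn.hutool.core.thread.ThreadUtil. Import both as well.

/** Created by macro on 2020/5/19. */
@Api(tags = "RabbitController", description = "RabbitMQ functional test")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private SimpleSender simpleSender;

    @ApiOperation("Simple mode")
    @RequestMapping(value = "/simple", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult simpleTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            simpleSender.send();
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}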
  • After running, the result is as follows: the producer sends messages to the queue, and the consumer takes messages from the queue and consumes them.

Work mode

Work mode distributes messages among multiple competing consumers. It consists of one producer, two consumers, and one queue. Both consumers listen on the same queue; while one consumer is busy with a time-consuming task, an idle consumer takes the next message from the queue and consumes it.
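The competing-consumers idea can be sketched in memory. This is a simplification under stated assumptions: a BlockingQueue stands in for the RabbitMQ queue, two threads stand in for the two consumers, and the STOP marker and the class name WorkPatternSketch are hypothetical. In a real broker, how messages are split between consumers also depends on settings such as prefetch, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration of two consumers competing for one queue.
final class WorkPatternSketch {

    private static final String STOP = "STOP"; // hypothetical end-of-stream marker

    // Returns two lists: index 0 holds the messages handled by instance 1,
    // index 1 those handled by instance 2.
    static List<List<String>> run(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<List<String>> handled = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();

        for (int instance = 0; instance < 2; instance++) {
            List<String> mine = new ArrayList<>();
            handled.add(mine);
            Thread worker = new Thread(() -> {
                try {
                    String message;
                    while (!(message = queue.take()).equals(STOP)) {
                        mine.add(message);
                        Thread.sleep(10L * countDots(message)); // more dots, longer task
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }

        try {
            for (int i = 0; i < messageCount; i++) {
                StringBuilder builder = new StringBuilder("Hello");
                for (int d = 0; d < i % 3 + 1; d++) {
                    builder.append('.');
                }
                queue.put(builder.append(i + 1).toString());
            }
            queue.put(STOP); // one marker per worker
            queue.put(STOP);
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    private static int countDots(String message) {
        int dots = 0;
        for (char ch : message.toCharArray()) {
            if (ch == '.') {
                dots++;
            }
        }
        return dots;
    }

    public static void main(String[] args) {
        List<List<String>> handled = run(10);
        System.out.println("instance 1: " + handled.get(0));
        System.out.println("instance 2: " + handled.get(1));
    }
}
```

Every message ends up with exactly one of the two instances; which instance gets which message varies from run to run.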

Pattern diagram

Spring AMQP implementation

  • Add the Java configuration for work mode: create a queue named work.hello, one producer, and two consumers;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Created by macro on 2020/5/19. */
@Configuration
public class WorkRabbitConfig {

    @Bean
    public Queue workQueue() {
        return new Queue("work.hello");
    }

    // Two receivers listening on the same queue compete for its messages
    @Bean
    public WorkReceiver workReceiver1() {
        return new WorkReceiver(1);
    }

    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    @Bean
    public WorkSender workSender() {
        return new WorkSender();
    }
}
  • The producer sends messages to the queue work.hello through its send method; each message contains a certain number of . characters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/** Created by macro on 2020/5/19. */
public class WorkSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String queueName = "work.hello";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        // Append one to three dots; each dot costs the consumer one second
        int limitIndex = index % 3 + 1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        String message = builder.toString();
        template.convertAndSend(queueName, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}
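To make the message format concrete, here is a standalone sketch of the string building in send, without RabbitTemplate. It assumes the appended character is a single '.'; the class name WorkMessageSketch and the method name buildMessage are hypothetical.

```java
// Standalone reproduction of the message format used by the work-mode sender.
final class WorkMessageSketch {

    static String buildMessage(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int dotCount = index % 3 + 1; // cycles through 1, 2, 3
        for (int i = 0; i < dotCount; i++) {
            builder.append('.');
        }
        builder.append(index + 1); // 1-based message number
        return builder.toString();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            System.out.println(buildMessage(i));
        }
    }
}
```

For indexes 0, 1, and 2 this yields Hello.1, Hello..2, and Hello...3, after which the dot count starts over at one, so index 3 gives Hello.4.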
  • The two consumers, named instance 1 and instance 2, receive messages from the queue work.hello; the more . characters a message contains, the longer it takes to process;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch; // assumed: Spring's StopWatch

/** Created by macro on 2020/5/19. */
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String in) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
    }

    // Simulates a time-consuming task: sleep one second per dot in the message
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }
}
  • Add a test interface to the Controller and call it to start sending messages;
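import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
// CommonResult is the project's own response wrapper; ThreadUtil is assumed
// to be Hutool's cn.hutool.core.thread.ThreadUtil. Import both as well.

/** Created by macro on 2020/5/19. */
@Api(tags = "RabbitController", description = "RabbitMQ functional test")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private WorkSender workSender;

    @ApiOperation("Work mode")
    @RequestMapping(value = "/work", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult workTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            workSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}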
  • After running, the result is as follows: the producer sends messages containing different numbers of . characters to the queue, and instance 1 and instance 2 compete with each other, each consuming part of the messages.

Publish/subscribe

The publish/subscribe pattern sends each message to multiple consumers at the same time (similar to a broadcast). It consists of one producer, two consumers, two queues, and one exchange. The two consumers are bound to different queues, both queues are bound to the exchange, the producer sends messages to the exchange, and every consumer receives and consumes each message.
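The broadcast behaviour can be illustrated with a few lines of plain Java. This is an in-memory sketch only, assuming java.util.Queue objects as stand-ins for RabbitMQ queues; the class name FanoutSketch and its methods are hypothetical and are not Spring AMQP APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory illustration of a fanout exchange: every bound queue gets a copy.
final class FanoutSketch {

    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Creates a new queue and binds it to this exchange.
    Queue<String> bindNewQueue() {
        Queue<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    // Copies the message into every bound queue; no routing key is consulted.
    void publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        Queue<String> queue1 = exchange.bindNewQueue();
        Queue<String> queue2 = exchange.bindNewQueue();
        exchange.publish("Hello.1");
        System.out.println("Queue 1: " + queue1);
        System.out.println("Queue 2: " + queue2);
    }
}
```

Unlike the work mode, where one queue is shared, each consumer here has its own queue, so no consumer misses a message.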

Pattern diagram

Spring AMQP implementation

  • Add the Java configuration for publish/subscribe: create an exchange named exchange.fanout, a producer, two consumers, and two anonymous queues, and bind both anonymous queues to the exchange;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Created by macro on 2020/5/19. */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("exchange.fanout");
    }

    @Bean
    public Queue fanoutQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue fanoutQueue2() {
        return new AnonymousQueue();
    }

    // A fanout binding needs no routing key
    @Bean
    public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanout);
    }

    @Bean
    public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanout);
    }

    @Bean
    public FanoutReceiver fanoutReceiver() {
        return new FanoutReceiver();
    }

    @Bean
    public FanoutSender fanoutSender() {
        return new FanoutSender();
    }
}
  • The producer sends messages to the exchange exchange.fanout through its send method; each message contains a certain number of . characters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/** Created by macro on 2020/5/19. */
public class FanoutSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.fanout";

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3 + 1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        String message = builder.toString();
        // The routing key is left empty when publishing to the fanout exchange
        template.convertAndSend(exchangeName, "", message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}
  • The consumer receives messages from the anonymous queues it is bound to; the more . characters a message contains, the longer it takes to process. Because this consumer receives and consumes messages from both queues, it can be regarded as two consumers, named instance 1 and instance 2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch; // assumed: Spring's StopWatch

/** Created by macro on 2020/5/19. */
public class FanoutReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);

    // The SpEL expression resolves the generated name of the anonymous queue
    @RabbitListener(queues = "#{fanoutQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{fanoutQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    private void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    // Sleep one second per dot in the message
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }
}
  • Add a test interface to the Controller and call it to start sending messages;
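import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
// CommonResult is the project's own response wrapper; ThreadUtil is assumed
// to be Hutool's cn.hutool.core.thread.ThreadUtil. Import both as well.

/** Created by macro on 2020/5/19. */
@Api(tags = "RabbitController", description = "RabbitMQ functional test")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private FanoutSender fanoutSender;

    @ApiOperation("Publish/subscribe")
    @RequestMapping(value = "/fanout", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult fanoutTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            fanoutSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}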
  • After running, the result is as follows: the producer sends messages containing different numbers of . characters to the exchange, and instance 1 and instance 2 both receive and consume every message.

Routing mode

The routing pattern selectively sends messages to multiple consumers based on routing keys. It consists of one producer, two consumers, two queues, and one exchange. The two consumers are bound to different queues, and the two queues are bound to the exchange with routing keys. The producer sends messages to the exchange, and the exchange forwards each message to the queues whose routing key matches.
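The routing rule can be sketched as a lookup from routing key to bound queues. This is an in-memory illustration only; the class name DirectExchangeSketch and its methods are hypothetical, and the keys orange, black, and green are the ones used in the configuration later in this section.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory illustration of a direct exchange: exact match on the routing key.
final class DirectExchangeSketch {

    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    void bind(Queue<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    // Delivers to every queue bound with exactly this key; otherwise drops the message.
    void publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.get(routingKey);
        if (targets == null) {
            return;
        }
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> queue1 = new ArrayDeque<>();
        Queue<String> queue2 = new ArrayDeque<>();
        exchange.bind(queue1, "orange");
        exchange.bind(queue1, "black");
        exchange.bind(queue2, "green");
        exchange.bind(queue2, "black");

        String[] keys = {"orange", "black", "green"};
        for (int i = 0; i < 10; i++) {
            String key = keys[i % 3];
            exchange.publish(key, "Hello to " + key + " " + (i + 1));
        }
        System.out.println("Queue 1: " + queue1);
        System.out.println("Queue 2: " + queue2);
    }
}
```

With ten messages cycling through the three keys, Queue 1 ends up with seven messages (four orange, three black) and Queue 2 with six (three green, three black); the black messages are delivered to both queues.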

Pattern diagram

Spring AMQP implementation

  • Add the Java configuration for routing mode: create an exchange named exchange.direct, one producer, two consumers, and two anonymous queues. Both queues are bound to the exchange with routing keys: Queue 1 uses orange and black, and Queue 2 uses green and black;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Created by macro on 2020/5/19. */
@Configuration
public class DirectRabbitConfig {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("exchange.direct");
    }

    @Bean
    public Queue directQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue directQueue2() {
        return new AnonymousQueue();
    }

    // Queue 1 receives messages routed with "orange" or "black"
    @Bean
    public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("orange");
    }

    @Bean
    public Binding directBinding1b(DirectExchange direct, Queue directQueue1) {
        return BindingBuilder.bind(directQueue1).to(direct).with("black");
    }

    // Queue 2 receives messages routed with "green" or "black"
    @Bean
    public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("green");
    }

    @Bean
    public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
        return BindingBuilder.bind(directQueue2).to(direct).with("black");
    }

    @Bean
    public DirectReceiver receiver() {
        return new DirectReceiver();
    }

    @Bean
    public DirectSender directSender() {
        return new DirectSender();
    }
}
  • The producer sends messages to the exchange exchange.direct through its send method, using different routing keys; the exchange forwards each message to different queues according to its routing key;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/** Created by macro on 2020/5/19. */
public class DirectSender {

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.direct";

    private final String[] keys = {"orange", "black", "green"};

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello to ");
        // Cycle through the three routing keys
        int limitIndex = index % 3;
        String key = keys[limitIndex];
        builder.append(key).append(' ');
        builder.append(index + 1);
        String message = builder.toString();
        template.convertAndSend(exchangeName, key, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}
  • The consumer receives messages from the anonymous queues it is bound to. Because this consumer receives and consumes messages from both queues, it can be regarded as two consumers, named instance 1 and instance 2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch; // assumed: Spring's StopWatch

/** Created by macro on 2020/5/19. */
public class DirectReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);

    @RabbitListener(queues = "#{directQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{directQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    private void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    // Sleep one second per dot in the message
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }
}
  • Add a test interface to the Controller and call it to start sending messages;
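import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
// CommonResult is the project's own response wrapper; ThreadUtil is assumed
// to be Hutool's cn.hutool.core.thread.ThreadUtil. Import both as well.

/** Created by macro on 2020/5/19. */
@Api(tags = "RabbitController", description = "RabbitMQ functional test")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private DirectSender directSender;

    @ApiOperation("Routing mode")
    @RequestMapping(value = "/direct", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult directTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            directSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}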
  • After running, the result is as follows: the producer sends messages with different routing keys to the exchange; instance 1 receives the orange and black messages, and instance 2 receives the green and black messages.

Wildcard mode

The wildcard pattern selectively sends messages to multiple consumers based on routing key matching rules. It consists of one producer, two consumers, two queues, and one exchange. The two consumers are bound to different queues, and the two queues are bound to the exchange with routing key matching rules. The producer sends messages to the exchange, the exchange forwards each message to the queues whose matching rules fit its routing key, and the consumers bound to those queues receive and consume the messages.

Special matching symbols

  • *: matches exactly one word;
  • #: matches zero or more words.
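The two symbols can be illustrated with a small matcher written in plain Java. This is a sketch of the matching rule as stated above, assuming words are separated by dots; it is not RabbitMQ's actual implementation, and the class name TopicMatchSketch is hypothetical. The patterns and routing keys in main are the ones used later in this section.

```java
// Sketch of topic-style matching: '*' is exactly one word, '#' is zero or more words.
final class TopicMatchSketch {

    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible length
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
                "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
        for (String key : keys) {
            boolean toQueue1 = matches("*.orange.*", key) || matches("*.*.rabbit", key);
            boolean toQueue2 = matches("lazy.#", key);
            System.out.println(key + " -> queue1=" + toQueue1 + ", queue2=" + toQueue2);
        }
    }
}
```

Under these rules, quick.brown.fox matches none of the three patterns, so it would reach neither queue, while lazy.orange.elephant and lazy.pink.rabbit reach both.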

Pattern diagram

Spring AMQP implementation

  • Add the Java configuration for wildcard mode: create an exchange named exchange.topic, one producer, two consumers, and two anonymous queues. Messages matching *.orange.* or *.*.rabbit are sent to Queue 1, and messages matching lazy.# are sent to Queue 2;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Created by macro on 2020/5/19. */
@Configuration
public class TopicRabbitConfig {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("exchange.topic");
    }

    @Bean
    public Queue topicQueue1() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue topicQueue2() {
        return new AnonymousQueue();
    }

    // Queue 1 receives keys matching "*.orange.*" or "*.*.rabbit"
    @Bean
    public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
    }

    @Bean
    public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) {
        return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
    }

    // Queue 2 receives keys matching "lazy.#"
    @Bean
    public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
        return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
    }

    @Bean
    public TopicReceiver topicReceiver() {
        return new TopicReceiver();
    }

    @Bean
    public TopicSender topicSender() {
        return new TopicSender();
    }
}
  • The producer sends messages with different routing keys to the exchange exchange.topic through its send method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/** Created by macro on 2020/5/19. */
public class TopicSender {

    @Autowired
    private RabbitTemplate template;

    private static final String exchangeName = "exchange.topic";

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class);

    private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    public void send(int index) {
        StringBuilder builder = new StringBuilder("Hello to ");
        // Cycle through the routing keys
        int limitIndex = index % keys.length;
        String key = keys[limitIndex];
        builder.append(key).append(' ');
        builder.append(index + 1);
        String message = builder.toString();
        template.convertAndSend(exchangeName, key, message);
        LOGGER.info(" [x] Sent '{}'", message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}
  • The consumer receives messages from the anonymous queues it is bound to. Because this consumer receives and consumes messages from both queues, it can be regarded as two consumers, named instance 1 and instance 2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch; // assumed: Spring's StopWatch

/** Created by macro on 2020/5/19. */
public class TopicReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);

    @RabbitListener(queues = "#{topicQueue1.name}")
    public void receive1(String in) {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{topicQueue2.name}")
    public void receive2(String in) {
        receive(in, 2);
    }

    public void receive(String in, int receiver) {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
    }

    // Sleep one second per dot in the message (the routing key contains dots)
    private void doWork(String in) {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                ThreadUtil.sleep(1000);
            }
        }
    }
}
  • Add a test interface to the Controller and call it to start sending messages;
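import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
// CommonResult is the project's own response wrapper; ThreadUtil is assumed
// to be Hutool's cn.hutool.core.thread.ThreadUtil. Import both as well.

/** Created by macro on 2020/5/19. */
@Api(tags = "RabbitController", description = "RabbitMQ functional test")
@Controller
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private TopicSender topicSender;

    @ApiOperation("Wildcard mode")
    @RequestMapping(value = "/topic", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult topicTest() {
        // Send ten messages, one per second
        for (int i = 0; i < 10; i++) {
            topicSender.send(i);
            ThreadUtil.sleep(1000);
        }
        return CommonResult.success(null);
    }
}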
  • After running, the result is as follows: the producer sends messages with different routing keys to the exchange, and instance 1 and instance 2 each receive the messages that match their bindings.

References

RabbitMQ Tutorials: www.rabbitmq.com/getstarted….

Project source code address

Github.com/macrozheng/…
