MQ is introduced

  1. MQ: Message Queue. A producer produces a message and stores it in a queue, and a consumer listens to the queue and consumes the message

  2. Advantages: The production and consumption of messages in MQ is asynchronous, with non-intrusive, low coupling between producer and consumer

  3. Common MQ frameworks

    # ActiveMQ
Introduce the RabbitMQ

  1. RabbitMQ website:

  2. The advantages of the RabbitMQ

  3. RabbitMQ related concepts

    # closer agreement
    Proposed in 2003, is a kind of advanced message protocol, does not limit the API layer, directly defines the network exchange data format, has the natural cross-platform. A virtual host has a set of switches, queues, and bindings that are used to divide RabbiMQ services. Generally, different services are configured with different virtual hosts. Users have permission control in the granularity of virtual hosts. Each RabbitMQ server is configured with a default virtual host '/' # switch for forwarding messages to queues. A channel object is created from the Connection object, and the data is transferred using the channel object. A channel can be regarded as a virtual connection, avoiding the overhead of creating real connections frequently
  4. To obtain the RabbitMQ

    • For Windows, download the RabbitMQ installation package from the official website and start it with the Erlang environment

    • Linux installs RabbitMQ through the package manager

    • Linux uses Dcoker to pull the RabbitMQ image and start it

  5. Manage the RabbitMQ

    RabbitMQ provides a Web management page on port 15672 by default. You can log in to the Web page to manage RabbitMQ service configurations

    Such as adding a virtual host

    If you add a user, you can set the user’s access to the virtual host

Use the RabbitMQ

The following RabbitMQ version is 3.8.23

Create channels

 // Get the factory object of the MQ connection object
ConnectionFactory connectionFactory = new ConnectionFactory();
// Set the connection IP
// Set the port number
// Set the virtual host
// Set the user name and password
// Get the connection object
Connection connection = connectionFactory.newConnection();
// Create a channel object
Channel channel = connection.createChannel();
Create a queue

/** * Parameter 1 Queue: name of the message queue * Parameter 2 Durable: Whether the queue is persistent (excluding messages in the queue) * Parameter 3 EXCLUSIVE: Whether the current connection is exclusive to the queue * Parameter 4 autoDelete: Whether the queue is automatically deleted after message consumption is complete and the connection is disconnected * argument 5 Argument: additional argument */
Work queue model

  1. Work queue model

    With a message queue, there are one or more consumers, each of whom retrieves a different message for consumption

  2. news

    channel.basicPublish(""."demoQueue".null."demo queue".getBytes());
    /** * parameter 1 exchange: switch name * parameter 2 routingKey: routing * parameter 3 props; Additional arguments * Argument 4 Body: Byte array for the message */
    If the switch name is null, the default switch is used

    Each queue automatically binds its Routing Key with the same name to the default switch

    The channel and connection must be closed after being used

  3. News consumption

    // bind the same queue as the producer
    // Start message listening
    /** * Parameter 1 queue: queue name * parameter 2 autoAck: automatic acknowledgement * parameter 3 callback: callback interface object */
    channel.basicConsume("demoQueue" DefaultConsumer(channel) {
        		// Body is the message content
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           BasicProperties properties,
                                           byte[] body) throws IOException {
  4. Distribution of the message

    In the work queue model, messages are evenly distributed across consumers by default

    If manual acknowledgment is used and only one message is consumed on each channel, the next message can be consumed only after acknowledgment

    In this way, the number of messages allocated is related to the speed of processing

    Manual acknowledgement message

    // Set the channel to consume only one message at a time
    // The channel does not get the allocation of the new message until it acknowledges the message
    // autoAck false
    channel.basicConsume("demoQueue" DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   BasicProperties properties,
                                   byte[] body) {
            System.out.println(new String(body));
            // Argument 1: the flag of the message, obtained through envelope. GetDeliveryTag ()
            // Parameter 2: Whether to enable multi-message acknowledgement
Publish and subscribe model

  1. Radio model

    Each consumer has its own queue, and each queue is bound to the switch, and the producer sends messages to the switch for distribution

  2. news

    // Declare a switch
    // Parameter 1: indicates the switch name
    // Parameter 2: switch type, fanout broadcast
    // Send a message
    channel.basicPublish("demoExchange"."".null."demo exchange".getBytes());
  3. News consumption

    // Declare a switch
    // Get the temporary queue name
    String tempQueueName = channel.queueDeclare().getQueue();
    // Bind queues to switches
    channel.queueBind(tempQueueName, "demoExchange"."");
    // Start message listening
    channel.basicConsume(tempQueueName, DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   BasicProperties properties,
                                   byte[] body) {
            System.out.println(new String(body));
    In the publish-subscribe model, producers only need to declare switches, consumers declare temporary queues and bind to switches, Routing keys all use empty strings, and once the switch receives the message, it forwards it to each bound queue

Routing topic model

  1. Routing topic model

    In the publish-subscribe model, all consumers can get the same message

    In the routing topic model, the switch no longer forwards messages to every queue, but matches more routes and topics

  2. news

    // Declare a switch
    // Send a message
    // Publish a message with the Routing Key info
    channel.basicPublish("routeExchange"."info".null."demo route info".getBytes());
    // Publish a message with the Routing Key error
    channel.basicPublish("routeExchange"."error".null."demo route error".getBytes());
  3. News consumption

     // Declare a switch. The type is direct
    // Get the temporary queue name
    String tempQueueName = channel.queueDeclare().getQueue();
    // Bind the switch, queue, and Routing Key
    channel.queueBind(tempQueueName, "routeExchange"."info");
    // Start message listening
    channel.basicConsume(tempQueueName, DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, 
                                   BasicProperties properties,
                                   byte[] body) {
            System.out.println(new String(body));
    Channel. queueBind can be called multiple times to bind multiple Routing keys

  4. Dynamic routing

    Dynamic Routing can use wildcards in Routing keys, with # representing any string and * representing a word

    Producer releases message

    // Declare a switch
    // Send several messages with different routes
    Consumer binding queue

    // Declare the switch, topic type
    // Get the temporary queue name
    String tempQueueName = channel.queueDeclare().getQueue();
    // Bind queues and routes
    channel.queueBind(tempQueueName, "topicExchange"."log.*");
    /** * log.* can match log.error,, log.error. File **. Info can match, */
    Summary of three models:

    Work queue model: No switch single queue, content-message

    Publish subscribe model: switch empty routing, same message

    Routing topic model: There are switches with routes, by route

Integrated SpringBoot

  1. Introduction of depend on

  2. The configuration application. Yaml

        host: 127.0. 01.
        port: 5672
        username: demoUser
        password: xxxxx
        virtual-host: demoMQ
    Once configured, the RabbitTemplate is available. RabbitTemplate encapsulates a set of RabbitMQ operations

  3. Work queue model


    public class RabbitProviderTest {
        RabbitTemplate rabbit;
        // Emulate the RabbitConsumer object
        // Prevent RabbitConsumer consumption messages instantiated during test startup
        public RabbitConsumer consumer;
        public void queueProvide(a) {
            // Send messages to the queue
            rabbit.convertAndSend("demoQueue"."hello demo queue"); }}Copy the code

    News consumption

    // @queue Specifies parameters such as durable and exclusive
    QueueDeclare ("demoQueue", true, false, false, null); // The default is channel.queueDeclare("demoQueue", true, false, false, null);
    @RabbitListener(queuesToDeclare = @Queue(name = "demoQueue"))
    public class RabbitConsumer {
        // The message handler can get the message body directly
        public void queueConsume(String message) { System.out.println(message); }}/ * * *@RabbitListenerIt can also be used on methods, denoted as message handler */
    In Spring AMQP, the work queue model is fair consumption

  4. Publish and subscribe model


    public class RabbitProviderTest {
        RabbitTemplate rabbit;
        public RabbitConsumer consumer;
        public void subscribeProvide(a) {
            rabbit.convertAndSend("demoExchange".""."hello demo exchange"+ i); }}Copy the code

    News consumption

    public class RabbitConsumer {
        // The first consumer
        // @queue creates a temporary Queue without specifying an argument
        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "", exchange = @Exchange(name = "demoExchange", type = "fanout")))
        public void subscribeConsumer1(String message) {
            System.out.println("SubscribeConsume1 receives:" + message);
        // The second consumer
        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "", exchange = @Exchange(name = "demoExchange", type = "fanout")))
        public void subscribeConsumer2(String message) {
  5. Routing topic model


    public class RabbitProviderTest {
        RabbitTemplate rabbit;
        public RabbitConsumer consumer;
        public void routeProvide(a) {
            rabbit.convertAndSend("routeExchange"."".""); }}Copy the code

    News consumption

    public class RabbitConsumer {
        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "log.*", exchange = @Exchange(name = "routeExchange", type = "topic")))
        public void routeConsumer1(String message) {
            System.out.println("RouteConsumer1 receives:" + message);
        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "log.#", exchange = @Exchange(name = "routeExchange", type = "topic")))
        public void routeConsumer2(String message) {
            System.out.println("RouteConsumer2 receives:" + message);
        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "*.info", exchange = @Exchange(name = "routeExchange", type = "topic")))
        public void routeConsumer3(String message) {
            System.out.println("RouteConsumer3 receives:"+ message); }}/** * To use the wildcard to implement dynamic routing, set type to topic */
The RabbitMQ cluster

  1. Basic steps

    Prepare the RabbitMQ service and deploy two or more RabbitMQ services

    Configure the.erlang.cookie file..erlang.cookie is the key to join the service cluster and remains the same in the cluster service

    Run the cluster joining command on the secondary node

    Rabbitmqctl join_cluster --ram Rabbit @#-- RAM indicates a memory node. If no ram is added, it is a disk node by default
    Copy the code

    Set up a mirror queue on any node

    Strategy rabbitmqctl set_policy < name > "< name > queue" '{" ha - mode ":" < > mirror mode "}'#Parameter 1: indicates the policy name
    #Parameter 2: Matching rules for queue names, using regular expressions
    #Parameter 3: The principal rule of the mirror queue, a JSON string, with three attributes: ha-mode/ha-params/ha-sync-mode
    #Ha-mode: mirroring mode, all/exactly/ Nodes, all is stored on all nodes
    #--vhost Sets the virtual host
  2. Docker RabbitMQ cluster

    Pull the RabbitMQ image

    Prepare the rabbitmq.conf configuration file and configure default accounts and virtual hosts

    loopback_users.guest = false
    listeners.tcp.default = 5672
    management.tcp.port = 15672
    default_user = cluster
    default_pass = xxxxx
    default_vhost = clusterMQ
    Start three RabbitMQ containers and mount the same.erlang.cookie file

    # 1docker run \ --name rabbitmq1 \ -h rabbitmq1 \ -p 15673:15672 \ -p 5673:5672 \ -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -v /var/docker/rabbitmq_cluster/data1:/var/lib/rabbitmq \ -v / var/docker/rabbitmq_cluster. Erlang. Cookies: / var/lib/rabbitmq /. Erlang. Cookies \ - d the rabbitmq: 3.8.23 - management# 2docker run \ --name rabbitmq2 \ -h rabbitmq2 \ -p 15674:15672 \ -p 5674:5672 \ -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -v /var/docker/rabbitmq_cluster/data2:/var/lib/rabbitmq \ -v /var/docker/rabbitmq_cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \ --link rabbitmq1:rabbitmq1 \ -d The rabbitmq: 3.8.23 - management# 3docker run \ --name rabbitmq3 \ -h rabbitmq3 \ -p 15675:15672 \ -p 5675:5672 \ -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -v /var/docker/rabbitmq_cluster/data3:/var/lib/rabbitmq \ -v /var/docker/rabbitmq_cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \ --link rabbitmq1:rabbitmq1 --link Rabbitmq2: rabbitmq2 \ - d the rabbitmq: 3.8.23 - managementCopy the code

    To join the cluster

    #Into the container
    docker exec -it rabbitmq2 bash
    #Stop the service
    rabbitmqctl stop_app
    #To add node 1, rabbit@ must use the host name, not the IP address
    #The host name is the parameter after docker run-h
    rabbitmqctl join_cluster --ram rabbit@rabbitmq1
    Configure a policy to specify the virtual host as clusterMQ

    rabbitmqctl set_policy --vhost clusterMQ demoPolicy "^" '{"ha-mode":"all"}'
    #use"^"Mirrors all queues
