About message queue, I have read some information intermittently since the year before last. I have been thinking about writing for a long time, but I haven’t got any spare time. Recently, I met several friends talking about the technical selection of this piece, it is time to sort out and record the knowledge of this piece.

There are many message queue products in the market, such as the old ActiveMQ, RabbitMQ, Kafka which is the most popular in my view, and ZeroMQ, RocketMQ which Alibaba donated to Apache at the end of last year. Even NoSQL databases such as Redis support MQ functionality. In general, there are more than a dozen of these well-known products, and I’m only going to talk about RabbitMQ, Kafka and ActiveMQ based on my own experience and interests.

What is a message queue

A Message is data that is sent between applications. Messages can be very simple, such as containing only text strings, or more complex, possibly containing embedded objects.

Message Queue is a communication mode between applications. Messages can be returned immediately after being sent. The Message system ensures the reliable delivery of messages. Message publishers can publish messages to MQ regardless of who gets them, and message consumers can fetch messages from MQ regardless of who publishes them. This way neither publisher nor user needs to know about each other’s existence.

Why message queues

As you can see from the above description, message queues are an asynchronous collaboration mechanism between applications, so when is MQ needed?

Take the common order system as an example, after the user clicks the “order” button, the business logic may include: deducting inventory, generating corresponding documents, sending red envelopes, sending SMS notification. In the early stage of service development, these logics may be executed synchronously. As the number of orders increases, the system service performance needs to be improved. In this case, some operations that do not take effect immediately can be separated and executed asynchronously, such as sending red packets and sending SMS notifications. MQ can be used in this scenario, where a message is sent to MQ after the main process of placing an order (such as inventory reduction and receipt generation) is complete to allow the main process to complete quickly, and MQ’s message is pulled by a separate thread (or pushed by MQ), and when a red envelope or SMS message is found in MQ, Execute the corresponding business logic.

This is for service decoupling, other common scenarios include final consistency, broadcasting, staggered flow control, and so on.

The RabbitMQ characteristics

RabbitMQ is an open source implementation of AMQP developed in the Erlang language.

AMQP: Advanced Message Queue. It is an open standard of application layer protocol, designed for message-oriented middleware. The client and message-oriented middleware based on this protocol can transfer messages, regardless of product and development language conditions.

RabbitMQ originated in the financial system and is used to store and forward messages on distributed systems. RabbitMQ is very easy to use, scalable, and highly available. Specific features include:

  1. RabbitMQ uses mechanisms to ensure Reliability, such as persistence, transport confirmation, and release confirmation.

  2. Flexible Routing Is used to route messages through Exchange before they are queued. For typical routing, RabbitMQ already provides some built-in Exchange. For more complex routing capabilities, you can bind multiple Exchanges together or implement your own Exchange through a plug-in mechanism.

  3. Message Clustering (Clustering) RabbitMQ servers can form a cluster to form a logical Broker.

  4. Highly Available Queues can be mirrored on machines in a cluster, making them usable even if some nodes fail.

  5. RabbitMQ supports multiple message queuing protocols, such as STOMP and MQTT.

  6. RabbitMQ supports almost all common languages, such as Java,.NET, Ruby, and so on.

  7. RabbitMQ provides an easy-to-use user interface that enables users to monitor and manage many aspects of a message Broker.

  8. If a message is abnormal, RabbitMQ provides a Tracing mechanism for the user to find out what happened.

  9. RabbitMQ provides many plug-ins to extend RabbitMQ in many ways, or you can write your own.

Conceptual model in RabbitMQ

A message model

All MQ products are the same process in model abstraction: consumers subscribe to a queue. Producers create messages, publish them to queues, and then send the messages to listening consumers.

The message flow

RabbitMQ basic concepts

This is the simplest and most abstract description, but RabbitMQ can be explained in more detail. As mentioned above RabbitMQ is an open source implementation of the AMQP protocol, so it is actually a basic concept within AMQP:

RabbitMQ internal structure

  1. “Message” is an anonymous Message that consists of a header and a body. The body of the message is opaque, and the header consists of a set of optional attributes, including routing-key, priority (priority over other messages), delivery-mode (indicating that the message may require persistent storage), and so on.
  2. Publisher is a producer of messages and a client application that publishes messages to the exchange.
  3. An Exchange Exchange that receives messages sent by producers and routes them to queues in the server.
  4. Binding The association between message queues and switches. A binding is a routing rule that connects a switch to a message queue based on a routing key, so a switch can be thought of as a routing table made up of bindings.
  5. Queue Message Queue, used to hold messages until they are sent to consumers. It is the container and destination of the message. A message can be put into one or more queues. The message remains in the queue, waiting for the consumer to connect to the queue to pick it up.
  6. Connection A network Connection, such as a TCP Connection.
  7. Channel an independent two-way data Channel in a multiplexing connection. The channel is built on the real TCP connection and the virtual connection, AMQP command is sent through the channel, whether it is to publish messages, subscribe queue or receive messages, these actions are completed through the channel. Because it is very expensive for an operating system to establish and destroy TCP, the concept of a channel was introduced to reuse a TCP connection.
  8. Consumer, the Consumer of a message, represents a client application that retrieves a message from a message queue.
  9. Virtual Host A Virtual Host that represents a batch of switches, message queues, and related objects. A virtual host is a separate server domain that shares the same authentication and encryption environment. Each Vhost is essentially a mini RabbitMQ server with its own queue, switch, binding and permission mechanism. Vhost is the basis of the AMQP concept and must be specified at connection time. The default vhost for RabbitMQ is /.
  10. The Broker represents the message queue server entity.
Message routing in AMQP

The routing of messages in AMQP differs a bit from the JMS process familiar to Java developers, with the addition of Exchange and Binding roles. The producer publishes the message to the Exchange, the message eventually reaches the queue and is received by the consumer, and the Binding determines which queue the Exchange’s message should be sent to.

AMQP message routing process

Exchange type

Exchange distributes messages according to different distribution policies. Currently, there are four types: Direct, FANout, Topic, and headers. Headers matches the header of an AMQP message, not the routing key. The headers exchange is exactly the same as the Direct exchange, but is much less efficient and is almost not used today.

  1. direct


    Direct switch


    If the routing key in the message matches the Binding key in Binding, the exchange sends the message to the corresponding queue. The routing key matches the queue name exactly. If a queue is bound to the switch and requires the routing key to be “dog”, only messages with the routing key marked “dog” will be forwarded. “dog.puppy” and “dog.guard” will not be forwarded. It is a perfectly matched, unicast pattern.

  2. fanout


    Fanout exchange


    Each message sent to a FANout type exchange is sent to all bound queues. The FANout exchange does not handle routing keys, but simply binds queues to the exchange, and every message sent to the exchange is forwarded to all queues bound to the exchange. Much like subnet broadcasting, each host in a subnet gets a copy of the message. Fanout type forwarding messages is the fastest.

  3. topic

    Topic switch


    The Topic exchange allocates the routing key attribute of the message through pattern matching, matching the routing key to a pattern to which the queue must be bound. It splits the strings of routing and binding keys into words separated by dots. It also recognizes two wildcards: the symbol “#” and the symbol”

    “. # Match 0 or more words,

    Match no more than one word.

The RabbitMQ installation

Erlang is usually installed before installing RabbitMQ, which can be downloaded from the Erlang website. Go to the RabbitMQ website to download the installation package and unzip it. The official website provides installation instructions for different operating systems: Windows, Debian/Ubuntu, RPM- Based Linux, and Mac

If you are a Mac user, I recommend using HomeBrew for installation. Please update the brew before installing:

brew update
Copy the code

Install rabbitMQ server:

brew install rabbitmq
Copy the code

RabbitMQ is then installed and the Erlang on which it depends is automatically installed.

RabbitMQ runs and manages

  1. You can find six executable files starting with RabbitMQ in the sbin directory. Run the rabbitmq-server command to install RabbitMQ. Instead, the start command is:
./sbin/rabbitmq-server
Copy the code

If the startup is normal, you will see some startup process information and completed with 7 plugins at the end, indicating that seven plug-ins were loaded by default at startup.


Normal boot

  1. Detached If you want RabbitMQ to run in the background as a daemon, add the -detached parameter to start:
./sbin/rabbitmq-server -detached
Copy the code
  1. There is a particularly important file in the sbin directory called rabbitmqctl, which provides almost a one-stop solution for RabbitMQ management, providing most of the o&M commands. To query the RabbitMQ server status, use the status parameter:
./sbin/rabbitmqctl status
Copy the code

This command outputs a lot of information about the server, such as RabbitMQ and Erlang versions, OS names, memory, and so on

  1. We know that RabbitMQ is written in Erlang and there are two concepts in Erlang: nodes and applications. A node is each instance of an Erlang virtual machine, and multiple Erlang applications can run on the same node. Nodes can communicate locally with each other (whether they are running on the same server or not). For example, an application running on node A can call the application’s methods on node B as if they were local functions. If the application crashes for some reason, the Erlang node automatically tries to restart the application. To stop an entire RabbitMQ node, use the stop argument:
./sbin/rabbitmqctl stop
Copy the code

It communicates with the local node and instructs it to clean down. It can also specify to shut down different nodes, including remote nodes, by passing -n:

./sbin/rabbitmqctl -n rabbit@server.example.com stop 
Copy the code

-n node The default node name is rabbit@server. If your host name is server.example.com, the node name is [email protected].

  1. To stop the RabbitMQ application, use stop_app if you want to stop the application while keeping the Erlang node running:
./sbin/rabbitmqctl stop_app
Copy the code

This command will be useful in the cluster mode described later.

  1. Start the RabbitMQ application
./sbin/rabbitmqctl start_app
Copy the code
  1. Reset the RabbitMQ node
./sbin/rabbitmqctl reset
Copy the code

This command clears all queues.

  1. View declared queues
./sbin/rabbitmqctl list_queues
Copy the code
  1. Viewing switches
./sbin/rabbitmqctl list_exchanges
Copy the code

This command can also attach parameters, such as listing the name, type, persistence, and automatic deletion of the switch:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete
Copy the code
  1. Check the binding
./sbin/rabbitmqctl list_bindings
Copy the code

Java client access

RabbitMQ supports multiple languages. The following uses Java as an example to describe how to use RabbitMQ.

  1. Add dependencies to maven project POM files
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>
Copy the code
  1. Message producer
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Set the RabbitMQ address
        factory.setHost("localhost");
        // Establish a connection to the proxy server
        Connection conn = factory.newConnection();
        // Get the channel
        Channel channel = conn.createChannel();
        // Declare the switch
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct".true);

        String routingKey = "hola";
        // Publish the message
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); }}Copy the code
  1. Message consumer
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        // Establish a connection to the proxy server
        Connection conn = factory.newConnection();
        // Get the channel
        final Channel channel = conn.createChannel();
        // Declare the switch
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct".true);
        // Declare a queue
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";
        // Bind the queue to the switch with the key hola
        channel.queueBind(queueName, exchangeName, routingKey);

        while(true) {
            // Consume messages
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("Consumed routing key:" + routingKey);
                    System.out.println(Type of content consumed: + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    // Confirm the message
                    channel.basicAck(deliveryTag, false);
                    System.out.println("Consumed message body content:");
                    String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); }}); }}}Copy the code
  1. Start the RabbitMQ server
./sbin/rabbitmq-server
Copy the code
  1. Run Consumer first so that when the producer sends a message, the record of the message is seen in the Consumer back end.
  2. Running Producer Then running Producer posts a message that can be seen on the Consumer console:


    Consumer console

The RabbitMQ cluster

One of RabbitMQ’s best features is built-in clustering, which is designed to allow consumers and producers to continue to operate even if a node crashes, and to linearly expand message throughput by adding more nodes. RabbitMQ internally uses the distributed communication framework OTP provided by Erlang to meet these requirements, allowing a client to lose a RabbitMQ connection and reconnect to any other node in the cluster to produce and consume messages.

Some concepts in RabbitMQ clustering

RabbitMQ will always record four types of internal metadata:

  1. Queue metadata includes queue names and their properties, such as whether they are persistent and automatically deleted
  2. Switch metadata Switch name, type, and properties
  3. Inside the binding metadata is a table of how to route a message to a queue
  4. Vhost metadata provides namespaces and security attributes for queues, switches, and bindings within a VHost

In a single node RabbitMQ stores all of this information in memory and stores the queues, switches and bindings marked as persistable on hard disk. Saving to hard disk ensures that queues and switches can be rebuilt after a node is restarted. In clustered mode, there are also two options: save to hard disk (the default for standalone nodes) and save to memory.

If queues are created in a cluster, the cluster only creates complete queue information (metadata, status, content) on a single node rather than all nodes. The result is that only the owner node of the queue knows all information about the queue, so when the cluster node crashes, the queue and binding for that node are gone, and any new messages matching the binding for that queue are lost. Fortunately, mirrored queues have been provided since RabbitMQ 2.6.0 to prevent the contents of queues from becoming unavailable due to cluster node failures.

RabbitMQ clusters can share users, vhosts, exchanges, etc. All data and state must be replicated on all nodes, with the exception of message queues. RabbitMQ nodes can be dynamically added to a cluster.

When declaring queues, switches, and bindings in a cluster, these actions are not returned until all cluster nodes have successfully committed their metadata changes. There are two types of clusters: memory nodes and disk nodes. Although memory nodes do not write to disk, they perform better than disk nodes. How do you balance memory nodes, which provide excellent performance, and disk nodes, which ensure that configuration information is still available after a node is restarted?

RabbitMQ requires only one disk node in the cluster. All other nodes can be memory nodes. When a node joins a fire and leaves the cluster, they must notify at least one disk node of the change. If there is only one disk node and it happens to crash, the cluster can continue routing messages, but it cannot create queues, create switches, create bindings, add users, change permissions, add or remove cluster nodes. In other words, if the only disk node in the cluster crashes, the cluster can still run, but nothing can be changed until the node recovers.

Configure and start the RabbitMQ cluster

If multiple RabbitMQ nodes are started on one machine to form a cluster, starting the second and third nodes as described above will fail due to node name and port conflicts. So set the RABBITMQ_NODENAME and RABBITMQ_NODE_PORT environment variables to specify a unique node name and port each time the rabbitmq-server command is called. In this example, the port number starts at 5672, and each newly started node is incremented by 1. The nodes are also named test_rabbit_1, test_rabbit_2, and test_rabbit_3.

Start node 1:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
Copy the code

Start the second node:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
Copy the code

Before starting the second node, you are advised to disable the plugins that are activated by default for RabbitMQ. Otherwise, a port number conflict will occur and the RabbitMQ node will fail to start.

Now the second node and the first node are both independent nodes, unaware of the existence of other nodes. The next node to join the cluster will need to obtain the metadata from the cluster, so stop the RabbitMQ application running on the Erlang node and reset the metadata, then join and obtain the metadata from the cluster, and finally restart the RabbitMQ application.

Stop the application on the second node:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app
Copy the code

Reset the metadata of the second node:

./sbin/rabbitmqctl -n test_rabbit_2 reset
Copy the code

The second node joins the cluster formed by the first node:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
Copy the code

Start the application for the second node

./sbin/rabbitmqctl -n test_rabbit_2 start_app
Copy the code

The configuration of the third node is similar to that of the second node:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached

./sbin/rabbitmqctl -n test_rabbit_3 stop_app

./sbin/rabbitmqctl -n test_rabbit_3 reset

./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost

./sbin/rabbitmqctl -n test_rabbit_3 start_app
Copy the code
RabbitMQ cluster o&M

Stopping a specified node, such as stopping a second node:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
Copy the code

View the cluster status of node 3.

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
Copy the code