1. Overview of messaging middleware

1.1 Summary of MQ

MQ, short for Message Queue, is a container that holds messages during their transmission. It is used for communication between distributed systems.

Remote calls between applications

Calls between applications after introducing MQ

1.2 Advantages of MQ

1.2.1 Application decoupling

MQ acts as an intermediary between producers and consumers, so the applications on either side no longer call each other directly.

The more tightly coupled a system is, the lower its fault tolerance and maintainability.

Using MQ to decouple applications improves both fault tolerance and maintainability.
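A minimal in-process sketch of this idea follows, with a java.util.concurrent.BlockingQueue standing in for a real MQ broker. The class DecouplingSketch, its methods, and the message format are hypothetical; they only illustrate that the producer depends on the queue and not on any particular consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: the in-memory queue stands in for MQ.
final class DecouplingSketch {

    // The producer only knows the queue; it never calls a consumer directly.
    static void placeOrder(BlockingQueue<String> mq, String orderId) {
        mq.offer("order-created:" + orderId);
    }

    // A consumer takes whatever is waiting, without knowing who produced it.
    static List<String> consumeAll(BlockingQueue<String> mq) {
        List<String> handled = new ArrayList<>();
        mq.drainTo(handled);
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> mq = new LinkedBlockingQueue<>();
        placeOrder(mq, "1001"); // succeeds even though no consumer is running yet
        placeOrder(mq, "1002");
        System.out.println(consumeAll(mq)); // [order-created:1001, order-created:1002]
    }
}
```

Because the producer returns as soon as the message is queued, a consumer that is slow or temporarily down does not make the order call fail.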

1.2.2 Task Asynchronous Processing

Operations that are time-consuming and do not need to be completed synchronously can be handed to a message receiver through the message queue and processed asynchronously, which shortens the application's response time.

Time for one synchronous order operation: 20 + 300 + 300 + 300 = 920 ms. After clicking the order button, the user has to wait 920 ms for the order response, which is too slow.

With MQ, the user only needs to wait 25 ms for the order response after clicking the order button (20 + 5 = 25 ms). This improves both the user experience and system throughput (the number of requests processed per unit of time).
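The two figures can be reproduced with a short calculation. The sketch assumes that the 20 ms is the order system's own work, that each 300 ms is one synchronous downstream call, and that the 5 ms is the cost of handing a message to MQ; the class and method names are hypothetical.

```java
// Hypothetical helper that reproduces the response-time arithmetic.
final class OrderLatency {

    // Synchronous: the caller waits for the order step plus every downstream call.
    static int synchronousMillis(int orderStep, int... downstreamCalls) {
        int total = orderStep;
        for (int ms : downstreamCalls) {
            total += ms;
        }
        return total;
    }

    // Asynchronous: the caller waits only for the order step plus publishing to MQ.
    static int asynchronousMillis(int orderStep, int publishToMq) {
        return orderStep + publishToMq;
    }

    public static void main(String[] args) {
        System.out.println(synchronousMillis(20, 300, 300, 300)); // 920
        System.out.println(asynchronousMillis(20, 5));            // 25
    }
}
```

The downstream work still takes 300 ms each; it simply no longer sits on the user's critical path.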

1.2.3 Peak shaving and valley filling

For example, an order system writes data to a database whenever an order is placed, but the database can only sustain about 1000 concurrent writes per second, and anything higher is likely to bring it down. During off-peak periods there are only a little over 100 concurrent requests, but at the peak the number suddenly surges to more than 5000, and the database will certainly be overwhelmed. With MQ, the messages are stored in the queue and the system consumes them at the rate it can handle, say 1000 messages per second, writing to the database steadily so that it does not fall over.

With MQ, the consumption rate is capped at 1000 messages per second, so the data generated during the peak is backlogged in MQ and the peak is “shaved”. Because of the backlog, the consumption rate stays at 1000 QPS for a while after the peak has passed, until the backlog is drained; this is called “valley filling”.
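A small simulation makes the backlog visible. It is only a sketch: the per-second arrival numbers are illustrative values taken from the example above, and PeakShaving is a hypothetical class, not part of any MQ product.

```java
import java.util.Arrays;

// Hypothetical simulation of a queue that is drained at a fixed rate.
final class PeakShaving {

    // Returns the backlog left in the queue at the end of each second.
    static long[] backlogPerSecond(long[] arrivalsPerSecond, long consumeRate) {
        long[] backlog = new long[arrivalsPerSecond.length];
        long queued = 0;
        for (int t = 0; t < arrivalsPerSecond.length; t++) {
            queued += arrivalsPerSecond[t];          // new messages are stored in MQ
            queued -= Math.min(queued, consumeRate); // the consumer takes at most consumeRate
            backlog[t] = queued;
        }
        return backlog;
    }

    public static void main(String[] args) {
        long[] arrivals = {100, 5000, 5000, 100, 100};
        System.out.println(Arrays.toString(backlogPerSecond(arrivals, 1000)));
        // [0, 4000, 8000, 7100, 6200]
    }
}
```

The database never sees more than 1000 writes per second, and the backlog keeps shrinking after the peak ends, which is the valley-filling phase.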

1.3 Disadvantages of MQ

  • Reduced system availability

    • The more external dependencies a system introduces, the less stable it becomes. If MQ goes down, the business services that depend on it are affected. How can MQ be made highly available?
  • Increased system complexity

    • Adding MQ greatly increases system complexity: calls that used to be synchronous remote calls between systems are now made asynchronously through MQ. How do you ensure that messages are not consumed more than once? How do you handle message loss? How do you guarantee that messages are delivered in order?
  • Consistency problems

    • After finishing its own processing, system A sends a message through MQ to systems B, C, and D. If B and C process the message successfully but D fails, how do you keep the processing results consistent?
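One common answer to the repeated-consumption question is to make the consumer idempotent by remembering the IDs of the messages it has already handled. The sketch below assumes every message carries a unique ID and keeps those IDs in memory; IdempotentConsumer and its members are hypothetical, and a real system would typically keep the IDs in a shared store.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical consumer that ignores a message it has already processed.
final class IdempotentConsumer {
    private final Set<String> processedIds = new HashSet<>();
    private int balance = 0;

    // Applies the message once; returns false when the ID was seen before.
    boolean handle(String messageId, int amount) {
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery, nothing to do
        }
        balance += amount;
        return true;
    }

    int balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        consumer.handle("msg-1", 10);
        consumer.handle("msg-1", 10); // redelivered copy is ignored
        System.out.println(consumer.balance()); // 10
    }
}
```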

1.4 Common MQ products

There are many MQ products in the industry, such as RabbitMQ, RocketMQ, ActiveMQ, Kafka, ZeroMQ, and MetaMq. Some teams also use Redis directly as a message queue. These products have different focuses, so when choosing one you need to weigh your own requirements against each product's characteristics.

| | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
| --- | --- | --- | --- | --- |
| Company/community | Rabbit | Apache | Alibaba | Apache |
| Development language | Erlang | Java | Java | Scala & Java |
| Protocol support | AMQP, XMPP, SMTP, STOMP | OpenWire, STOMP, REST, XMPP, AMQP | Custom | Custom protocol; the community wraps HTTP protocol support |
| Client languages | Erlang, Java, Ruby, etc. officially; community APIs cover almost all languages | Java, C, C++, Python, PHP, Perl, .NET, etc. | Java, C++ | Java officially; community APIs for PHP, Python, and others |
| Single-machine throughput | Ten-thousand level (third) | Ten-thousand level (worst) | Hundred-thousand level (best) | Hundred-thousand level (second) |
| Message latency | Microseconds | Milliseconds | Milliseconds | Within milliseconds |
| Features | Strong concurrency, excellent performance, low latency, active community, rich management interface | Established product, high maturity, extensive documentation | Fairly complete MQ feature set, good scalability | Supports only the main MQ features, since it is designed for the big data field |

1.5 AMQP and JMS

There are two main ways to implement MQ: AMQP and JMS.

1.5.1 AMQP

The Advanced Message Queuing Protocol (AMQP) is a network protocol: an open, application-layer standard designed for message-oriented middleware. Clients and middleware that implement the protocol can exchange messages regardless of the client product, the middleware product, or the development language. The AMQP specification was released in 2006. Its role is analogous to that of HTTP.

1.5.2 JMS

JMS, the Java Message Service API, is the Java platform's API for message-oriented middleware. JMS is one of the JavaEE specifications and is analogous to JDBC. Many message-oriented middleware products implement the JMS specification, ActiveMQ for example. RabbitMQ does not officially provide a JMS implementation, but the open source community does.

1.5.3 Differences between AMQP and JMS

  • JMS unifies message operations by defining a common interface; AMQP unifies the format of data exchange by defining a protocol.
  • JMS requires the Java language; AMQP is only a protocol and does not prescribe an implementation, so it is cross-language.
  • JMS defines two messaging models; AMQP's messaging models are richer.

2. RabbitMQ

RabbitMQ official address: www.rabbitmq.com/

RabbitMQ 1.0, based on the AMQP standard, was released in 2007 by Rabbit Technologies. RabbitMQ is developed in Erlang, a language designed for highly concurrent, distributed systems and widely used in telecommunications. The RabbitMQ infrastructure is shown below:

Related concepts in RabbitMQ:

  • Broker: the application that receives and distributes messages. RabbitMQ Server is the message broker.
  • Virtual host: designed for multi-tenancy and security. The basic AMQP components are divided into virtual groups, similar to the concept of a network namespace. When multiple users share one RabbitMQ server, multiple vhosts can be created, and each user creates exchanges and queues in their own vhost.
  • Connection: the TCP connection between a publisher or consumer and the broker.
  • Channel: establishing a new Connection for every access to RabbitMQ would be expensive and inefficient when message volume is high. A Channel is a logical connection established inside a Connection. If the application is multithreaded, it is common to create a separate Channel for each thread. Each AMQP method carries a channel ID so that the client and the message broker can identify the channel, which keeps channels completely isolated from each other. As a lightweight Connection, a Channel greatly reduces the operating system's overhead of establishing TCP connections.
  • Exchange: the first stop for a message arriving at the broker. Based on its distribution rules, it matches the routing key against its lookup table and routes the message to queues. Common types are direct (point-to-point), topic (publish-subscribe), and fanout (multicast).
  • Queue: where a message finally waits to be picked up by a consumer.
  • Binding: a virtual link between an exchange and a queue, which can contain a routing key. Binding information is stored in the exchange's lookup table and is used as the basis for message distribution.

RabbitMQ provides six modes: Simple, Work Queues, Publish/Subscribe, Routing, Topics, and RPC. The official website introduces each mode: www.rabbitmq.com/getstarted….

2.1 Installing and Configuring RabbitMQ

2.1.1 Installing dependent Environments

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

2.1.2 Installing Erlang

The following RPM packages are needed: erlang-18.3-1.el7.centos.x86_64.rpm, socat-1.7.3.2-5.el7.lux.x86_64.rpm, and rabbitmq-server-3.6.5-1.noarch.rpm.

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

2.1.3 Installing RabbitMQ

# Install socat
rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps
# Install RabbitMQ
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

2.1.4 Enabling the Management page and Configuration

# Enable the management plugin
rabbitmq-plugins enable rabbitmq_management
# Change the default configuration
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# For example: in loopback_users, change <<"guest">> to guest

2.1.5 Starting the Service

service rabbitmq-server start # start the service
service rabbitmq-server stop # stop the service
service rabbitmq-server restart # restart the service

Setting the configuration file

cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

2.1.6 Configuring Virtual Hosts and Users

After RabbitMQ is installed, you can visit http://ip:15672 to open the management page. The default username and password are guest/guest. To create a custom user, log in to the management page and perform the following operations:

  • Role description:
    • 1. Super administrator (administrator): can log in to the management console, view all information, and manage users and policies.
    • 2. Monitor (monitoring): can log in to the management console and view RabbitMQ node information (number of processes, memory usage, disk usage, and so on).
    • 3. Policy maker (policymaker): can log in to the management console and manage policies, but cannot view node information (the part marked by the red box in the figure above).
    • 4. Ordinary manager (management): can only log in to the management console; cannot see node information or manage policies.
    • 5. Other: cannot log in to the management console; usually ordinary producers and consumers.

Virtual hosts configuration

MySQL has the concept of a database and lets you grant users permissions on databases, tables, and other objects. RabbitMQ has similar permission management: within one RabbitMQ server you can create virtual message servers, called virtual hosts. Each virtual host behaves like an independent RabbitMQ server and is isolated from the others, so exchanges, queues, and messages cannot be shared across virtual hosts. A virtual host is comparable to a database in MySQL. Virtual host names usually start with a slash (/).

Creating a virtual host

3. Getting Started with RabbitMQ

A simple model

In the model above, there are the following concepts:

  • P: the producer, the program that sends messages
  • C: the consumer, the receiver of messages, which waits for messages to arrive
  • Queue: the message queue, shown in red. Like a mailbox, it buffers messages; producers deliver messages to it, and consumers take messages from it

3.1 Building an example project

3.1.1 Creating a Project

3.1.2 Adding a Dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

3.1.3 Writing the producer

Controller.Producer.java


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Host address; the default is localhost
        connectionFactory.setHost("124.222.211.143");
        // Connection port; the default is 5672
        connectionFactory.setPort(5672);
        // Virtual host name; the default is /
        connectionFactory.setVirtualHost("/admin");
        // Connection user name; the default is guest
        connectionFactory.setUsername("admin");
        // Connection password; the default is guest
        connectionFactory.setPassword("admin");

        // Create a connection
        Connection connection = connectionFactory.newConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare (create) a queue
        /*
         * Parameter 1: queue name
         * Parameter 2: whether the queue is durable
         * Parameter 3: whether the queue is exclusive to this connection
         * Parameter 4: whether to delete the queue automatically when it is no longer in use
         * Parameter 5: other queue arguments
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // The message to send
        String message = "hi man!";
        /*
         * Parameter 1: exchange name; an empty string means the default exchange
         * Parameter 2: routing key; in simple mode this is the queue name
         * Parameter 3: other message properties
         * Parameter 4: message body
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("Message sent:" + message);

        // Close the resources
        channel.close();
        connection.close();
    }
}

View console

3.1.4 Writing the consumer

Controller.Consumer.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Host address; the default is localhost
        connectionFactory.setHost("124.222.211.143");
        // Connection port; the default is 5672
        connectionFactory.setPort(5672);
        // Virtual host name; the default is /
        connectionFactory.setVirtualHost("/admin");
        // Connection user name; the default is guest
        connectionFactory.setUsername("admin");
        // Connection password; the default is guest
        connectionFactory.setPassword("admin");

        // Create a connection
        Connection connection = connectionFactory.newConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare (create) a queue
        // Arguments: queue name, durable, exclusive, auto-delete when unused, other queue arguments
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: consumer tag, which can be specified in channel.basicConsume
             * envelope: message envelope, from which the message ID, routing key, exchange,
             *           and redelivery flag (whether the message is being resent after a
             *           failed delivery) can be obtained
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The route key is:" + envelope.getRoutingKey());
                // Exchange
                System.out.println("Exchange is:" + envelope.getExchange());
                // Message ID
                System.out.println("Message ID is:" + envelope.getDeliveryTag());
                // Received message
                System.out.println("The message received is:" + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        /*
         * Parameter 1: queue name
         * Parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
         *              automatically on receipt and MQ deletes it; if false, the consumer
         *              must acknowledge it manually
         * Parameter 3: the consumer callback that handles received messages
         */
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);

        // Do not close the resources; the consumer should keep listening for messages
        // channel.close();
        // connection.close();
    }
}
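The effect of the automatic-acknowledgement argument of basicConsume can be pictured with a small in-memory model. This is only a sketch of the acknowledgement bookkeeping, not RabbitMQ's implementation or client API; the class AckSketch and its methods are hypothetical, and requeued messages are appended to the tail for simplicity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of one queue and its unacknowledged deliveries.
final class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextDeliveryTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands the next message to a consumer; returns its delivery tag, or -1 if the queue is empty.
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        if (!autoAck) {
            unacked.put(tag, message); // kept until the consumer acknowledges it
        }
        return tag;
    }

    // Manual acknowledgement: the message can now be forgotten.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // The consumer went away: every unacknowledged message returns to the queue.
    void consumerGone() {
        ready.addAll(unacked.values());
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.publish("a");
        queue.publish("b");
        queue.deliver(true);  // auto ack: "a" is removed as soon as it is delivered
        queue.deliver(false); // manual ack: "b" stays pending
        queue.consumerGone(); // consumer failed before acknowledging "b"
        System.out.println(queue.readyCount()); // 1
    }
}
```

With automatic acknowledgement a message that the consumer fails to process is gone; with manual acknowledgement it can be delivered again.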

4. RabbitMQ Working Modes

4.1 Work Queues Mode

Mode description:

Compared with the simple mode of the getting-started program, Work Queues mode has one or more additional consumers, and multiple consumers jointly consume the messages in the same queue. Application scenario: when tasks are heavy or numerous, a work queue can speed up task processing.

Extract the connection-creation code into a utility class

util.ConnectionUtil.java

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        // Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Host address; the default is localhost
        connectionFactory.setHost("localhost");
        // Connection port; the default is 5672
        connectionFactory.setPort(5672);
        // Virtual host name; the default is /
        connectionFactory.setVirtualHost("/admin");
        // Connection user name; the default is guest
        connectionFactory.setUsername("admin");
        // Connection password; the default is guest
        connectionFactory.setPassword("admin");
        // Create a connection
        return connectionFactory.newConnection();
    }
}

4.1.1 Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare (create) a queue
        // Arguments: queue name, durable, exclusive, auto-delete when unused, other queue arguments
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 30; i++) {
            // The message to send
            String message = "Hi man! work queue" + i;
            // Arguments: exchange name ("" is the default exchange), routing key (the queue name here),
            // other message properties, message body
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("Message sent:" + message);
        }

        // Close the resources
        channel.close();
        connection.close();
    }
}

4.1.2 Consumer 1

import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare (create) a queue
        // Arguments: queue name, durable, exclusive, auto-delete when unused, other queue arguments
        channel.queueDeclare(com.zjl.rabbitmq.work.Producer.QUEUE_NAME, true, false, false, null);

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: consumer tag, which can be specified in channel.basicConsume
             * envelope: message envelope (message ID, routing key, exchange, redelivery flag)
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The route key is:" + envelope.getRoutingKey());
                // Exchange
                System.out.println("Exchange is:" + envelope.getExchange());
                // Message ID
                System.out.println("Message ID is:" + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer1111 received the following message:" + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        // Arguments: queue name, automatic acknowledgement (true: acknowledged on receipt and then
        // deleted by MQ; false: manual acknowledgement is required), consumer callback
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);

        // Do not close the resources; the consumer should keep listening for messages
        // channel.close();
        // connection.close();
    }
}

4.1.3 Consumer 2

import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer2 {

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare (create) a queue
        // Arguments: queue name, durable, exclusive, auto-delete when unused, other queue arguments
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: consumer tag, which can be specified in channel.basicConsume
             * envelope: message envelope (message ID, routing key, exchange, redelivery flag)
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The route key is:" + envelope.getRoutingKey());
                // Exchange
                System.out.println("Exchange is:" + envelope.getExchange());
                // Message ID
                System.out.println("Message ID is:" + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer222 received the following message:" + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        // Arguments: queue name, automatic acknowledgement (true: acknowledged on receipt and then
        // deleted by MQ; false: manual acknowledgement is required), consumer callback
        channel.basicConsume(Producer.QUEUE_NAME, true, consumer);

        // Do not close the resources; the consumer should keep listening for messages
        // channel.close();
        // connection.close();
    }
}

4.1.4 Test

Start the two consumers first, then start the producer to send messages. In IDEA, check the consoles of the two consumers to see whether they receive the messages competitively, that is, each message goes to only one of them.
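The expected split can be sketched without a broker. The model below assumes both consumers are connected before the producer starts and that messages are handed out in turn (round-robin), which is RabbitMQ's default dispatch behavior when no prefetch limit is set; WorkQueueSketch is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin dispatch from one queue to several consumers.
final class WorkQueueSketch {

    // Returns, for each consumer, the messages it would receive.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i)); // each message goes to exactly one consumer
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            messages.add("Hi man! work queue" + i);
        }
        List<List<String>> result = dispatch(messages, 2);
        System.out.println(result.get(0).size() + " / " + result.get(1).size()); // 15 / 15
    }
}
```

Under these assumptions one consumer sees the even-numbered messages and the other the odd-numbered ones, and no message appears in both consoles.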

4.2 Overview of the Subscription Model

Overview of subscription model:

In the subscription model there is an additional exchange role, and the process changes slightly:

  • P: the producer, the program that sends messages. It no longer sends them to a queue but to X (the exchange).
  • C: the consumer, the receiver of messages, which waits for messages to arrive.
  • Queue: the message queue, which receives and buffers messages.
  • Exchange: the X in the figure. On one side it receives messages sent by producers; on the other it knows how to handle them, for example delivering them to a particular queue, delivering them to all queues, or discarding them. Which of these it does depends on the exchange type. There are three common types:
    • Fanout: broadcasts the message to every queue bound to the exchange
    • Direct: delivers the message to the queues whose binding matches the specified routing key
    • Topic: wildcard matching; delivers the message to the queues whose routing pattern matches the routing key

An exchange only forwards messages and cannot store them, so if no queue is bound to the exchange, or no queue matches the routing rules, the message is lost.
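As an illustration of the Topic type's wildcard matching, the sketch below assumes the usual AMQP convention, which the text above does not spell out: routing keys are dot-separated words, * matches exactly one word, and # matches zero or more words. TopicMatch is a hypothetical helper, not part of the RabbitMQ client.

```java
// Hypothetical matcher for topic-style binding patterns.
final class TopicMatch {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key used up but the pattern still expects a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("item.*", "item.insert"));     // true
        System.out.println(matches("item.*", "item.insert.now")); // false
        System.out.println(matches("item.#", "item.insert.now")); // true
    }
}
```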

4.3 Publish/Subscribe Mode

4.3.1 Mode Description

Publish/subscribe mode: 1. Each consumer listens to its own queue. 2. The producer sends the message to the broker, and the exchange forwards it to every queue bound to that exchange, so each bound queue receives the message.
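The forwarding rule can be modeled in a few lines of plain Java. This is an in-memory sketch, not the RabbitMQ client API; FanoutSketch and its methods are hypothetical. It also shows that a message published while no queue is bound is simply dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a fanout exchange and the queues bound to it.
final class FanoutSketch {
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message into every bound queue; returns how many queues received it.
    int publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> queue(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        System.out.println(exchange.publish("too early")); // 0, nothing is bound yet
        exchange.bind("fanout_queue_1");
        exchange.bind("fanout_queue_2");
        System.out.println(exchange.publish("hello"));       // 2
        System.out.println(exchange.queue("fanout_queue_1")); // [hello]
    }
}
```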

4.3.2 Producer

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    // Exchange name
    static final String FANOUT_EXCHANGE = "fanout_exchange";
    // Queue name
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    // Queue name
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare the exchange
        // Arguments: exchange name, exchange type (fanout, topic, direct, headers)
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        // Declare (create) the queues
        // Arguments: queue name, durable, exclusive, auto-delete when unused, other queue arguments
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

        // Bind the queues to the exchange
        channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
        channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");

        for (int i = 0; i < 10; i++) {
            // The message to send
            String message = "Hi man! work queue" + i;
            // Arguments: exchange name, routing key (empty for a fanout exchange),
            // other message properties, message body
            channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());
            System.out.println("Message sent:" + message);
        }

        // Close the resources
        channel.close();
        connection.close();
    }
}

4.3.3 Consumer 1

import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare the exchange
        // Arguments: exchange name, exchange type (fanout, topic, direct, headers)
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        // Declare (create) a queue
        // Arguments: queue name, durable, exclusive, auto-delete when unused, other queue arguments
        channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: consumer tag, which can be specified in channel.basicConsume
             * envelope: message envelope (message ID, routing key, exchange, redelivery flag)
             * properties: message properties
             * body: message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The route key is:" + envelope.getRoutingKey());
                // Exchange
                System.out.println("Exchange is:" + envelope.getExchange());
                // Message ID
                System.out.println("Message ID is:" + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer1111 received the following message:" + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        // Arguments: queue name, automatic acknowledgement (true: acknowledged on receipt and then
        // deleted by MQ; false: manual acknowledgement is required), consumer callback
        channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
    }
}

4.3.4 Test

Start all consumers, then use the producer to send messages. Every message sent by the producer appears in the console of each consumer, which achieves a broadcast effect.

After running the test code, open the Exchanges tab of the RabbitMQ management page and click fanout_exchange to view the following bindings:

4.3.5 Summary

The exchange needs to be bound to queues. Once bound, a message can be received by multiple consumers.

Differences between publish/subscribe mode and work queue mode

1. Work queue mode does not need to declare an exchange, whereas publish/subscribe mode does.
2. In publish/subscribe mode the producer sends messages to an exchange; in work queue mode the producer sends messages to a queue (under the hood, the default exchange is used).
3. Publish/subscribe mode requires binding the queues to the exchange; work queue mode does not, because the queue is bound to the default exchange automatically.

4.4 Routing Mode

4.4.1 Mode Description

Features of routing mode:

  • The binding between a queue and an exchange is no longer arbitrary; it specifies a RoutingKey.
  • When sending a message to the Exchange, the sender must also specify the message's RoutingKey.
  • The Exchange no longer delivers the message to every bound queue. It checks the message's RoutingKey, and a queue receives the message only if the queue's RoutingKey exactly matches it.

Illustration:

  • P: the producer, which sends messages to the Exchange and specifies a routing key for each message.
  • X: the Exchange, which receives the producer's messages and delivers them to the queues whose routing key matches exactly
  • C1: a consumer whose queue is bound with the routing key error
  • C2: a consumer whose queue is bound with the routing keys info, error, and warning
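The illustration can be expressed as a lookup from routing key to bound queues. This is a plain-Java sketch of exact-match routing, not the RabbitMQ client API; DirectSketch and the queue names c1-queue and c2-queue are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a direct exchange: routing key -> queues bound with that key.
final class DirectSketch {
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new TreeSet<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("c1-queue", "error");
        exchange.bind("c2-queue", "info");
        exchange.bind("c2-queue", "error");
        exchange.bind("c2-queue", "warning");

        System.out.println(exchange.route("error")); // [c1-queue, c2-queue]
        System.out.println(exchange.route("info"));  // [c2-queue]
        System.out.println(exchange.route("debug")); // []
    }
}
```

A message whose routing key has no matching binding reaches no queue, which is the message-loss case described for exchanges in general.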

4.4.2 producers

public class Producer {
    // Switch name
    static final String DIRECT_EXCHANGE = "direct_exchange";
    // Queue name
    static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
    // Queue name
    static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create channel
        Channel channel = connection.createChannel();

        Parameter 1: switch name * Parameter 2: switch type, fanout, topic, direct, headers */
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);


        // Declare (create) a queue
        /** * Parameter 1: queue name * Parameter 2: Define persistent queue * Parameter 3: exclusive connection * Parameter 4: automatically delete queue when not in use * parameter 5: other queue parameters */
        channel.queueDeclare(DIRECT_QUEUE_INSERT, true.false.false.null);
        channel.queueDeclare(DIRECT_QUEUE_UPDATE, true.false.false.null);

        // The queue binds the switch
        channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, "insert");
        channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, "update");


        // The message to send
        String message = "New goods. Routing mode; routing key is insert";

        // Parameter 1: exchange name (an empty string selects the default exchange); parameter 2: routing key (in simple mode, the queue name);
        // parameter 3: other message properties; parameter 4: message body
        channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // The message to send
        message = "Modified the product. Routing mode; routing key is update";

        // Parameters: exchange name, routing key, other message properties, message body
        channel.basicPublish(DIRECT_EXCHANGE, "update", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // Close the resources
        channel.close();
        connection.close();
    }
}

4.4.3 Consumer 1

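import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer1 {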

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Declare the exchange. Parameter 1: exchange name; parameter 2: exchange type (fanout, topic, direct, or headers)
        channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Declare (create) the queue
        // Parameter 1: queue name; parameter 2: whether the queue is durable; parameter 3: whether the queue is exclusive to this connection;
        // parameter 4: whether to delete the queue automatically when it is no longer in use; parameter 5: other queue arguments
        channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);

        // Bind the queue to the exchange with its routing key
        channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHANGE, "insert");

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, which carries the message ID (delivery tag), the routing key,
             *           the exchange, and the redelivery flag (whether the message is being delivered again)
             * properties: the message properties
             * body: the message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The routing key is: " + envelope.getRoutingKey());
                // Exchange
                System.out.println("The exchange is: " + envelope.getExchange());
                // Message ID (delivery tag)
                System.out.println("The message ID is: " + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer 1 received the following message: " + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        // Parameter 1: queue name; parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
        // automatically on receipt and MQ deletes it; if false, the consumer must acknowledge it manually.
        // Parameter 3: the callback that handles delivered messages
        channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
    }
}

4.4.4 Consumer 2

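import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer2 {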
    
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Declare the exchange. Parameter 1: exchange name; parameter 2: exchange type (fanout, topic, direct, or headers)
        channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Declare (create) the queue
        // Parameter 1: queue name; parameter 2: whether the queue is durable; parameter 3: whether the queue is exclusive to this connection;
        // parameter 4: whether to delete the queue automatically when it is no longer in use; parameter 5: other queue arguments
        channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);

        // Bind the queue to the exchange with its routing key
        channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHANGE, "update");

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, which carries the message ID (delivery tag), the routing key,
             *           the exchange, and the redelivery flag (whether the message is being delivered again)
             * properties: the message properties
             * body: the message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The routing key is: " + envelope.getRoutingKey());
                // Exchange
                System.out.println("The exchange is: " + envelope.getExchange());
                // Message ID (delivery tag)
                System.out.println("The message ID is: " + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer 2 received the following message: " + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        // Parameter 1: queue name; parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
        // automatically on receipt and MQ deletes it; if false, the consumer must acknowledge it manually.
        // Parameter 3: the callback that handles delivered messages
        channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
    }
}

4.4.5 Test

Start all consumers, then run the producer to send the messages. Each consumer's console shows only the messages whose routing key matches the key its queue was bound with: Consumer 1 receives the insert message and Consumer 2 receives the update message, which is the expected routing behavior.

After running the test code, open the Exchanges tab in the RabbitMQ management console and click the direct_exchange exchange to see the following bindings:

4.4.6 Summary

In Routing mode, a queue must specify a routing key when it is bound to the exchange. The exchange forwards a message only to the queues whose routing key matches the message's routing key.

4.5 Topics Wildcard mode

4.5.1 Mode Description

Like the Direct type, the Topic type routes messages to different queues based on the routing key. The difference is that a Topic exchange lets queues use wildcards in the routing key they bind with. A routing key is typically made up of one or more words separated by ".", for example item.insert.

Wildcard rules:

  • #: matches zero or more words
  • *: matches exactly one word

For example, item.# matches both item.insert and item.insert.abc, while item.* matches item.insert but not item.insert.abc.

Illustration:

  • Red queue: bound with usa.#, so every routing key that starts with usa. is matched.
  • Yellow queue: bound with #.news, so every routing key that ends with .news is matched.
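
The wildcard rules can be tried out without a broker. The helper below is a minimal sketch written for this article: TopicMatcher and its matches method are hypothetical names and not part of the RabbitMQ client. It assumes routing keys and binding patterns are plain dot-separated words, and it walks both word lists recursively, letting # absorb zero or more words and * exactly one.

```java
// Hypothetical helper that imitates topic-exchange matching (not the RabbitMQ API).
class TopicMatcher {
    // Returns true if the dot-separated routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words p[i..] with routing-key words k[j..].
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;      // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or one word and then tries again
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;              // pattern still expects a word, key has none left
        }
        // '*' accepts any single word; any other pattern word must be identical
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("usa.#", "usa.news"));          // true
        System.out.println(matches("usa.#", "usa.weather.today")); // true
        System.out.println(matches("#.news", "europe.news"));      // true
        System.out.println(matches("#.news", "europe.weather"));   // false
        System.out.println(matches("item.*", "item.insert"));      // true
        System.out.println(matches("item.*", "item.insert.abc"));  // false
    }
}
```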

4.5.2 Producer

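import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    // Exchange name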
    static final String TOPIC_EXCHANGE = "topic_exchange";
    // Queue name
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    // Queue name
    static final String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Declare the exchange. Parameter 1: exchange name; parameter 2: exchange type (fanout, topic, direct, or headers)
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        // The message to send
        String message = "New goods. Topic mode; routing key is item.insert";
        // Parameter 1: exchange name (an empty string selects the default exchange); parameter 2: routing key (in simple mode, the queue name);
        // parameter 3: other message properties; parameter 4: message body
        channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // The message to send
        message = "Modified the product. Topic mode; routing key is item.update";
        channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // The message to send
        message = "Deleted the product. Topic mode; routing key is item.delete";
        channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message.getBytes());
        System.out.println("Message sent:" + message);

        // Close the resources
        channel.close();
        connection.close();
    }
}

4.5.3 Consumer 1

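import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer1 {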

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Declare the exchange. Parameter 1: exchange name; parameter 2: exchange type (fanout, topic, direct, or headers)
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        // Declare (create) the queue
        // Parameter 1: queue name; parameter 2: whether the queue is durable; parameter 3: whether the queue is exclusive to this connection;
        // parameter 4: whether to delete the queue automatically when it is no longer in use; parameter 5: other queue arguments
        channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);

        // Bind the queue to the exchange, once per routing key
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE, "item.update");
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE, "item.delete");

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, which carries the message ID (delivery tag), the routing key,
             *           the exchange, and the redelivery flag (whether the message is being delivered again)
             * properties: the message properties
             * body: the message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The routing key is: " + envelope.getRoutingKey());
                // Exchange
                System.out.println("The exchange is: " + envelope.getExchange());
                // Message ID (delivery tag)
                System.out.println("The message ID is: " + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer 1 received the following message: " + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        // Parameter 1: queue name; parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
        // automatically on receipt and MQ deletes it; if false, the consumer must acknowledge it manually.
        // Parameter 3: the callback that handles delivered messages
        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
    }
}

4.5.4 Consumer 2

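import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer2 {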

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        // Create channel
        Channel channel = connection.createChannel();

        // Declare the exchange. Parameter 1: exchange name; parameter 2: exchange type (fanout, topic, direct, or headers)
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        // Declare (create) the queue
        // Parameter 1: queue name; parameter 2: whether the queue is durable; parameter 3: whether the queue is exclusive to this connection;
        // parameter 4: whether to delete the queue automatically when it is no longer in use; parameter 5: other queue arguments
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);

        // Bind the queue to the exchange with a wildcard routing key
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE, "item.*");

        // Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag: the consumer tag, which can be specified in channel.basicConsume
             * envelope: the message envelope, which carries the message ID (delivery tag), the routing key,
             *           the exchange, and the redelivery flag (whether the message is being delivered again)
             * properties: the message properties
             * body: the message body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Routing key
                System.out.println("The routing key is: " + envelope.getRoutingKey());
                // Exchange
                System.out.println("The exchange is: " + envelope.getExchange());
                // Message ID (delivery tag)
                System.out.println("The message ID is: " + envelope.getDeliveryTag());
                // Received message
                System.out.println("Consumer 2 received the following message: " + new String(body, "utf-8"));
            }
        };

        // Listen for messages
        // Parameter 1: queue name; parameter 2: automatic acknowledgement. If true, the message is acknowledged to MQ
        // automatically on receipt and MQ deletes it; if false, the consumer must acknowledge it manually.
        // Parameter 3: the callback that handles delivered messages
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
    }
}

4.5.5 Test

Start all consumers, then run the producer to send the messages. Each consumer's console shows the messages whose routing key matches the key its queue was bound with, and here those keys can contain wildcards. Consumer 1, bound with item.update and item.delete, receives those two messages; Consumer 2, bound with item.*, receives all three.

After running the test code, open the Exchanges tab in the RabbitMQ management console and click the topic_exchange exchange to see the following bindings:

4.5.6 Summary

Topic mode can do everything that Publish/Subscribe mode and Routing mode can do. It is more flexible because the routing keys used in its bindings can contain wildcards.
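
One way to see why Topic mode covers the other two: a queue bound with # receives every message, as it would on a fanout exchange, and a queue bound with a key that contains no wildcards receives only exact matches, as it would on a direct exchange. The sketch below models this in memory. TopicExchangeSketch, its methods, and the queue names are hypothetical and chosen for illustration, and the model keeps a single binding pattern per queue to stay short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a topic exchange (not the RabbitMQ API).
class TopicExchangeSketch {
    // Queue name -> binding pattern, kept in binding order
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String pattern) {
        bindings.put(queue, pattern);
    }

    // Returns the queues whose pattern accepts the routing key, in binding order.
    List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            if (matches(b.getValue().split("\\."), 0, routingKey.split("\\."), 0)) {
                matched.add(b.getKey());
            }
        }
        return matched;
    }

    // '#' absorbs zero or more words, '*' exactly one; other words must be equal.
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        return j < k.length
                && (p[i].equals("*") || p[i].equals(k[j]))
                && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        TopicExchangeSketch exchange = new TopicExchangeSketch();
        exchange.bind("all_messages", "#");          // behaves like a fanout binding
        exchange.bind("only_insert", "item.insert"); // behaves like a direct binding
        exchange.bind("any_item", "item.*");         // only possible with a topic exchange

        System.out.println(exchange.route("item.insert")); // [all_messages, only_insert, any_item]
        System.out.println(exchange.route("order.paid"));  // [all_messages]
    }
}
```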

4.6 Mode Summary

RabbitMQ working modes:

  • Simple mode (HelloWorld): one producer and one consumer. No exchange needs to be declared (the default exchange is used).
  • Publish/Subscribe: declare an exchange of type fanout and bind it to the queues. When a message is sent to the exchange, the exchange delivers it to every bound queue.
  • Routing: declare an exchange of type direct, bind it to the queues, and specify a routing key for each binding. When a message is sent to the exchange, the exchange delivers it to the queues whose routing key matches the message's routing key.
  • Topics (wildcard): declare an exchange of type topic, bind it to the queues, and specify a routing key that may contain wildcards. When a message is sent to the exchange, the exchange delivers it to the queues whose routing key pattern matches the message's routing key.