A few words up front

It has been a long time since my last post. I have not actually stopped writing during this period; I have just been busy with job hunting and wrapping up things at school. I have now set up a blog, and recent articles will gradually be posted there as well ~

  • This article is a bit long, so it is divided into two parts
  • PS: The Java Q&A article on GitHub has not stopped either and will be updated in the near future

1. A brief introduction

1.1 What is middleware?

IDC (International Data Corporation) defines middleware as follows: middleware is an independent system software or service program that distributed applications use to share resources between different technologies. Middleware sits on top of the operating system of the client or server and manages computing resources and network communication.

First, middleware is a general term for a class of software rather than one specific product. It is a general-purpose service layer between the platform (hardware and operating system) and the application. It shields the complexity of the underlying operating system and eases the technical burden on developers. Its design does not target one specific goal; instead it provides functional modules with universal characteristics. These services have standard program interfaces and protocols, and there can be different implementations depending on the platform.

An everyday analogy (for reference only, not necessarily exact):

  • I run a coffee shop and have N suppliers of coffee beans, including A, B, and C. I must choose affordable, high-quality beans, but the market fluctuates for many reasons, and my current choice may no longer be the best one after a period of time. So I find a market agent and ask him to take care of this business for me. I only make the price and quality requirements clear to him, and he finds the beans; I do not worry about the process at all. This kind of intermediary is similar to middleware

1.1.1 The concept of distributed systems (supplement)

This paragraph is from my previous article on Dubbo

The definitions on Baidu and Wikipedia are relatively technical and obscure. Most blogs and tutorials use the definition from Distributed Systems: Principles and Paradigms, which states: “A distributed system is a collection of independent computers that appears to its users as a single coherent system.”

Let’s use some space to explain in general terms what “distributed” means

1.1.1.1 What is a centralized system

When talking about distributed systems, we have to mention “centralized systems” first. This concept is the easiest to understand: all functions and programs are installed on the same device, and this one host provides all the services.

Take the simplest example: you take a PC and turn it into a simple server. You configure everything, install MySQL, a web server, FTP, Nginx, and so on, then package and deploy your project, and you can provide services to the outside world. But if this machine has a software or hardware problem, the whole system is severely affected. It is putting all your eggs in one basket.

1.1.1.2 What is a distributed system

Since centralized systems have this kind of problem, distributed systems take on an extremely important role, which is naturally to solve it. As the definition says, a distributed system feels to the user just like a traditional single system. The changes all happen inside the system itself, and nothing much changes for the user.

For example, large e-commerce platforms such as Taobao and JD.com (Jingdong) run on tens of thousands of hosts; otherwise they could not handle such large amounts of data and requests. How the work is divided and how it operates will be covered later. As users, we neither need nor want to care about any of this. We can still think of what we are facing as the single “host” called “Taobao”.

A relatively technical way to put it (at process granularity): two or more programs run in processes on different hosts and cooperate with each other to complete a common function. The system formed by these programs can then be called a distributed system.

  • The programs are all different: distributed
  • The programs are all the same: cluster

1.2 What is Message Middleware/Message Queuing (MQ)

Message middleware, as the name implies, is middleware used to handle messages. It provides a channel for communication between systems. The sender only needs to hand the message to the message middleware; the transport protocol, the delivery method, and network or other failures that occur during sending are all handled by the middleware. The middleware is therefore responsible for ensuring that messages are transmitted reliably.

Message middleware is therefore a technology for receiving, storing, and sending data. It provides various features to make messaging highly available and reliable, together with good fault tolerance. It also helps a great deal in reducing the system resources a program occupies and in improving transmission efficiency.

  • MQ stands for Message Queue. Common message queues include the classic ActiveMQ, the popular Kafka, Alibaba’s RocketMQ, and RabbitMQ, which is explained here.

    • Different MQs have different characteristics and strengths. It is hard to say which is good or bad, only which is more suitable.

1.2.1 Message queue application scenario

Depending on business needs, message queues can be applied in a variety of scenarios, such as decoupling, peak clipping and valley filling, broadcasting, and so on. Let’s take two scenarios and walk through the basic flow.

1.2.1.1 Service decoupling

I have recently been thinking about buying a few books, so take a book order as an example. When I click Buy, a chain of business logic may run: ① deduct the inventory ② generate the order ③ pay ④ update the order status ⑤ send a purchase-success SMS ⑥ update the delivery status of the goods. In the early stage we can run all of these steps synchronously. Later, to improve efficiency, we can separate the tasks that must run immediately from the tasks that can be slightly delayed; for example, ⑤ sending the purchase-success SMS and ⑥ updating the delivery status can be executed asynchronously. At the end of the main flow, a message is sent to MQ for these deferrable tasks and they are treated as handed off, so that the main flow can finish first. The other services then run asynchronously, either by pulling messages from MQ or by having MQ push messages to them.
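The order flow above can be sketched roughly as follows. This is only a minimal illustration: an in-memory BlockingQueue stands in for MQ, and the class name, method names, and message strings are all made up for the example rather than taken from any real project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: an in-memory queue plays the role of MQ (assumption for illustration).
final class OrderDecouplingSketch {
    // Stand-in for the message queue
    static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();

    /** Main flow: run the steps that must happen now and queue the deferrable ones. */
    static List<String> placeOrder(String orderId) {
        List<String> done = new ArrayList<>();
        done.add("deduct-stock");
        done.add("create-order");
        done.add("pay");
        done.add("update-order-status");
        // Steps 5 and 6 are only announced here, not executed
        QUEUE.add("send-sms:" + orderId);
        QUEUE.add("update-delivery:" + orderId);
        return done;
    }

    /** Consumer side: handle whatever is currently waiting in the queue. */
    static List<String> consumePending() {
        List<String> handled = new ArrayList<>();
        String message;
        while ((message = QUEUE.poll()) != null) {
            handled.add(message); // a real consumer would send the SMS, etc.
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("main flow finished: " + placeOrder("1001"));
        System.out.println("handled later: " + consumePending());
    }
}
```

The point of the split is that placeOrder returns as soon as the four immediate steps are done; how quickly the queued steps are handled no longer affects the buyer.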

1.2.1.2 Peak clipping and valley filling

For example, suppose we send an announcement that carries a read/unread mark. A record then has to be written for each user, stored for instance in MongoDB. Even MongoDB cannot absorb millions or tens of millions of writes in an instant, so we can consider using a message queue. On a Java back end, for example, we can use asynchronous threads to send the messages to MQ, so that the web system can publish the announcement without tying up the database’s normal CRUD operations. The message queue is used only for peak clipping and valley filling; in the end the system messages are still stored in the database. So we can design it like this: when a user logs in, an asynchronous thread receives that user’s system messages from MQ and stores them in the database, and the messages are then automatically removed from the queue. Because users log in at staggered times, one huge burst of database writes becomes many small writes spread over time.
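As a rough sketch of the idea, the following example lets a burst of announcement messages land in a queue and then writes them to the "database" in small, bounded batches. Both the queue and the database are plain in-memory collections here, and every name in the code is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch only: in-memory stand-ins for MQ and MongoDB (assumptions for illustration).
final class PeakShavingSketch {
    private final Queue<String> queue = new ArrayDeque<>();   // stand-in for MQ
    private final List<String> database = new ArrayList<>();  // stand-in for the database

    /** The burst: publishing only enqueues, it never touches the database. */
    void publishAnnouncement(String text, int userCount) {
        for (int userId = 1; userId <= userCount; userId++) {
            queue.add("user-" + userId + ":" + text);
        }
    }

    /** One consumer round: write at most maxWrites messages to the database. */
    int drainOnce(int maxWrites) {
        int written = 0;
        while (written < maxWrites && !queue.isEmpty()) {
            database.add(queue.poll());
            written++;
        }
        return written;
    }

    int pending() { return queue.size(); }

    int stored() { return database.size(); }

    public static void main(String[] args) {
        PeakShavingSketch sketch = new PeakShavingSketch();
        sketch.publishAnnouncement("maintenance tonight", 10);
        while (sketch.pending() > 0) {
            int written = sketch.drainOnce(4);
            System.out.println("wrote " + written + ", still pending " + sketch.pending());
        }
    }
}
```

The database only ever sees at most maxWrites writes per round, no matter how large the initial burst was.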

1.3 What is RabbitMQ

RabbitMQ is an open source message queuing system written in Erlang and following the AMQP protocol. It supports a variety of clients (languages) for storing and forwarding messages in distributed systems. It has high availability, scalability, and ease of use.

For more details, please refer to the official website:

  • https://www.rabbitmq.com/

In short, it is a commonly used message queue. Its characteristics will be explained in later sections. We start with downloading and installing it, and then move on to using it.

2. Download and install

Generally speaking, there are two options: manual installation and Docker installation. Docker is used in most scenarios, but while you are learning it does no harm to go through the manual installation if you are not in a hurry.

Note: Either a cloud server or a virtual machine will do. The Linux version used in the demo is CentOS 7.9

2.1 Manual installation

2.1.1 Download and install process

Note: You can download and install directly on Linux via yum. Here I choose to download the files on my own Windows host first and then upload them to Linux via FTP for installation. This avoids download problems caused by the virtual machine’s network.

  1. First, open the download directory on the official website, and then select the version based on your Linux version.

    • Address: https://www.rabbitmq.com/down…

  2. Since RabbitMQ is written in Erlang, you also need an Erlang environment, so download Erlang next.

    • Address: https://www.erlang-solutions…

      • A: Access to this site is extremely slow. Please be patient, or you may need to use a proxy
      • B: The Erlang version needs to match RabbitMQ (see figure; there is a minimum and a maximum supported version).

        • Address for checking versions: https://www.rabbitmq.com/whic…
        • RabbitMQ 3.8.14 and Erlang 23.2.3 are used here

  3. Upload the two downloaded files to Linux, for example to a new directory /usr/local/bin/rabbitmq

    • Many shell tools now have built-in file upload, such as FinalShell, MobaXterm, and so on
    • The uploaded files and their directory are as follows
[root@centos7 rabbitmq]# ls
esl-erlang_23.2.3-1_centos_7_amd64.rpm  rabbitmq-server-3.8.14-1.el7.noarch.rpm
[root@centos7 rabbitmq]# pwd
/usr/local/bin/rabbitmq
  4. Install Erlang, socat, and RabbitMQ

    • Erlang and socat are both dependencies of RabbitMQ
# Install Erlang
rpm -ivh esl-erlang_23.2.3-1_centos_7_amd64.rpm
# Install socat
yum install -y socat
# Install RabbitMQ
rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm
  5. After installation, start the service and check whether RabbitMQ started successfully
# Start the service
systemctl start rabbitmq-server
# Start the service on boot
systemctl enable rabbitmq-server
# Stop the service
systemctl stop rabbitmq-server
# Check the service status
systemctl status rabbitmq-server.service

As shown in the figure, the service started successfully

If the installation fails, see:

  • Handling various problems when installing RabbitMQ on Linux
  • RabbitMQ ERROR: epmd error for host deb: address (cannot connect to host/port)

2.1.2 Configure Web interface management

The installation itself is finished at this point, but RabbitMQ also provides a web-based management interface. It is not enabled by default, so we need to enable it.

  1. Enable the web management plugin, and then restart the service
# Enable the web management plugin
rabbitmq-plugins enable rabbitmq_management
# Restart the service
systemctl restart rabbitmq-server
  2. Be sure to open port 15672 on the Linux firewall, otherwise the interface will be inaccessible. During the learning phase you can even turn the firewall off entirely

    • On a cloud server (Alibaba Cloud, Tencent Cloud, etc.), also open port 15672 in the security group
    • Then access <Linux IP>:15672, for example http://192.168.122.1:15672
# Check whether port 15672 is open
firewall-cmd --query-port=15672/tcp
# Open port 15672 permanently
firewall-cmd --add-port=15672/tcp --permanent
# Reload the firewall rules
firewall-cmd --reload
# Check again whether port 15672 is open
firewall-cmd --query-port=15672/tcp
  3. Add a remote login account

    • RabbitMQ has a default account and password, both ‘guest’, but it can only be used from localhost
# Add a user named admin with the password admin
rabbitmqctl add_user admin admin
  4. Assign a role to the remote login account

    • administrator: log in to the console, view all information, manage users, manage policies
    • monitoring: log in to the console and view all information
    • policymaker: log in to the console and manage policies
    • management (ordinary administrator): log in to the console
# Give the admin user the administrator role
rabbitmqctl set_user_tags admin administrator
  5. Add resource permissions for the user

    • Since admin is already a super administrator, it is also fine not to assign resource permissions; this is done by default.
# Command format: rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  6. Access <Linux IP>:15672, for example http://192.168.122.1:15672, and enter the user name and password of the admin account just created

    • As shown in the figure: successful access

2.1.2.1 Summary of commands

  1. Add user: rabbitmqctl add_user <username> <password>
  2. Change password: rabbitmqctl change_password <username> <newpass>
  3. Delete user: rabbitmqctl delete_user <username>
  4. User list: rabbitmqctl list_users
  5. Set user roles: rabbitmqctl set_user_tags <username> <tag1> <tag2>
  6. Delete all user roles: rabbitmqctl set_user_tags <username>
  7. Add resource permissions for the user: rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

Usage: run rabbitmqctl with no arguments to list the available commands, then use rabbitmqctl help <command> to see how a specific command is used and which parameters it takes.

2.1.3 Brief introduction of Web interface management

  • Connections: manages the connections of the producers and consumers that are connected to RabbitMQ
  • Channels: once a connection is established, channels are created on it; publishing and fetching messages both go through channels.
  • Exchanges: exchanges, which route messages
  • Queues: message queues, which hold messages until they are consumed and then removed from the queue.
  • Admin: used to manage users and their permissions, as shown in the figure below

Tags are used to specify the user’s role

  • administrator: log in to the console, view all information, manage users, manage policies
  • monitoring: log in to the console and view all information
  • policymaker: log in to the console and manage policies
  • management (ordinary administrator): log in to the console

2.2 Docker installation

Installing RabbitMQ with Docker is very convenient: there is no need to worry about conflicts and incompatibilities between versions, environments, and so on. The virtual machine in my demo is a bare CentOS 7.9 install, so we start by updating yum and then install Docker and RabbitMQ step by step

2.2.1 Configure yum

  1. Update yum to the latest version
# Update the yum packages
yum update
# Install the required packages: yum-utils provides yum-config-manager; the other two are needed by the devicemapper storage driver
yum install -y yum-utils device-mapper-persistent-data lvm2
  2. Set the yum repository to the Alibaba Cloud mirror
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

2.2.2 Install Docker

2.2.2.1 Steps

  1. Use yum to install Docker

    • docker-ce is the community edition; ee is the enterprise edition
yum install docker-ce -y
  2. Check that the installation was successful by looking at the version
docker -v
  3. Configure Docker image acceleration (replace <your ID> with your own)
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://<your ID>.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
  • When pulling images from Docker Hub is slow or fails from within mainland China, you can configure an image accelerator. Docker itself and many Chinese cloud providers offer accelerator services, for example:

    • USTC mirror: https://docker.mirrors.ustc.e…
    • NetEase: https://hub-mirror.c.163.com/
    • Alibaba Cloud: https://<your ID>.mirror.aliyuncs.com
    • Qiniu Cloud accelerator: https://reg-mirror.qiniu.com

    After configuring an accelerator address, if you find that images still cannot be pulled, switch to another accelerator address. All major Chinese cloud providers offer a Docker image acceleration service; it is best to choose the one that matches the cloud platform Docker is running on.

    Address for obtaining the Alibaba Cloud accelerator: https://cr.console.aliyun.com… After logging in, select Image Accelerator in the left menu to see your personal address

2.2.2.2 Docker common commands

2.2.2.2.1 Management commands

  • Simple operations such as start, stop, and restart can also be done with the service command; systemctl is slightly more powerful
# Start Docker
systemctl start docker
# Stop Docker
systemctl stop docker
# Restart Docker
systemctl restart docker
# Start Docker on boot
systemctl enable docker
# Do not start Docker on boot
systemctl disable docker

2.2.2.2.2 Image commands

# List local images
docker images
# Export an image to a file
docker save -o xx.tar.gz <image name>
# Import an image from a file
docker load -i xx.tar.gz

2.2.3 Install RabbitMQ (choose any one)

Note: it is easier to install directly with the single command in 2.2.3.2

2.2.3.1 Step-by-step installation

  1. Pull the RabbitMQ image
docker pull rabbitmq:management
  2. Create and run the container (the parameters are explained in the parameter introduction below)
docker run -di --name myrabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

2.2.3.2 One-line installation

Note that the step-by-step installation above only pulls the RabbitMQ image and starts a container from it; no user name or password is configured, so you would still need to go into the container and configure them yourself as before. The Docker Hub page already gives an example of this configuration: use -e to set environment variables, with RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS for the user name and password

For more information, see the Setting default user and password section on the official Docker Hub page

https://registry.hub.docker.c…

  1. Perform the installation
docker run -di --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
  2. Check the container state to see if it is running
# List running containers
docker ps
# Stop a container
docker stop <container name>
# Enter the container (works if -t was enabled); leaving with exit does not stop it
docker exec -it <container name> bash

2.2.3.2.1 Parameter introduction

These parameters are explained as follows:

  • -i: runs the container in interactive mode (keeps standard input open).
  • -t: allocates a pseudo-terminal for the container so that you can interact with it on the command line. This is why you so often see the combination -it.
  • --name: gives the container a name.
  • -v: a directory mapping (the former is the host directory, the latter is the directory inside the container it is mapped to). You can use -v several times to map multiple directories or files. Note: directory mapping is recommended; make changes on the host and they are shared with the container.
  • -d: creates a daemon container that runs in the background (so creating the container does not automatically log you into it; if only -i -t are given, you enter the container automatically when it is created).
  • -p: a port mapping, where the former is the host port and the latter is the port inside the container. You can use -p several times to map multiple ports. Only mapped ports can be accessed from outside.

An example:

# Create a container named node from the face image, mapping port 3000 to port 3000 and the host directory /demo to /demo in the container
docker run -d -it -p 3000:3000 -v /demo:/demo --name node face
# The container named node has a Python program that needs to be executed; go to the command line just assigned to run it
docker exec -it node bash
  • Because the -t parameter was used, a pseudo-terminal is assigned, and docker exec -it <container name> bash gives access to the command line
  • After the directory is mapped with -v, once you enter the container there is an identical demo folder, where you can, for example, run Python programs

2.2.3.2.2 Port introduction

4369: epmd, the Erlang port mapper daemon used for node discovery

5672: client communication port (AMQP)

15672: management UI port

25672: internal communication port between server nodes

61613: STOMP clients (without TLS)

1883: MQTT clients (without TLS)

The key ones are 5672 and 15672

More details on the port can be found in the documentation on the website

  • https://www.rabbitmq.com/netw…

Note: If you want to connect remotely, for example to port 15672 for the web management page or port 5672 for Java client connections, you must open these ports; otherwise the connection will fail.

  • The following is an example of opening port 15672 on CentOS 7.9
# Check whether port 15672 is open
firewall-cmd --query-port=15672/tcp
# Open port 15672 permanently
firewall-cmd --add-port=15672/tcp --permanent
# Reload the firewall rules
firewall-cmd --reload
# Check again whether port 15672 is open
firewall-cmd --query-port=15672/tcp
  • The following is the command to turn off the firewall
systemctl disable firewalld
systemctl stop firewalld   
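
Once the ports are open, a quick way to confirm from the client machine that 5672 and 15672 are actually reachable is a plain TCP connection attempt. The following is a minimal sketch using only the Java standard library; the class and method names are made up for this example, and the default host is simply the example IP used earlier in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch only: checks TCP reachability, not whether RabbitMQ itself is healthy.
final class PortCheckSketch {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unknown host: treat all as "not reachable"
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass your own server IP as the first argument
        String host = args.length > 0 ? args[0] : "192.168.122.1";
        for (int port : new int[] {5672, 15672}) {
            System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
        }
    }
}
```

If a port shows as not reachable, the firewall rules and (on a cloud server) the security group are the first things to check.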

3. RabbitMQ protocol and model

Once the installation is complete, it’s time to move on to the main topic: the various ways to use RabbitMQ from Java or Spring Boot code. A good understanding of exchanges and routing, however, requires some knowledge of the protocol and the architectural model first.

3.1 Protocol

3.1.1 What is a protocol?

A protocol, short for network protocol, is a set of conventions that both sides of a computer communication must follow, such as how to establish a connection and how to identify each other. Only by following these conventions can computers communicate with each other. Its three elements are syntax, semantics, and timing.

In order for data to travel from source to destination on the network, participants in network communication must follow the same rules, which is called a protocol, which is ultimately embodied in the format of data packets transmitted over the network.

3.1.1.1 Three elements of network protocol

  1. Syntax: the structure and format of data and control information, and the order in which data appears.
  2. Semantics: explains the meaning of each part of the control information; it specifies what control information needs to be sent, what action to take, and what response to give.
  3. Timing: a detailed description of the order in which events occur.

These three elements can be vividly described as: semantics is what to do, syntax is how to do it, and timing is the order in which to do it.

An example is the HTTP protocol

Syntax: HTTP specifies the format of the request and response messages

Semantics: what the client sends is called a request, and the data the server then returns is called a response

Timing: one request corresponds to one response, and the request precedes the response

3.1.1.1.1 Interview Question: Why does messaging middleware not directly use HTTP protocol

The main responsibility of message middleware is receiving, storing, and distributing data, so high performance and simplicity are what we are after. HTTP request and response headers are relatively complex: they carry cookies, data encryption and decryption, status codes, response codes, and other additional features that we do not need.

At the same time, HTTP mostly uses short-lived connections. In real interactions, a request-response exchange may well be interrupted, and nothing is persisted after the interruption, so the request is lost. This does not suit message middleware, where obtaining messages may be a long-running process, and where data or messages need to be persisted when problems and failures occur, so that messages and data are handled reliably and robustly.

3.1.2 AMQP protocol for RabbitMQ

The protocol used by RabbitMQ is AMQP (Advanced Message Queuing Protocol), which was first proposed in 2003 to solve the problem of messaging interaction between different platforms in the financial world.

More precisely, AMQP is a binary wire-level protocol. This is the fundamental difference from JMS: AMQP does not constrain the API layer but directly defines the data format exchanged over the network. This makes any provider that implements AMQP naturally cross-platform.
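
To make "wire-level" a little more concrete, here is a small sketch that builds two byte sequences by hand. It assumes AMQP 0-9-1 (the article itself only says AMQP): the 8-byte protocol header a client sends first, and the general frame layout of type, channel, payload size, payload, and an end marker. The class and method names are made up for illustration; in practice the client library does all of this for you.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch only, assuming the AMQP 0-9-1 framing; not something you would write in real code.
final class AmqpFrameSketch {
    static final byte FRAME_HEARTBEAT = 8;       // heartbeat frame type in AMQP 0-9-1
    static final byte FRAME_END = (byte) 0xCE;   // every frame ends with this octet

    /** Protocol header sent when a connection opens: "AMQP" followed by 0, 0, 9, 1. */
    static byte[] protocolHeader() {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put("AMQP".getBytes(StandardCharsets.US_ASCII));
        buffer.put(new byte[] {0, 0, 9, 1});
        return buffer.array();
    }

    /** General frame: type (1 byte), channel (2 bytes), payload size (4 bytes), payload, end marker. */
    static byte[] frame(byte type, int channel, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(7 + payload.length + 1); // big-endian by default
        buffer.put(type);
        buffer.putShort((short) channel);
        buffer.putInt(payload.length);
        buffer.put(payload);
        buffer.put(FRAME_END);
        return buffer.array();
    }

    /** A heartbeat is the smallest frame: channel 0 and an empty payload. */
    static byte[] heartbeat() {
        return frame(FRAME_HEARTBEAT, 0, new byte[0]);
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : heartbeat()) {
            hex.append(String.format("%02x ", b & 0xff));
        }
        System.out.println("heartbeat frame: " + hex.toString().trim());
    }
}
```

Because the bytes on the wire are fixed by the protocol, a client written in any language can talk to the broker as long as it produces the same bytes, which is where the cross-platform property comes from.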

Compared with other messaging protocols, it has the following characteristics:

  1. Distributed transaction support
  2. Persistence support for messages
  3. High-performance and highly reliable message processing

3.1.3 Architectural model

To learn the message delivery patterns that follow, the model diagram must be clearly understood, because those patterns are all selections from and simplifications of this model to different degrees

  • Producer: the producer of the message (the program that sends the message).
  • Connection: the network connection between the application and the Broker.
  • Channel: the channel through which messages are transmitted. Multiple channels can be created, each representing one session task.

    • A channel is a virtual connection built inside a TCP connection, and messages are read and written through it. Since setting up and tearing down TCP connections is very expensive for the operating system, the concept of a channel is introduced to reuse one TCP connection.
  • Broker (Server): denotes the message queue server entity, RabbitMQ Server in this case.
  • Virtual host: multiple virtual hosts can be set up in one Broker to isolate the permissions of different users.

    • A virtual host is similar in feel to a database: different projects can be assigned different databases, each containing the business tables that belong to that project, and so on.
    • Each virtual host can contain several exchanges and queues.
  • Exchange: receives the messages sent by producers and routes them to queues according to the routing key.
  • Binding: a virtual link between an exchange and a queue. A binding can include multiple routing keys.
  • Routing key: the routing rule, which the exchange uses to decide how to route a particular message.
  • Queue: the message queue, a container that stores messages. Each message can be delivered to one or more queues, where it waits for a consumer to take it out.
  • Consumer: the consumer of the message (the program that receives the message).
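
The relationship between exchange, binding, routing key, and queue can be illustrated with a tiny in-memory model. This is not how RabbitMQ is implemented and it is not the client API; it is only a sketch of direct-style routing, with hypothetical class and method names, to show which role each concept plays.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Sketch only: one direct-style exchange modelled with plain collections.
final class DirectExchangeSketch {
    // Queue name -> messages waiting to be consumed
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // Routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void declareQueue(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Binding: link a queue to the exchange under a routing key. */
    void bind(String queueName, String routingKey) {
        declareQueue(queueName);
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queueName);
    }

    /** Producer side: the exchange copies the message into every queue bound with this key. */
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    /** Consumer side: take the next message from a queue, or null if it is empty or unknown. */
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("orders", "order.created");
        exchange.bind("audit", "order.created");
        System.out.println("routed to " + exchange.publish("order.created", "order 1001") + " queue(s)");
        System.out.println("orders queue got: " + exchange.consume("orders"));
    }
}
```

Note that the producer never names a queue: it only gives the exchange a routing key, and the bindings decide where the message ends up.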

4. Java implements RabbitMQ

4.1 Environment Construction

The official website introduces several models: https://www.rabbitmq.com/gets…

So far there are 7 models on the website. We will focus on the first 5 basic ones. Some people also group the Direct and Topic models together as Routing, which gives four major models.

4.1.1 Create a Java project

Start by creating a Maven project without an archetype, and then add the RabbitMQ client dependency as well as the unit test dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
</dependency>

4.1.2 Create virtual host (optional)

Here we create a new virtual host to serve the Java project. You can also create a new user and give it access to this virtual host (that is, bind the virtual host to the user). For the demonstration we use admin (the administrator user I created earlier).

Note: You can also skip this step and simply use the default virtual host / with the admin user

4.1.3 Create Connection Utility Class

We are going to show a lot of examples, and each of them needs the same code to get a connection, release the connection, and close resources. So we extract a utility class to avoid redundant code and make the examples easier to read, so you can focus on the differences between the implementations.

  • RabbitMqUtil utility class
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.InputStream;
import java.util.Properties;

public class RabbitMqUtil {
    /** Connection settings, loaded from rabbitmq.properties */
    private static String host = "";
    private static int port = 0;
    private static String virtualHost = "";
    private static String username = "";
    private static String password = "";

    static {
        // Get the properties file as a stream; try-with-resources closes it again
        try (InputStream in = RabbitMqUtil.class.getClassLoader().getResourceAsStream("rabbitmq.properties")) {
            Properties properties = new Properties();
            properties.load(in);
            // Read the values
            host = properties.getProperty("host");
            port = Integer.parseInt(properties.getProperty("port"));
            virtualHost = properties.getProperty("virtualHost");
            username = properties.getProperty("username");
            password = properties.getProperty("password");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Get a connection; returns null if the connection could not be created */
    public static Connection getConnection() {
        try {
            // Create a connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // Set the RabbitMQ host to connect to
            connectionFactory.setHost(host);
            // Set the port number
            connectionFactory.setPort(port);
            // Set the virtual host to connect to (similar in feel to a database)
            connectionFactory.setVirtualHost(virtualHost);
            // Set the user name and password used to access the virtual host
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            // Return a new connection
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Close the channel and the connection
     *
     * @param channel    the channel to close
     * @param connection the connection to close
     */
    public static void close(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • rabbitmq.properties
host=<your RabbitMQ server IP>
port=5672
virtualHost=/rabbitmq_maven_01
username=admin
password=admin

4.2 Five implementation modes

Description:

  • Queue names, messages, and other string contents are better defined as variables and passed in. In this article I write them directly in the arguments; such magic values are not very elegant.
  • The producer uses JUnit unit tests, but the consumer is written in a main method, because we want the consumer to keep running and waiting, and with JUnit the program would end after a single execution.

    • If you do not write it in a main method, consider using sleep or while(true) to keep the program from terminating right away.
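
Besides sleep or while(true), another common way to keep a test method alive until callbacks have arrived is a CountDownLatch. The sketch below is only an illustration with made-up names: a background thread stands in for the thread that would deliver messages to the consumer callback, and the calling thread blocks until the expected number of messages has been handled or a timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a background thread plays the role of the message delivery callbacks.
final class KeepAliveSketch {
    /** Blocks until `expected` messages were handled or the timeout expired; returns the count. */
    static int runConsumer(int expected, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        AtomicInteger handled = new AtomicInteger();

        Thread callbackThread = new Thread(() -> {
            for (int i = 0; i < expected; i++) {
                handled.incrementAndGet(); // stand-in for handling one delivered message
                latch.countDown();
            }
        });
        // Daemon thread: it does not keep the JVM alive by itself
        callbackThread.setDaemon(true);
        callbackThread.start();

        try {
            // Without this wait the method would return before the callbacks ran
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled " + runConsumer(3, 2000) + " message(s)");
    }
}
```

Compared with a fixed sleep, the latch returns as soon as the expected messages are in, and the timeout keeps the test from hanging forever if they never arrive.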

4.2.1 Simple Queue Pattern (Hello World)

  • Producer: The producer of the message (the program that sends the message).
  • Queue: Message queue, understood as a container to which a producer sends messages and which stores messages for consumers to consume.
  • Consumer: the consumer of the message (the program that receives the message).

4.2.1.1 How to understand

As you can see from the figure, in the simple queue pattern one producer sends through one queue to one consumer. It can be regarded as point-to-point transmission. Compared with the model diagram in 3.1.3, the main difference is that no exchange or routing key appears. Because this mode is simple and involves no conditional distribution, the user does not need to think about exchanges and routing keys explicitly.

  • Note, however, that the producer does not talk to the queue directly; the default exchange is used. The default exchange delivers a message to the queue whose name equals the routing key, which is why we put the queue name in the routing key position later in the code

4.2.1.2 Code implementation

4.2.1.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("queue1", false, false, false, null);
        // Publish the message
        channel.basicPublish("", "queue1", null, "This is rabbitMQ message 001!".getBytes());
        // Close the channel and release the connection through the utility class
        RabbitMqUtil.close(channel, connection);
    }
}
  1. Get the connection from the utility class
  2. Get the channel: as the model diagram in 3.1.3 shows, after obtaining a connection the producer needs a channel before it can reach the exchange, queues, and so on.
  3. Declare the queue on the channel: normally the exchange would be declared before the queue, but this mode hides the exchange concept and uses the default exchange behind the scenes, so the queue is declared directly.

    • queueDeclare method explained

      • Parameter 1: queue (queue name); the queue is created automatically if it does not exist.
      • Parameter 2: durable (whether the queue is persistent); a durable queue still exists after the server restarts.
      • Parameter 3: exclusive (whether the queue is exclusive); if true, the queue is visible only to the connection that first declared it and is removed automatically when that connection closes.
      • Parameter 4: autoDelete (whether to delete automatically); the queue is deleted automatically once its last consumer has disconnected.
      • Parameter 5: arguments (additional attributes).
  4. Publish the message: here you specify which exchange and queue the message goes to, its content, and so on. Because this pattern is simple, not all parameters are used; they are explained in more detail in the later patterns

    • basicPublish method explained

      • Parameter 1: exchange (exchange name); an empty string means the default exchange.
      • Parameter 2: routingKey; here it holds the queue name, so the message is sent to the queue whose name equals the routing key.
      • Parameter 3: props (message properties), where you can control, for example, the persistence of the message.

        • For example: MessageProperties.PERSISTENT_TEXT_PLAIN
      • Parameter 4: body (message body); its type is a byte array, so the string has to be converted with getBytes().
  5. Close the channel and release the connection with the utility class: close the channel first, then the connection.
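Since the message body travels as a byte array, the producer and the consumer have to agree on how text is encoded. The sketch below uses only the JDK and shows the round trip with an explicit UTF-8 charset; the class and method names are hypothetical, and the article's own code relies on the platform default charset via getBytes() and new String(body).

```java
import java.nio.charset.StandardCharsets;

class MessageBodySketch {
    // What the producer does before publishing: text -> bytes
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer does inside handleDelivery: bytes -> text
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("This is rabbitMQ message 001!");
        System.out.println(body.length + " bytes: " + decode(body));
    }
}
```

Naming the charset on both sides avoids surprises when producer and consumer run on machines with different default encodings.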

4.2.1.2.2 Consumer code

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("queue1", false, false, false, null);
        // Consume messages
        channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body): " + new String(body));
            }
        });
    }
}
  1. Get the connection from the utility class
  2. Get the channel
  3. Declare the queue on the channel
  4. Consume messages: this specifies which queue to consume from, along with the acknowledgement mechanism and the callback

    • basicConsume method explained

      • Parameter 1: queue (queue name), that is, which queue to consume messages from.
      • Parameter 2: autoAck, whether to enable automatic acknowledgement; if true, the message is removed from the queue as soon as it has been delivered to the consumer.
      • Parameter 3: callback, of type Consumer. DefaultConsumer, an implementation class of Consumer, is used here. We mainly use body, that is, the message body. The other three parameters of handleDelivery are not used yet; if you are interested, print them out to get a general idea of what they contain.

4.2.2 Work Queue

  • Producer: The producer of the message (the program that sends the message).
  • Queue: Message queue, understood as a container to which a producer sends messages and which stores messages for consumers to consume.
  • Consumer: The Consumer of the message (the program that receives the message).

    • Here, we assume that Consumer1, Consumer2 and Consumer3 are consumers who complete tasks at different speeds respectively, which leads to a key problem of this model.

4.2.2.1 How to understand

As can be seen from the figure, the work queue pattern adds multiple consumers to the simple queue pattern: several consumers are bound to the same queue and consume from it together. This relieves the message backlog that builds up in the simple queue pattern when messages are produced much faster than they are consumed.

  • Because a message is removed from the queue once it has been consumed, you do not have to worry about a task being executed twice.

4.2.2.2 Code implementation

Note: There are two work queue modes

  1. Polling (round-robin) pattern: the messages are shared equally among the consumers
  2. Fair distribution pattern (the capable do more work): distribution according to ability, more messages for the fast consumers and fewer for the slow ones

We first demonstrate the polling pattern; its disadvantages then lead us to the fair distribution pattern

The following only describes the differences; the basic methods have all been described in the simple pattern

4.2.2.2.1 Polling pattern – producer code

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 1; i <= 20; i++) {
            // Publish the message
            channel.basicPublish("", "work", null, ("message No. " + i).getBytes());
        }
        // Close the channel and release the connection through the utility class
        RabbitMqUtil.close(channel, connection);
    }
}

The process is basically the same as in the simple queue pattern, with a few minor changes. In the producer, the main change is an added loop: since there are multiple consumers, we send more messages so that the characteristics and problems of the pattern become visible.

4.2.2.2.2 Polling pattern – consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("work", true, false, false, null);
        // Consume messages
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // Simulate slow processing
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer 1: consumed - " + new String(body));
            }
        });
    }
}
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare("work", true, false, false, null);
        // Consume messages
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: consumed - " + new String(body));
            }
        });
    }
}

Both consumers enable automatic acknowledgement (autoAck) in basicConsume, which is explained in detail below. Consumer 1 additionally sleeps for 2 seconds per message, to simulate a scenario in which Consumer 1 processes messages slowly while Consumer 2 processes them quickly.

Running results:

  • Consumer1
Consumer 1: consumed - message No. 1
Consumer 1: consumed - message No. 3
Consumer 1: consumed - message No. 5
Consumer 1: consumed - message No. 7
Consumer 1: consumed - message No. 9
Consumer 1: consumed - message No. 11
Consumer 1: consumed - message No. 13
Consumer 1: consumed - message No. 15
Consumer 1: consumed - message No. 17
Consumer 1: consumed - message No. 19
  • Consumer2
Consumer 2: consumed - message No. 2
Consumer 2: consumed - message No. 4
Consumer 2: consumed - message No. 6
Consumer 2: consumed - message No. 8
Consumer 2: consumed - message No. 10
Consumer 2: consumed - message No. 12
Consumer 2: consumed - message No. 14
Consumer 2: consumed - message No. 16
Consumer 2: consumed - message No. 18
Consumer 2: consumed - message No. 20

Observing the execution, we find that the two consumers each end up handling half of the messages, distributed strictly in turn, one message each. Consumer 2 is fast and finishes its share almost immediately, whereas Consumer 1 needs 2 seconds per message and works through its share slowly, while Consumer 2 sits idle and its capacity is wasted.

How do I switch to fair distribution?

This is related to the second parameter of basicConsume, autoAck, which enables automatic acknowledgement. The code above passes true, which means that as soon as a message from the queue has been delivered to a consumer, it is acknowledged automatically and the queue deletes it.

  • However, this approach has a serious drawback: it pushes the risk onto the consumer. For example, a consumer receives 10 messages, has processed only 4 of them, and then crashes; the remaining 6 messages are lost.

To switch to capacity-based (fair) distribution, there are two key points

  1. Limit the channel to one unacknowledged message at a time (basicQos(1))
  2. Turn off automatic acknowledgement and acknowledge messages manually
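To see why these two settings change the outcome, the following sketch simulates both dispatch strategies in plain Java, without a broker. It is a simplified model, not RabbitMQ code: the class DispatchSketch, its method names, and the 10 ms processing time for the fast consumer are assumptions, and "fair" is modelled as handing each message to whichever consumer becomes free first.

```java
import java.util.ArrayList;
import java.util.List;

class DispatchSketch {
    // Round-robin: message i goes to consumer (i - 1) % n, no matter how busy that consumer is.
    static List<List<Integer>> roundRobin(int messages, int consumers) {
        List<List<Integer>> assigned = emptyLists(consumers);
        for (int i = 1; i <= messages; i++) {
            assigned.get((i - 1) % consumers).add(i);
        }
        return assigned;
    }

    // Prefetch 1 with manual ack (simplified): a consumer gets the next message
    // only after finishing the previous one, so the earliest-free consumer wins.
    static List<List<Integer>> fair(int messages, long[] millisPerMessage) {
        List<List<Integer>> assigned = emptyLists(millisPerMessage.length);
        long[] freeAt = new long[millisPerMessage.length];
        for (int i = 1; i <= messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            assigned.get(next).add(i);
            freeAt[next] += millisPerMessage[next];
        }
        return assigned;
    }

    private static List<List<Integer>> emptyLists(int n) {
        List<List<Integer>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    public static void main(String[] args) {
        System.out.println("round-robin: " + roundRobin(20, 2));
        // Consumer 1 needs 2000 ms per message, consumer 2 an assumed 10 ms
        System.out.println("fair:        " + fair(20, new long[]{2000, 10}));
    }
}
```

In this model the slow consumer receives only the first message under fair dispatch, because the fast consumer is free again long before the slow one finishes.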

4.2.2.2.3 Fair Distribution Mode – Producer Code

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Prefetch limit of 1 (this setting only matters on the consumer side)
        channel.basicQos(1);
        // Declare the queue
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 1; i <= 20; i++) {
            // Publish the message
            channel.basicPublish("", "work", null, ("message No. " + i).getBytes());
        }
        // Close the channel and release the connection through the utility class
        RabbitMqUtil.close(channel, connection);
    }
}

4.2.2.2.4 Fair Distribution Mode – Consumer Code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        final Channel channel = connection.createChannel();
        // Accept only one unacknowledged message at a time
        channel.basicQos(1);
        // Declare the queue
        channel.queueDeclare("work", true, false, false, null);
        // Consume messages, with automatic acknowledgement turned off
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // Simulate slow processing
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer 1: consumed - " + new String(body));
                // Acknowledge manually with the deliveryTag so the queue can delete this message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        final Channel channel = connection.createChannel();
        // Accept only one unacknowledged message at a time
        channel.basicQos(1);
        // Declare the queue
        channel.queueDeclare("work", true, false, false, null);
        // Consume messages, with automatic acknowledgement turned off
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: consumed - " + new String(body));
                // Acknowledge manually with the deliveryTag so the queue can delete this message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

Running results:

  • Consumer1
Consumer 1: consumed - message No. 1
  • Consumer2
Consumer 2: consumed - message No. 2
Consumer 2: consumed - message No. 3
Consumer 2: consumed - message No. 4
Consumer 2: consumed - message No. 5
Consumer 2: consumed - message No. 6
Consumer 2: consumed - message No. 7
Consumer 2: consumed - message No. 8
Consumer 2: consumed - message No. 9
Consumer 2: consumed - message No. 10
Consumer 2: consumed - message No. 11
Consumer 2: consumed - message No. 12
Consumer 2: consumed - message No. 13
Consumer 2: consumed - message No. 14
Consumer 2: consumed - message No. 15
Consumer 2: consumed - message No. 16
Consumer 2: consumed - message No. 17
Consumer 2: consumed - message No. 18
Consumer 2: consumed - message No. 19
Consumer 2: consumed - message No. 20

4.2.3 Publish and Subscribe Model (Fanout Broadcast)

  • Producer: The producer of the message (the program that sends the message).
  • Exchange: receives messages from the producer and sends them to the specified queues.
  • Queue: Message queue, understood as a container to which a producer sends messages and which stores messages for consumers to consume.
  • Consumer: the consumer of the message (the program that receives the message).

4.2.3.1 How to understand

Fanout literally means "fan out", but most people call it broadcast or publish/subscribe. It is a mode that ignores the routing key: the producer sends a message to the exchange, and the exchange copies the message to every queue bound to it. Each message in a queue is received by only one consumer; if several channels within one consumer connection consume from the same queue, they compete for its messages.
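The copy-to-every-bound-queue behaviour can be sketched with a toy in-memory model. This is not the RabbitMQ client API: FanoutSketch and its methods are hypothetical names, and the queues are plain JDK collections.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

class FanoutSketch {
    // Queues bound to this toy exchange, keyed by queue name
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // A fanout exchange ignores the routing key and copies the message to every bound queue
    void publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    Queue<String> queue(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("q1");
        exchange.bind("q2");
        exchange.publish("", "fanout!");
        // Each queue holds its own copy; within one queue a message is taken only once
        System.out.println("q1 first poll:  " + exchange.queue("q1").poll());
        System.out.println("q1 second poll: " + exchange.queue("q1").poll());
        System.out.println("q2 first poll:  " + exchange.queue("q2").poll());
    }
}
```

The second poll on q1 finds nothing, which mirrors the competition described above: two channels reading the same queue split its messages rather than each receiving all of them.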

4.2.3.2 Code implementation

Note: The following only describes the differences from the above. In the simple model, these basic methods are described

4.2.3.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        final Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order", "fanout");
        for (int i = 1; i <= 20; i++) {
            // Publish the message; the routing key is empty
            channel.basicPublish("order", "", null, "fanout!".getBytes());
        }
        // Close the channel and release the connection through the utility class
        RabbitMqUtil.close(channel, connection);
    }
}
  1. Declare the exchange

    • exchangeDeclare method explained

      • Parameter 1: exchange (exchange name); the exchange is created automatically if it does not exist
      • Parameter 2: type (exchange type); fanout is chosen here
  2. Publish the message: the first argument of basicPublish is the name of the exchange declared above, and the second argument, the routing key, is empty

    • The loop of 20 messages is there to make the consumer behaviour easier to observe

4.2.3.2.2 Consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order", "fanout");
        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Bind the temporary queue to the exchange
        channel.queueBind(queue, "order", "");
        // Consume messages
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: consumed - " + new String(body));
            }
        });
    }
}
  1. Declare the exchange
  2. Create a temporary queue
  3. Bind the temporary queue to the exchange

    • queueBind method explained

      • Parameter 1: queue (the temporary queue)
      • Parameter 2: exchange (the exchange)
      • Parameter 3: routingKey (the routing key)
  • Consumer 2: demonstrates multiple channels in one connection
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get two channels from the same connection
        Channel channel = connection.createChannel();
        Channel channel2 = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order", "fanout");
        channel2.exchangeDeclare("order", "fanout");
        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();
        System.out.println(queue);
        // Bind the temporary queue to the exchange
        channel.queueBind(queue, "order", "");
        channel2.queueBind(queue, "order", "");
        // Consume messages on the first channel
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: consumed - " + new String(body));
            }
        });
        // Consume messages from the same queue on the second channel
        channel2.basicConsume(queue, true, new DefaultConsumer(channel2) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2-2: consumed - " + new String(body));
            }
        });
    }
}

Running results:

Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!
Consumer 2-2: consumed - fanout!

4.2.3.2.3 Why is the exchange also declared in the consumers?

As the code above shows, the exchange is declared in both the Producer and the Consumers, even though, according to the figure, consumers have no direct contact with the exchange. So why do the consumers declare it as well?

This ensures that, whether the Producer or a Consumer runs first, no error is caused by the exchange not having been declared yet. For example, if the exchange were declared only in the Producer, the Producer would have to be started first; if a Consumer were started first, the exchange would not exist yet when the Consumer binds its queue, and an error would be reported. Declaring it on both sides guarantees that whichever side starts first creates the exchange.

4.2.4 Routing/Direct

  • Producer: The producer of the message (the program that sends the message).
  • Exchange: receives messages from the producer and sends them to the specified queues.
  • routingKey: routing keys, such as key1, key2, and so on, add another layer of restriction between the exchange and the queues
  • Queue: Message queue, understood as a container to which a producer sends messages and which stores messages for consumers to consume.
  • Consumer: the consumer of the message (the program that receives the message).

4.2.4.1 How to understand

The exchange type for the routing pattern is direct; compared with the fanout pattern, it adds the concept of a routing key. The producer sends a message to the exchange with a specified routingKey. The exchange looks up the queues that are bound with that key and delivers the message to them. A queue can be bound with multiple routing keys.
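The lookup by routing key can be pictured with a small in-memory model. This is a sketch only, not the RabbitMQ client: DirectSketch, bind, and route are hypothetical names, and the queue names queue1 and queue2 are invented for the illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class DirectSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> queuesByKey = new HashMap<>();

    // One queue may be bound with several keys by calling bind repeatedly
    void bind(String queueName, String routingKey) {
        queuesByKey.computeIfAbsent(routingKey, key -> new TreeSet<>()).add(queueName);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key
    Set<String> route(String routingKey) {
        return queuesByKey.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("queue1", "info");
        exchange.bind("queue1", "error");
        exchange.bind("queue1", "warn");
        exchange.bind("queue2", "error");
        System.out.println("info  -> " + exchange.route("info"));
        System.out.println("error -> " + exchange.route("error"));
        System.out.println("debug -> " + exchange.route("debug"));
    }
}
```

A message whose key matches no binding is routed to no queue at all in this model, which is worth keeping in mind when choosing key names.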

4.2.4.2 Code implementation

4.2.4.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order_direct", "direct");
        // Specify the routingKey
        String key = "info";
        // Publish the message
        channel.basicPublish("order_direct", key, null, ("message sent to the specified route " + key).getBytes());
        // Close the channel and release the connection through the utility class
        RabbitMqUtil.close(channel, connection);
    }
}
  1. Specify the routingKey, that is, pass the value of key as the second argument of basicPublish

4.2.4.2.2 Consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order_direct", "direct");
        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with three routing keys
        channel.queueBind(queue, "order_direct", "info");
        channel.queueBind(queue, "order_direct", "error");
        channel.queueBind(queue, "order_direct", "warn");
        // Consume messages
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: consumed - " + new String(body));
            }
        });
    }
}
  1. The only difference is that a key is supplied when binding the queue to the exchange
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order_direct", "direct");
        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with a single routing key
        channel.queueBind(queue, "order_direct", "error");
        // Consume messages
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: consumed - " + new String(body));
            }
        });
    }
}

Running results: only Consumer 1 received the message, because only its queue is bound with the key info

Consumer 1: consumed - message sent to the specified route info

4.2.5 Wildcard matching pattern (Topic)

  • Producer: The producer of the message (the program that sends the message).
  • Exchange: receives messages from the producer and sends them to the specified queues.
  • routingKey: routing keys (key1, key2, etc.) add another layer of constraint between the exchange and the queues

    • In the topic pattern, however, the binding key may contain wildcards, which makes the bindings much more flexible
  • Queue: Message queue, understood as a container to which a producer sends messages and which stores messages for consumers to consume.
  • Consumer: the consumer of the message (the program that receives the message).

4.2.5.1 How to understand

The exchange type of the wildcard matching pattern is topic. Because it is similar to the direct pattern, direct and topic are sometimes grouped together as routing patterns. The difference is that in the direct pattern the routingKey is a fixed value, whereas in the topic pattern the binding key can use wildcards. A routing key is usually made up of one or more words separated by ".", for example ideal.insert.

  • *: matches exactly one word, for example: order.* matches order.insert
  • #: matches zero or more words, for example: order.# matches order.insert.mon

    • # covers any number of levels, while * covers exactly one level
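The two wildcards can be made concrete with a small matcher written in plain Java. It is a sketch of the matching rules only, not the broker's implementation; the class name TopicMatcher is hypothetical. Note that in RabbitMQ "#" can also match zero words, so user.# matches user as well.

```java
class TopicMatcher {
    // Returns true if the binding pattern matches the routing key, word by word.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int rest = j; rest <= k.length; rest++) {
                if (matches(p, i + 1, k, rest)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Key exhausted but the pattern still expects a word
            return false;
        }
        // '*' matches exactly one word; anything else must be equal literally
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.query"));
        System.out.println(matches("user.*", "user.query.all"));
        System.out.println(matches("user.#", "user.query.all"));
    }
}
```

The recursion keeps the sketch short; for the handful of words in a typical routing key, the backtracking on "#" is not a concern.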

4.2.5.2 Code implementation

4.2.5.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order_topic", "topic");
        // Specify the routingKey
        String key = "user.query.all";
        // Publish the message
        channel.basicPublish("order_topic", key, null, ("message sent to the specified route " + key).getBytes());
        // Close the channel and release the connection through the utility class
        RabbitMqUtil.close(channel, connection);
    }
}

4.2.5.2.2 Consumer Code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order_topic", "topic");
        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Bind with a single-level wildcard
        String key = "user.*";
        channel.queueBind(queue, "order_topic", key);
        // Consume messages
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: consumed - " + new String(body));
            }
        });
    }
}
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get a connection through the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("order_topic", "topic");
        // Create a temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Bind with a multi-level wildcard
        String key = "user.#";
        channel.queueBind(queue, "order_topic", key);
        // Consume messages
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: consumed - " + new String(body));
            }
        });
    }
}

Only Consumer 2 received the message: the routing key user.query.all has two words after user, so only user.# matches it

Consumer 2: consumed - message sent to the specified route user.query.all

5. Spring Boot implements RabbitMQ

Spring Boot provides a starter for RabbitMQ (spring-boot-starter-amqp), along with a set of annotations and RabbitTemplate, which greatly simplify RabbitMQ development. The following shows two styles: [5.1 Based on pure annotations] and [5.2 Annotations + configuration class]. They are used in much the same way and differ only in where queues and exchanges are declared and bound. The latter is generally considered easier to maintain and manage, so pick one of them.

Environmental preparation:

  1. Start by creating a Spring Boot project and select the RabbitMQ starter, as well as basic starters such as unit testing
  2. Write the YML configuration file with the data needed to connect to RabbitMQ

RabbitMQ dependency

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

YML configuration file

spring:
  rabbitmq:
    host: 192.168.122.1 # server address
    port: 5672 # TCP port
    username: admin
    password: admin
    virtual-host: /rabbitmq_springboot_01

5.1 Based on pure annotations

Note: This approach does not create configuration classes to manage queue and switch declarations and bindings, etc. Instead, it is all written directly in the consumer via annotations

5.1.1 Simple Queue Pattern

All the code for the production message is put into Test

  • Producer
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** Inject the RabbitTemplate */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleSendMessage() {
        rabbitTemplate.convertAndSend("simple_queue", "This is a message !");
    }
}
  1. First, inject the RabbitTemplate that Spring Boot provides
  2. Send messages with the RabbitTemplate's convertAndSend method. It has several overloads; this article uses the two-parameter and three-parameter versions

    • convertAndSend (two parameters)

      • Parameter 1: routingKey (the routing key)
      • Parameter 2: object (the message body to send)
    • convertAndSend (three parameters)

      • Parameter 1: exchange (the exchange)
      • Parameter 2: routingKey (the routing key)
      • Parameter 3: object (the message body to send)
  • Consumer
// The class name SimpleConsumer is a placeholder; the original name was lost
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue", durable = "true", exclusive = "false", autoDelete = "false"))
public class SimpleConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("consumer: " + message);
    }
}
  1. Register the class in the Spring container (@Component)
  2. Listen on RabbitMQ: in the @RabbitListener annotation you can declare the queue, and also bind the exchange and the queue, and so on

    • @Queue accepts four parameters. Each has a default value, so if only the name is given, the queue is created as durable, non-exclusive, and not auto-deleted

      • Parameter 1: value (queue name)
      • Parameter 2: durable, whether the queue still exists after RabbitMQ restarts; the default is true
      • Parameter 3: exclusive, whether the queue is valid only for the current connection; the default is false
      • Parameter 4: autoDelete, whether the queue is deleted automatically when it is no longer in use; the default is false
  3. Adding the @RabbitHandler annotation to the method makes it the callback that is invoked automatically, so that it receives the messages sent by the producer

    • Note: the parameter type of the receiveMessage method depends on the type of data the producer sent

5.1.2 Work queue pattern

5.1.2.1 Polling mode

  • Producer: nothing special here; because the work queue pattern has multiple consumers, we simply send more messages
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** Inject the RabbitTemplate */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testWorkSendMessage() {
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("work_queue", "This is a message!, serial number: " + i);
        }
    }
}
  • Consumers
@Component
public class WorkConsumer {
    // Consumer 1: listens on RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void receiveMessage1(String message) {
        System.out.println("consumer 1: " + message);
    }

    // Consumer 2: listens on RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void receiveMessage2(String message) {
        System.out.println("consumer 2: " + message);
    }
}
  1. The @RabbitListener annotation can be placed either on a class or on a method. In the code above it is placed on two methods, each representing a different consumer.

    • However, if you put @RabbitListener on the class and add @RabbitHandler to two such methods, an error is reported; in that case you need to create one class per consumer

5.1.2.2 Fair mode (distribution according to ability)

5.1.2.2.1 How to modify configuration files

  • The producer is unchanged
  • Modify the configuration file (yml/properties)
spring:
  rabbitmq:
    host: 192.168.122.1 # server address
    port: 5672 # TCP port
    username: admin
    password: admin
    virtual-host: /rabbitmq_springboot_01
    listener:
      simple:
        acknowledge-mode: manual # enable manual ACK
        prefetch: 1 # consume at most 1 message at a time
  1. The acknowledge-mode options

    • auto: automatic acknowledgement, the default option
    • manual: manual acknowledgement (capacity-based distribution requires manual)
    • none: no acknowledgement; the message is discarded automatically after it is sent
  • Consumers
@Component
public class WorkConsumer {
    // Consumer 1: listens on RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void receiveMessage(String body, Message message, Channel channel) throws IOException {
        try {
            // Print the message body
            System.out.println("consumer 1: " + body);
            // Acknowledge with the deliveryTag so the queue can delete this message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            // Tell the queue that consuming this message failed, and requeue it
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

    // Consumer 2: listens on RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    public void receiveMessage2(String body, Message message, Channel channel) throws IOException {
        try {
            // A 2 s delay simulates slow processing
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            // Print the message body
            System.out.println("consumer 2: " + body);
            // Acknowledge with the deliveryTag so the queue can delete this message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            // Tell the queue that consuming this message failed, and requeue it
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}
  1. Because manual acknowledgment is enabled in the YML configuration, an acknowledgment must be returned on both success and failure
  2. basicAck parameters

    • Parameter 1: deliveryTag (the index of the message on this channel). Once it is returned, the queue can delete the message
    • Parameter 2: multiple (whether to batch). With true, every message up to and including deliveryTag is acknowledged at once
  3. basicNack parameters

    • Parameters 1 and 2: same as above, except that the selected messages are rejected instead of acknowledged
    • Parameter 3: requeue (whether the rejected message is put back in the queue)
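
To make deliveryTag, multiple, and requeue more concrete, here is a toy model in plain Java. It does not use the RabbitMQ client: UnackedTracker and its methods are hypothetical names that only imitate, under simplifying assumptions, how a channel keeps track of deliveries that have not been acknowledged yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model (not the RabbitMQ client): unacknowledged deliveries on one channel.
final class UnackedTracker {
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private final List<String> requeued = new ArrayList<>();
    private long nextTag = 1;

    // Delivering a message assigns it the next deliveryTag on this channel.
    long deliver(String body) {
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    // multiple = false confirms only this tag; true confirms every tag <= deliveryTag.
    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headMap(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    // Same tag selection as basicAck; requeue = true keeps the bodies for redelivery.
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        List<Long> tags = new ArrayList<>();
        if (multiple) {
            tags.addAll(unacked.headMap(deliveryTag, true).keySet());
        } else {
            tags.add(deliveryTag);
        }
        for (Long tag : tags) {
            String body = unacked.remove(tag);
            if (body != null && requeue) {
                requeued.add(body);
            }
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    List<String> requeued() {
        return requeued;
    }

    public static void main(String[] args) {
        UnackedTracker channel = new UnackedTracker();
        for (int i = 1; i <= 5; i++) {
            channel.deliver("message " + i);
        }
        channel.basicAck(1, false);        // confirms tag 1 only
        channel.basicAck(3, true);         // confirms tags 2 and 3
        channel.basicNack(4, false, true); // rejects tag 4 and requeues it
        System.out.println("unacked: " + channel.unackedCount()); // unacked: 1
        System.out.println("requeued: " + channel.requeued());    // requeued: [message 4]
    }
}
```

In the consumers above multiple is always false, so every call confirms or rejects exactly one message.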

Running results:

Consumer 1: This is a message!, serial number: 2
Consumer 1: This is a message!, serial number: 3
Consumer 1: This is a message!, serial number: 4
Consumer 1: This is a message!, serial number: 5
Consumer 1: This is a message!, serial number: 6
Consumer 1: This is a message!, serial number: 7
Consumer 1: This is a message!, serial number: 8
Consumer 1: This is a message!, serial number: 9
Consumer 1: This is a message!, serial number: 10
Consumer 1: This is a message!, serial number: 11
Consumer 1: This is a message!, serial number: 12
Consumer 1: This is a message!, serial number: 13
Consumer 1: This is a message!, serial number: 14
Consumer 1: This is a message!, serial number: 15
Consumer 1: This is a message!, serial number: 16
Consumer 1: This is a message!, serial number: 17
Consumer 1: This is a message!, serial number: 18
Consumer 1: This is a message!, serial number: 19
Consumer 1: This is a message!, serial number: 20
Consumer 2: This is a message!, serial number: 1
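
Why the split comes out as 19 to 1 can be reproduced with a small virtual-time simulation. This is only a sketch with no broker involved: DispatchSimulation is a hypothetical class, the 10 ms cost for the fast consumer is an assumed value (only the 2 s sleep comes from the example), and all 20 messages are assumed to be waiting in the queue from the start.

```java
import java.util.Arrays;

// Toy model with virtual time (no broker involved); all names are hypothetical.
final class DispatchSimulation {

    // Round-robin: message i goes to consumer i % n, however busy that consumer is.
    static int[] roundRobin(int messages, long[] costMillis) {
        int[] handled = new int[costMillis.length];
        for (int i = 0; i < messages; i++) {
            handled[i % costMillis.length]++;
        }
        return handled;
    }

    // prefetch = 1 with manual ack: a consumer receives its next message only after
    // acknowledging the previous one, so each message goes to whichever consumer
    // becomes free first (ties go to the lower index).
    static int[] fairDispatch(int messages, long[] costMillis) {
        int[] handled = new int[costMillis.length];
        long[] freeAt = new long[costMillis.length]; // virtual time at which each consumer is idle
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        long[] costMillis = {10, 2000}; // consumer 1 is fast (assumed 10 ms), consumer 2 sleeps 2 s
        System.out.println("round-robin: " + Arrays.toString(roundRobin(20, costMillis)));   // [10, 10]
        System.out.println("prefetch=1:  " + Arrays.toString(fairDispatch(20, costMillis))); // [19, 1]
    }
}
```

Which consumer happens to receive the very first message depends on timing, so the model only reproduces the counts, not the exact serial numbers in the log.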

So far we have implemented distribution according to ability by modifying the configuration file. Below are a few more configuration options; only some of them were used above, and the rest are for reference. You can use either YML or properties.

# Publisher confirms
spring.rabbitmq.publisher-confirm-type=correlated
# Publisher confirms (older versions)
spring.rabbitmq.publisher-confirms=true
# Publisher returns
spring.rabbitmq.publisher-returns=true
# Manual consumer acknowledgment
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Initial number of concurrent consumers
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of concurrent consumers
spring.rabbitmq.listener.simple.max-concurrency=10
# Number of messages each consumer fetches per request; it should be greater than
# or equal to the transaction size (the maximum number of unacked messages)
spring.rabbitmq.listener.simple.prefetch=1
# Enable retry
spring.rabbitmq.listener.simple.retry.enabled=true

5.1.2.2.2 How to configure the factory

/**
 * Sets up the consumer acknowledgment mechanism.
 *
 * @param connectionFactory the connection factory
 * @return the listener container factory
 */
@Bean("WorklistenerFactory")
public RabbitListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    // Switch to MANUAL acknowledgment
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    // Rejection policy: true puts the message back in the queue, false discards it; the default is true
    containerFactory.setDefaultRequeueRejected(true);
    // Change the prefetch count from its default of 250 to 1
    containerFactory.setPrefetchCount(1);
    return containerFactory;
}
  • Consumer modification
// Before
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// After: add the containerFactory attribute to the listener and pass it the name of the configured factory bean
@RabbitListener(queuesToDeclare = @Queue("work_queue"), containerFactory = "WorklistenerFactory")

5.1.3 Publish and Subscribe Model

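  • Producer
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {

    /**
     * Inject RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testFanoutSendMessage() {
        // exchange, routing key (ignored by a fanout exchange), message
        rabbitTemplate.convertAndSend("order_exchange", "", "This is a message !");
    }
}
  1. Because exchanges come into play starting with this pattern, the three-argument convertAndSend (exchange, routing key, message) is used
  • Consumers
@Component
public class FanOutConsumer {

    // Bind a temporary queue to the exchange
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // temporary queue
                    exchange = @Exchange(name = "order_exchange", type = "fanout") // exchange and type
            )
    })
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    // Bind a temporary queue to the exchange
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // temporary queue
                    exchange = @Exchange(name = "order_exchange", type = "fanout") // exchange and type
            )
    })
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}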
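
What the fanout exchange does with the message can be pictured with a tiny in-memory model. This is a sketch in plain Java, not Spring AMQP: FanoutModel, bindQueue, and the queue names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; not Spring AMQP, all names are hypothetical.
final class FanoutModel {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Each listener binds its own temporary queue to the exchange.
    void bindQueue(String queueName) {
        queues.put(queueName, new ArrayList<>());
    }

    // A fanout exchange ignores the routing key and copies the message to every bound queue.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bindQueue("temp-queue-1");
        exchange.bindQueue("temp-queue-2");
        exchange.publish("", "This is a message !");
        System.out.println("temp-queue-1: " + exchange.messagesIn("temp-queue-1"));
        System.out.println("temp-queue-2: " + exchange.messagesIn("temp-queue-2"));
    }
}
```

Both queues end up holding the same message, which is why every consumer in this pattern prints it.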

5.1.4 Routing Mode (Direct)

  • Producer
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {

    /**
     * Inject RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDirectSendMessage() {
        // exchange, routing key, message
        rabbitTemplate.convertAndSend("direct_exchange", "info", "This is a message !");
    }
}
  • Consumers
@Component
public class DirectConsumer {

    // Bind a temporary queue to the exchange
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // temporary queue
                    exchange = @Exchange(name = "direct_exchange", type = "direct"), // exchange and type
                    key = {"info", "warn"} // routing keys this queue is bound with
            )
    })
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    // Bind a temporary queue to the exchange
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // temporary queue
                    exchange = @Exchange(name = "direct_exchange", type = "direct"), // exchange and type
                    key = {"info", "warn"} // routing keys this queue is bound with
            )
    })
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}
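
The routing rule of a direct exchange (deliver only where the binding key equals the routing key) can be sketched in plain Java as follows. This is not Spring AMQP: DirectModel and the queue names are hypothetical, and the "error" binding is an assumed key added only to show a queue that does not receive the message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a direct exchange; not Spring AMQP, all names are hypothetical.
final class DirectModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName, String... bindingKeys) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        for (String key : bindingKeys) {
            bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queueName);
        }
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key.
    void publish(String routingKey, String message) {
        for (String queueName : bindings.getOrDefault(routingKey, List.of())) {
            queues.get(queueName).add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        DirectModel exchange = new DirectModel();
        exchange.bind("queue-1", "info", "warn");
        exchange.bind("queue-2", "error"); // assumed key, for illustration only
        exchange.publish("info", "This is a message !");
        System.out.println("queue-1: " + exchange.messagesIn("queue-1")); // queue-1: [This is a message !]
        System.out.println("queue-2: " + exchange.messagesIn("queue-2")); // queue-2: []
    }
}
```

Two queues bound with the same key, as in the consumers above, would both receive a copy of the "info" message.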

5.1.5 Topic mode

  • Producer
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {

    /**
     * Inject RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopicSendMessage() {
        // exchange, routing key, message
        rabbitTemplate.convertAndSend("topic_exchange", "order.insert.common", "This is a message !");
    }
}
  • Consumers
@Component
public class TopicConsumer {

    // Bind a temporary queue to the exchange
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // temporary queue
                    exchange = @Exchange(name = "topic_exchange", type = "topic") // exchange and type
                    // add key = {...} here with the topic pattern this consumer should match
            )
    })
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    // Bind a temporary queue to the exchange
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // temporary queue
                    exchange = @Exchange(name = "topic_exchange", type = "topic") // exchange and type
                    // add key = {...} here with the topic pattern this consumer should match
            )
    })
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}

5.2 Based on annotations + configuration classes

In this approach, the exchange and queue declarations and their bindings are placed in a configuration class. This keeps the consumer annotations concise and puts everything under unified management, so it is better organized. It is also more convenient for producers and consumers to reference the names, and a later change does not have to be repeated in every place.

To keep this short, only the most complex pattern, Topic, is demonstrated here; the others can be written in the same way.

  • The configuration class
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfiguration {

    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_QUEUE_NAME_2 = "test_topic_queue_2";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";
    public static final String TOPIC_ROUTINGKEY_2 = "test.#";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE_NAME_1);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE_NAME_2);
    }

    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }

    @Bean
    public Binding bindingTopic2() {
        return BindingBuilder.bind(topicQueue2())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_2);
    }
}
  1. Add the @Configuration annotation: marks this as a configuration class
  2. Define constants: exchange names, queue names, routing keys and so on can be declared as constants, which makes them easy to reference, manage, and modify. You can also create a dedicated RabbitMQ constants class.
  3. Define the exchange: because this example uses Topic, choose the TopicExchange type
  4. Define the queues: passing in the queue-name constant is enough, because durability and the other settings have default values; you can also set parameters such as durable and exclusive yourself
  5. Bind the exchange and queues: use BindingBuilder’s bind method to select the queue, to to select the exchange, and with to pass in the routing key
  • Producer
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {

    /**
     * Inject RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopicSendMessage() {
        // exchange (constant from the configuration class), routing key, message
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
    }
}
  • Consumers
@Component
public class TopicConsumer {

    // Listen on queue 1
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_1})
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    // Listen on queue 2
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_2})
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}
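
It can help to check which of the two patterns, "test.*" and "test.#", actually matches the routing key "test.order.insert". In a topic exchange "*" stands for exactly one word and "#" for zero or more words, with words separated by dots. The matcher below is a minimal plain-Java sketch of that rule; TopicMatcher is a hypothetical helper, not part of Spring AMQP or the RabbitMQ client.

```java
// Minimal sketch of topic pattern matching; TopicMatcher is a hypothetical helper.
final class TopicMatcher {

    // Returns true if the dot-separated routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key used up, but the pattern still expects a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        String routingKey = "test.order.insert";
        System.out.println("test.* matches: " + matches("test.*", routingKey)); // test.* matches: false
        System.out.println("test.# matches: " + matches("test.#", routingKey)); // test.# matches: true
    }
}
```

Under this rule only test_topic_queue_2 (bound with "test.#") would receive the message sent by the producer, so only consumer 2 prints it; a two-word key such as "test.order" would reach both queues.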