Introduction to RabbitMQ
RabbitMQ queues are based on the AMQP protocol and are developed using The Erlang language, supporting multiple client types such as Java, Ruby, Go, PHP, etc. Other popular message queue middleware are RocketMQ, ActiveMQ, Kafka, and so on. You need to test the RabbitMQ service. If the RabbitMQ service is not installed, install the RabbitMQ service
MQ’s three biggest features are asynchronous, peak clipping, and decoupling, as shown in the figure below. The rest of the complex concepts are not scripted and copied, but written here is nonsense anyway. The simplest generalization is the container for storing data, which is similar to MySQL database and Redis database. The difference lies in the application scenario determined by its own implementation characteristics
Two: AMQP protocol model
It can be simply understood as a set of standard messaging protocols, such as HTTP and HTTPS, which have their own rules. The following figure shows the basic model of the AMQP protocol
The serial number | component | describe |
---|---|---|
1 | Publisher | Producer, used by the application client to send messages to the server |
2 | Connection | Connection, both producer and consumer clients need a Connection to the messaging application server. Connection creation and destruction costs are too high, and lightweight logical Connection channels are derived |
3 | Channel | Lightweight logical connections, also known as channels. Channels are completely isolated and thread-safe |
4 | Broker | The message application service principal, which the client does not write code about, is a logical concept |
5 | Virtual Host | It is equivalent to a namespace. When there are multiple users, each user can operate in the Virtual Host area assigned by himself |
6 | Exchange | Message exchange, through which the producer is not directly coupled to the queue, forwards the message |
7 | Binding | The binding relationship, the binding relationship between the message exchange and the queue, is compared to the RoutingKey carried by the producer to see which binding queue the message is routed to |
8 | Queue | Queue, the last location where messages are stored |
9 | Consumer | Consumers, who consume directly through coupling to queues, are somewhat different from producers |
Iii. AMQP interactive process
The process of HTTP connection creation and destruction is divided into three handshakes and four waves. What is the process design of the interaction between the producer and consumer client and the message application server in AMQP protocol? The producer, consumer, and server connection creation and destruction commands remain the same, but the commands involved in the intermediate logical flow are certainly different. So the entire production-consumption process is divided into three parts, connection creation and destruction, production messages, and consumption messages
3.1 Connection Creation and destruction
The creation and destruction of a connection consists of four modules of communication, starting with connection creation on, then channel creation on, and finally channel closing and connection closing. It can be understood from the flow that Channel is a logical Connection based on Connection and must depend on Connection. The rest of the steps are commands — confirming the format of the command to ensure that the client and server communicate properly
3.2 Message Production
The message production flow is shown below, but the point is that the server does not respond to this operation. Therefore, this is a link that causes message loss. When the network connection between the server and the client fluctuates, the connection is unavailable and closed. At this time, the transmitted message may be lost
3.3 Message Consumption
Message consumption focuses on the confirmation mechanism, that is, the message will not be deleted immediately after being pushed to the client by the server. Instead, the message status flag will be changed and the corresponding logic will be executed after receiving the confirmation result from the client. Of course, there may be a variety of results of feedback validation, and there are also a variety of operations, which will be explained in a follow-up article
Four: Basic operations on the RabbitMQ client
As mentioned earlier RabbitMQ supports multi-language clients, the following examples will be in Java because the author himself is working on Jave development. Other computer language clients can refer to the RabbitMQ website, and the following dependencies can be introduced in the RabbitMQ Demo if you test the RabbitMQ functionality directly in a J2EE project
<! --> <dependency> <groupId>com. RabbitMQ </groupId> <artifactId>amqp-client</artifactId> The < version > 5.4.3 < / version > < / dependency >Copy the code
4.1 Creating a Connection Channel
@SneakyThrows
public static Channel createChannel(a){
// Create a connection factory
ConnectionFactory factory = new ConnectionFactory();
// Set the RabbitMQ service application information
The default service port is 5672. After the installation and startup, there will be a default account guest and password guest
factory.setHost(RabbitMQ Application Service installation IP address);
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// instantiate the connection
Connection connection = factory.newConnection();
// Get channel instance
Channel channel = connection.createChannel();
return channel;
}
Copy the code
4.2 Message Push
Note that this is just the most basic push message to the exchange and then route the message according to the routingKey and Binding. As for some switches or queues involved in the code, the characteristic attributes of the message will be explained later. Of course, more than the fanout that appears in the code, there are multiple switches that provide different message routing features
@SneakyThrows
public static void publishMessage(Channel channel,byte[] message){
// Instantiate the message service component
String exchangeName = "exchangeName";
String queueName = "queueName";
String binding = "binding";
String routingKey = binding;
// The switch is persistent and automatically deleted
boolean durable = true , autoDelete = false;
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,durable,autoDelete,null);
// exclusive queue
boolean exclusive = false;
channel.queueDeclare(queueName,durable,exclusive,autoDelete,null);
// Switch, queue binding
channel.queueBind(queueName,exchangeName,binding);
// Send a production message
channel.basicPublish(exchangeName,routingKey,null,message);
}
Copy the code
4.3 Consumption Message
The consumption message is actually a confirmation mechanism, which has been mentioned above and will be explained further. Note that message consumption uses the DefaultConsumer class, which implements the interface Consumer, which provides a set of methods for processing messages, RabbitMQ either implements the Consumer interface or inherits DefaultConsumer class override logic to handle messages. The method handleDelivery() rewritten in the following code can be understood by looking back at the RabbitMQ message consumption interaction command
@SneakyThrows
public static void consumeMessage(Channel channel){
String queueName = "queueName";
String consumerTag = "consumerTag";
// Message consumption instance
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
// Process the message}};// Automatic message acknowledgement
boolean autoAck = true;
channel.basicConsume(queueName,autoAck,consumerTag,consumer);
}
Copy the code
4.4 Destroying the Connection Channel
This is not to say that it is wrong to close Connection directly, but it is not allowed to close Connection if the Connection has other channels, otherwise other code using channels will throw an exception. That is, Connection closing is equal to closing all channels, which are logical connections that depend on Connection
@SneakyThrows
public static void destroyConnection(Connection connection,Channel channel){
channel.close();
connection.close();
}
Copy the code