RabbitMQ tutorial
When I started learning RabbitMQ, I went to GitHub for a tutorial and was disappointed not to find one. There are plenty of tutorials for Spring, MyBatis and so on, but RabbitMQ tutorials are almost impossible to find. So I decided to write a summary myself, and I welcome corrections wherever I have got something wrong.
This article mainly walks through my source code on GitHub, so it contains little source code itself. It took a long time to write; so that you can find it again later, stars and forks are welcome
Repository: https://github.com/erlieStar/rabbitmq-examples
Preface
Let’s take a look at the flow of a message through RabbitMQ
The main process shown in the diagram is as follows
- The producer specifies a RoutingKey when it publishes a message, and the message is sent to an Exchange
- The Exchange routes the message to the matching queues according to its binding rules
- Consumers consume messages from the queues
There are four main participants in this process: Message, Exchange, Queue, and Consumer. Let’s look at each of them in turn
Message
A message can carry a set of properties; their purposes, as documented for RabbitMQ, are as follows
Property name | Use |
---|---|
contentType | The MIME type of the message body, such as application/json |
contentEncoding | The encoding of the message body, such as whether it is compressed |
messageId | The unique identifier of the message, set by the application |
correlationId | Usually the messageId of a related message; often used when replying to a message |
timestamp | The creation time of the message, an integer, accurate to the second |
expiration | The per-message TTL, a string that holds an integer number of milliseconds |
deliveryMode | The persistence type of the message: 1 non-persistent, 2 persistent. Persistence has a significant performance cost |
appId | Identifier of the publishing application, such as its type and version number |
userId | Identifies a logged-in user. Rarely used |
type | Message type name; how this field is used is entirely up to the application |
replyTo | Name of a private response queue for the reply message |
headers | Key/value table of arbitrary user-defined keys and values |
priority | The priority of the message in the queue |
Exchange
An exchange receives messages and forwards them to the bound queues based on the routing key. Its common attributes are as follows
Exchange property | Use |
---|---|
name | Exchange name |
type | There are four types of exchange: direct, topic, fanout, and headers |
durability | Whether to persist. True: persistent. A persistent exchange is saved to disk, so it is not lost when the server restarts |
autoDelete | The exchange is deleted once all queues or exchanges bound to it have been unbound |
internal | Whether the exchange is internal. True: internal. Clients cannot publish directly to an internal exchange; messages can only reach it by being routed from another exchange |
arguments | Other structured parameters |
The most commonly used attribute is type, which is explained in detail below
Fanout Exchange
Messages sent to a fanout exchange are routed to all queues bound to it, which makes it suitable for broadcasting
Routing keys are ignored; you simply bind the queue to the exchange
The fanout exchange is the fastest at forwarding messages
Direct Exchange
Routes messages to the queues whose BindingKey exactly matches the RoutingKey
Topic Exchange
As mentioned earlier, a direct exchange requires the RoutingKey and BindingKey to match exactly. A topic exchange is similar in that it also routes messages to queues by matching the RoutingKey against the BindingKey, but it supports fuzzy matching.
- A RoutingKey is a string of words separated by “.” (e.g. com.rabbitmq.client)
- A BindingKey, like a RoutingKey, is a string of words separated by “.”
- A BindingKey may contain two special tokens for fuzzy matching: “*” matches exactly one word, and “#” matches zero or more words
BindingKey | RoutingKeys that match |
---|---|
java.# | java.lang, java.util, java.util.concurrent |
java.* | java.lang, java.util |
*.*.util | com.javashitang.util, org.spring.util |
If two messages are now sent with the routing keys java.lang and java.util.concurrent, java.lang is routed to both Consumer1 and Consumer2, while java.util.concurrent is routed only to Consumer2.
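The matching rules above can be sketched in plain Java. This is only an illustrative model of the “*” and “#” semantics, not RabbitMQ’s actual implementation; the class name `TopicMatcher` and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TopicMatcher {

    /** Returns true if routingKey matches bindingKey under topic-exchange rules. */
    public static boolean matches(String bindingKey, String routingKey) {
        return match(split(bindingKey), 0, split(routingKey), 0);
    }

    // Splits a key into its "."-separated words; an empty key has no words.
    private static List<String> split(String key) {
        if (key.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(key.split("\\.", -1));
    }

    // Recursively matches pattern words p[i..] against routing-key words w[j..].
    private static boolean match(List<String> p, int i, List<String> w, int j) {
        if (i == p.size()) {
            return j == w.size();
        }
        String token = p.get(i);
        if (token.equals("#")) {
            // "#" may absorb zero or more words, so try every possible split.
            for (int k = j; k <= w.size(); k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.size()) {
            return false;
        }
        // "*" matches exactly one word; any other token must match literally.
        if (token.equals("*") || token.equals(w.get(j))) {
            return match(p, i + 1, w, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("java.#", "java.util.concurrent"));
        System.out.println(matches("java.*", "java.util.concurrent"));
        System.out.println(matches("*.*.util", "com.javashitang.util"));
    }
}
```

Running `main` checks three rows of the table: `java.#` accepts the three-word key, `java.*` rejects it because “*” stands for exactly one word, and `*.*.util` accepts `com.javashitang.util`.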
Headers Exchange
A headers exchange does not use the routing key to route a message; instead it matches on the headers property of the published message. Headers exchanges perform poorly, are impractical, and are rarely used.
Queue
The common attributes of queues are as follows
Parameter name | Use |
---|---|
queue | Queue name |
durable | Whether to persist. True: persistent. A persistent queue is saved to disk, so it is not lost when the server restarts |
exclusive | Whether the queue is exclusive. True: exclusive. An exclusive queue is visible only to the connection that first declared it and is deleted automatically when that connection closes (so it can only be consumed from that connection) |
autoDelete | Whether to delete the queue automatically. True: the queue is deleted once at least one consumer has connected to it and all of its consumers have then disconnected |
arguments | Other queue parameters, such as x-message-ttl and x-max-length |
Common queue parameters that can be set in arguments are as follows
Parameter name | Purpose |
---|---|
x-dead-letter-exchange | Dead letter exchange |
x-dead-letter-routing-key | An optional routing key for dead-lettered messages |
x-expires | The queue is deleted after the specified number of milliseconds |
x-ha-policy | Creates an HA queue |
x-ha-nodes | Nodes across which the HA queue is distributed |
x-max-length | Maximum number of messages in the queue |
x-message-ttl | Message expiration time in milliseconds, set at the queue level |
x-max-priority | Enables queue priorities; the maximum supported priority is 255 |
rabbitmq-api (using the RabbitMQ API)
Chapter_1: Quick start: write a RabbitMQ producer and consumer
Chapter_2: Demonstrates the use of the various exchanges
To review, the routing rules of the exchange types described above are
Exchange type | Routing rules |
---|---|
fanout | Messages sent to the exchange are routed to all queues bound to it; suitable for broadcasting |
direct | Routes messages to the queues whose BindingKey exactly matches the RoutingKey |
topic | Like direct, routes messages to queues by matching RoutingKey against BindingKey, but with fuzzy matching |
headers | Poor performance; essentially never used |
Chapter_3: Pulling messages
Messages can be obtained in two ways
- Get (pull)
- Consume (push)
So should we pull or push messages? Get is a polling model, and consume is a push model. With the get model, every message costs a synchronous round trip to RabbitMQ: the client sends a request frame and RabbitMQ sends back a reply. So prefer push and avoid pull
Chapter_4: Manual ack
There are two ways to acknowledge a message
- Automatic acknowledgement (autoAck=true)
- Manual acknowledgement (autoAck=false)
A consumer specifies the autoAck parameter when it starts consuming
String basicConsume(String queue, boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ waits for the consumer to explicitly send an acknowledgement before removing the message from memory (or disk)
autoAck=true: RabbitMQ marks a message as acknowledged as soon as it is sent and deletes it from memory (or disk), regardless of whether the consumer actually processed it
Manual acknowledgement uses the following method, which takes two parameters
basicAck(long deliveryTag, boolean multiple)
deliveryTag: identifies a delivered message within a channel. RabbitMQ attaches a deliveryTag to every message it pushes to a consumer, so that the consumer can tell RabbitMQ which message it is acknowledging. RabbitMQ guarantees that, within a channel, the deliveryTag increases by 1 with each message
multiple=true: all messages with id<=deliveryTag are acknowledged
multiple=false: only the message with id=deliveryTag is acknowledged
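To make the effect of `multiple` concrete, here is a small plain-Java model of the unacknowledged deliveries on one channel. It does not talk to a broker; the class `UnackedTracker` and its methods are hypothetical names used only for illustration.

```java
import java.util.TreeSet;

public class UnackedTracker {

    // Delivery tags that have been delivered but not yet acknowledged.
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    /** Simulates one delivery: tags increase by 1 per message on the channel. */
    public long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Mirrors the semantics of basicAck(deliveryTag, multiple). */
    public void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag <= deliveryTag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge only the message with this exact tag.
            unacked.remove(deliveryTag);
        }
    }

    public int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        UnackedTracker channel = new UnackedTracker();
        for (int i = 0; i < 5; i++) {
            channel.deliver();               // tags 1..5
        }
        channel.ack(3, true);                // acknowledges tags 1, 2 and 3
        System.out.println(channel.unackedCount());
        channel.ack(5, false);               // acknowledges tag 5 only
        System.out.println(channel.unackedCount());
    }
}
```

After the batch ack, tags 4 and 5 are still outstanding; after the single ack, only tag 4 remains.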
What happens if a message is never acknowledged?
If a message is delivered to a consumer and the consumer never acknowledges it, the message stays in the queue until it is acknowledged and deleted. As long as consumer A holds an unacknowledged message, RabbitMQ will not redeliver it to another consumer; redelivery only happens once consumer A’s connection to RabbitMQ is closed
Chapter_5: Two ways to reject a message
There is only one way to acknowledge a message
- basicAck(long deliveryTag, boolean multiple)
There are two ways to reject a message
- basicNack(long deliveryTag, boolean multiple, boolean requeue)
- basicReject(long deliveryTag, boolean requeue)
The only difference between basicNack and basicReject is that basicNack supports batch rejection
The deliveryTag and multiple parameters were described earlier.
requeue=true: the message is put back in the queue
requeue=false: the message is discarded (or dead-lettered, if the queue has a dead letter exchange configured)
Chapter_6: Failure notification
Chapter_6 through Chapter_10 briefly describe the trade-offs involved in publishing messages
Failure notifications and publisher confirms are the ones we use most often
How do we get hold of messages that could not be routed to any queue?
- Set mandatory to true when sending the message
- The producer calls channel.addReturnListener to register a ReturnListener, which receives the messages that could not be routed to a queue
mandatory is a parameter of channel.basicPublish()
mandatory=true: if the exchange cannot find a queue that matches the routing key, RabbitMQ sends Basic.Return to return the message to the producer
mandatory=false: the message is discarded
Chapter_7: Publisher confirms
When a message is sent, does it actually reach the Exchange? By default, the producer has no way of knowing
RabbitMQ provides two solutions to this problem
- Transactions (covered later)
- Publisher confirms
There are three ways to program publisher confirms
- Normal confirm mode: after sending each message, call waitForConfirms() and wait for the server’s confirmation. This is effectively a serial confirm.
- Batch confirm mode: after sending a batch of messages, call waitForConfirms() and wait for the server’s confirmation.
- Asynchronous confirm mode: the client registers a callback, which is invoked after the server confirms one or more messages.
Asynchronous confirm mode has the highest performance and is the most commonly used, so I will cover it in more detail
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            log.info("handleAck, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            log.info("handleNack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
        }
    });
If you have written asynchronous confirm code before, this will look familiar. deliveryTag and multiple appear here as well, but note that they have nothing to do with the deliveryTag and multiple used when a consumer acks a message.
Ack in ConfirmListener: sent by RabbitMQ to the publisher to confirm whether a message has reached the Exchange
Ack of messages: sent by the consumer, automatically or manually, to confirm that a message from the queue has been consumed
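On the publisher side, a common way to use these callbacks is to remember every published message by its sequence number and drop entries as confirms arrive. The sketch below models only that bookkeeping in plain Java, with no broker involved; `OutstandingConfirms` and its methods are hypothetical names. In a real producer, the sequence number would typically come from `channel.getNextPublishSeqNo()` before each publish, and the two handler methods would be called from a ConfirmListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class OutstandingConfirms {

    // Published messages that the broker has not confirmed yet, keyed by sequence number.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Record a message right before it is published. */
    public void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Broker confirmed the message(s): forget them. */
    public void handleAck(long deliveryTag, boolean multiple) {
        take(deliveryTag, multiple);
    }

    /** Broker rejected the message(s): return them so the caller can resend or log them. */
    public List<String> handleNack(long deliveryTag, boolean multiple) {
        return take(deliveryTag, multiple);
    }

    // Removes and returns the entries covered by (deliveryTag, multiple).
    private List<String> take(long deliveryTag, boolean multiple) {
        List<String> taken = new ArrayList<>();
        if (multiple) {
            // multiple=true covers every sequence number <= deliveryTag.
            ConcurrentNavigableMap<Long, String> upTo = outstanding.headMap(deliveryTag, true);
            taken.addAll(upTo.values());
            upTo.clear();
        } else {
            String body = outstanding.remove(deliveryTag);
            if (body != null) {
                taken.add(body);
            }
        }
        return taken;
    }

    public int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seq = 1; seq <= 4; seq++) {
            confirms.published(seq, "m" + seq);
        }
        confirms.handleAck(2, true);                          // confirms m1 and m2
        System.out.println(confirms.handleNack(3, false));    // m3 was rejected
        System.out.println(confirms.pending());               // m4 is still waiting
    }
}
```

A sorted concurrent map is used here because confirms arrive on a different thread from the one that publishes, and `multiple=true` needs an efficient "everything up to this number" view.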
Chapter_8: Alternate exchange
If mandatory is not set when a producer sends a message, a message that cannot be routed to a queue is lost. If mandatory is set, the producer has to implement ReturnListener logic, which complicates the producer code. If you want to keep the producer simple but do not want to lose messages, you can use an alternate exchange: messages that cannot be routed to a queue are stored in RabbitMQ via the alternate exchange and can be processed when needed
Chapter_9: Transactions
There are three transaction-related methods in RabbitMQ
Method | Explanation |
---|---|
channel.txSelect() | Puts the current channel into transaction mode |
channel.txCommit() | Commits the transaction |
channel.txRollback() | Rolls back the transaction |
If the transaction commits successfully, the message is guaranteed to have reached RabbitMQ’s exchange. Otherwise, the exception can be caught, the transaction rolled back, and the message resent. Because transactions significantly reduce RabbitMQ’s performance, publisher confirms are usually used instead
Chapter_10: Message persistence
To persist a message, simply set the delivery-mode property of the message to 2
The RabbitMQ client packages this property for us as MessageProperties.PERSISTENT_TEXT_PLAIN; see the GitHub code for detailed usage
When persisting messages, it is best to make both the queue and the messages persistent. If only the queue is durable, the messages are lost after a restart. If only the messages are persistent, the queue disappears after a restart and the messages are lost with it
Chapter_11: Dead letter queue
DLX stands for dead-letter exchange. When a message becomes a dead letter in a queue, it can be republished to another exchange, the DLX. A queue bound to the DLX is called a dead letter queue. The DLX is an ordinary exchange, no different from any other; using one simply means setting an attribute on a queue
There are several reasons why a message becomes a dead letter
- The message is rejected (basic.reject/basic.nack) and not requeued (requeue=false)
- The message expires
- The queue reaches its maximum length
The difference between dead letter exchanges and alternate exchanges
Alternate exchange: 1. Messages are forwarded to the alternate exchange when they cannot be routed. 2. The alternate exchange is defined when the primary exchange is declared
Dead letter exchange: 1. Messages that have reached a queue but were rejected by consumers are forwarded to the dead letter exchange. 2. The dead letter exchange is defined when the queue is declared
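Because the dead letter exchange is configured on the queue, the setup comes down to building the queue’s arguments map. The sketch below builds such a map in plain Java using the argument keys listed in the queue parameter table; the class name, method name, and example values are hypothetical. With the RabbitMQ Java client, a map like this would be passed as the last parameter of `channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)`.

```java
import java.util.HashMap;
import java.util.Map;

public class DeadLetterArguments {

    /**
     * Builds queue arguments that dead-letter messages to dlxName.
     * All parameter values are supplied by the caller; nothing here contacts a broker.
     */
    public static Map<String, Object> build(String dlxName, String dlRoutingKey,
                                            int ttlMillis, int maxLength) {
        Map<String, Object> arguments = new HashMap<>();
        // Exchange that receives messages once they become dead letters.
        arguments.put("x-dead-letter-exchange", dlxName);
        // Optional routing key to use when republishing to the DLX.
        arguments.put("x-dead-letter-routing-key", dlRoutingKey);
        // Two of the dead-letter triggers: message expiry and queue overflow.
        arguments.put("x-message-ttl", ttlMillis);
        arguments.put("x-max-length", maxLength);
        return arguments;
    }

    public static void main(String[] args) {
        // "dlx.exchange" and "dlx.key" are made-up names for illustration.
        Map<String, Object> arguments = build("dlx.exchange", "dlx.key", 10_000, 1_000);
        System.out.println(arguments);
    }
}
```

The third trigger, rejection with requeue=false, needs no queue argument: it happens whenever a consumer calls basicReject or basicNack that way on a queue that has a DLX configured.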
Chapter_12: Flow control (quality of service)
QoS is flow limiting performed on the server side. It does not apply to pull-mode consumption
Using QoS takes only two steps
- Set autoAck to false (QoS has no effect when autoAck=true)
- Call basicQos before calling basicConsume. basicQos takes three parameters
basicQos(int prefetchSize, int prefetchCount, boolean global)
Parameter name | Meaning |
---|---|
prefetchSize | The maximum total size of messages to prefetch. 0 means no limit |
prefetchCount | The maximum number of unacknowledged messages delivered at once; more messages are pushed only after earlier ones have been acked |
global | true applies the limit to the whole channel; false applies it to each consumer, since one channel can have multiple consumers |
Why use QoS?
- Improved service stability. Suppose the consumer is unavailable for a while and tens of thousands of unprocessed messages build up in the queue. When the consumer comes back, a flood of messages is pushed to it, which may cause it to hang or crash outright. Limiting the flow on the server side is therefore important
- Improved throughput. When a queue has multiple consumers, its messages are distributed to them round-robin. Because of machine performance and other factors, consumers do not all process messages at the same rate, so some consumers finish early and sit idle while others still have a backlog, which lowers the overall throughput of the application
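The way prefetchCount throttles delivery can be modelled with a simple window: the broker keeps pushing only while the number of unacknowledged messages is below the limit. The following plain-Java sketch illustrates that idea for a single consumer; it is a simplified model rather than RabbitMQ’s implementation, and `PrefetchWindow` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PrefetchWindow {

    private final Deque<String> queue = new ArrayDeque<>();
    private final int prefetchCount;
    private int inFlight = 0; // delivered but not yet acknowledged

    public PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    public void enqueue(String message) {
        queue.addLast(message);
    }

    /** Pushes as many messages as the window allows and returns how many were pushed. */
    public int push() {
        int pushed = 0;
        // prefetchCount == 0 means "no limit", as with basicQos.
        while (!queue.isEmpty() && (prefetchCount == 0 || inFlight < prefetchCount)) {
            queue.removeFirst();
            inFlight++;
            pushed++;
        }
        return pushed;
    }

    /** The consumer acknowledges one message, freeing one slot in the window. */
    public void ack() {
        if (inFlight > 0) {
            inFlight--;
        }
    }

    public int queued() {
        return queue.size();
    }

    public static void main(String[] args) {
        PrefetchWindow window = new PrefetchWindow(2);
        for (int i = 1; i <= 5; i++) {
            window.enqueue("m" + i);
        }
        System.out.println(window.push());   // window fills up
        System.out.println(window.push());   // nothing more until an ack arrives
        window.ack();
        System.out.println(window.push());   // one slot was freed
        System.out.println(window.queued()); // the rest stay in the queue
    }
}
```

With a limit of 2, a backlog of five messages is released two at a time at most, so a consumer that has just come back online is not flooded, and messages left in the queue remain available to other consumers.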
Springboot RabbitMQ (Spring Boot with RabbitMQ)
To be continued
Contact me
email: [email protected]
Everyone is welcome to get in touch; follow the official account “Java consciousness hall” to get my contact information