Where the message goes

MANDATORY and IMMEDIATE are two parameters in the send message method that have the capability to return the message to the producer if the destination is not reachable during message delivery. RabbitMQ provides a backup exchange that can store messages that have not been routed by the exchange without returning them to the client.

Mandatory parameters

When the MANDATORY parameter is set to true, and the exchange cannot find a queue that matches its type and routing key, RabbitMQ returns the message to the producer. When the MANDATORY parameter is set to false, the message is discarded directly.

So how does a producer get messages that are not properly routed to the appropriate queue? This can be done by registering a message return listener:

func (ch *Channel) NotifyReturn(c chan Return) chan Return

The immediate parameter

When the immediate parameter is set to true, if the exchange finds that there are no consumers on the queue when it routes the message to the queue, the message will not be queued. When none of the queues matching the routing key has a consumer, the message is returned to the producer.

In summary, the MANDATORY parameter tells the server to route the message to at least one queue or return it to the producer. The immediate parameter tells the server to consume consumers immediately if there are any on the queue associated with the message; If there are no consumers on any of the matching queues, the message is returned directly to production instead of being queued and consumer.

Backup switch

If the producer does not set the MANDATORY parameter when sending the message, the message will be lost if it is not routed. If the MANDATORY parameter is set, then the programming logic for the ReturnListener needs to be added, complicating the code for the producer. If you don’t want to complicate the programming logic of the producer without losing messages, you can use a backup exchange, which stores unrouted messages in RabbitMQ and processes them when needed.

This can be done by setting the alternate-exchange option in the args parameter when the exchange is declared, or it can be done as a policy. If both are used together, then the former takes precedence and overrides the Policy Settings.

The backup exchange is not much different from a normal exchange, and it is important to note that the routing key for the message to be re-sent to the backup exchange is the same as the routing key for the message to be sent from the producer.

For backup switches, the following special cases are summarized:

  • If the backup switch set does not exist, no exception will occur on either the client or the RabbitMQ server, and the message will be lost.
  • If the backup exchange is not bound to any queues, neither the client nor the RabbitMQ server will receive an exception, and the message will be lost.
  • If the backup exchange does not have any matching queues, neither the client nor the RabbitMQ server will receive an exception, and the message will be lost.
  • If the backup exchange is used with the MANDATORY parameter, the MANDATORY parameter is invalid.

Expiration time (TTL)

Set the TTL of the message

There are currently two ways to set the TTL of a message. The first is through the queue property, where all messages have the same expiration time. The second method is to set the message itself separately, and the TTL of each message can be different. If the two methods are used together, the TTL of the message is the smaller value in between. Messages that live in the queue longer than the set TTL value become “dead letter”.

The way to set message TTL via queue properties is to add the x-message-TTL option to the queueDeclare method’s args parameter, which is measured in milliseconds. It can also be set through the Policy or HTTP API interface.

If TTL is not set, this message will not expire. If the TTL is set to 0, it means that the message is immediately discarded unless it can be delivered directly to the consumer at this point.

The way to set the TTL for each message is to set the Expiration property in the Publish method’s message Publishing structure, in milliseconds. It can also be set through the HTTP API interface.

In the first way of setting the TTL property of the queue, a message is deleted from the queue once it is expired, while in the second way, even if the message is expired, it is not deleted from the queue immediately because each message is determined before it is delivered to the consumer.

Set the TTL of the queue

The x-expires option in the args parameter of the queueDeclare method controls the maximum amount of time that the queue is unused before being automatically deleted, in milliseconds, which cannot be set to zero. Unused means that there are no consumers on the queue, that the queue has not been re-declared, and that no GET method has been called during the expiration period.

Setting the TTL in the queue can be applied to RPC-style response queues, where many unused queues are created.

RabbitMQ ensures that the queue is deleted when the expiration time is reached, but does not guarantee how timely the deletion can be. After a RabbitMQ restart, the expiration time of the persistent queue is recalculated.

Dead-letter queue

DLX, also known as a dead-letter-exchange, can be called a dead-letter Exchange. When a message becomes dead-letter in one queue, it can be re-sent to another exchange, the DLX, and a queue bound to the DLX is called a dead-letter queue.

Messages tend to become dead letters because of the following:

  • The message is rejected /Nack and the requeue parameter is set to false.
  • The message expired.
  • The queue reached its maximum length.

The DLX is also a normal exchange, no different from a normal exchange, and can be specified on any queue (in effect, setting queue properties). When there is a dead letter in the queue, RabbitMQ will automatically republish the message to the set DLX, which will then be routed to the dead letter queue. Messages in this queue can be listened on for processing, which can be used in conjunction with setting the TTL of the message to 0 to compensate for the IMMEAIATE parameter.

Add the DLX to the queue by setting the x-dead-letter-exchange option in the args parameter of the queueDeclare method:

err = ch.ExchangeDeclare( "dlx_exchange", "direct", true, false, false, false, nil, ) if err ! = nil { log.Fatalf("%s: %s", err, "Failed to declare an exchange") return } args := make(map[string]interface{}) args["x-dead-letter-exchange"] = "My_queue ", err := ch.queueClare ("my_queue", true, false, false, args,) if err! = nil { log.Fatalf("%s: %s", err, "Failed to declare a queue") return }

You can also specify a route key for this DLX, or if not specified, use the route key of the original queue:

args["x-dead-letter-routing-key"] = "dlx_routing_key"

The DLX is a very useful feature for RabbitMQ. It can handle the abnormal situation, the message can not be properly consumed by the consumer (the consumer called Nack or Reject) and put into the dead letter queue, the subsequent analysis program can consume the contents of the dead letter queue to analyze the abnormal situation, so as to improve and optimize the system.

Delays in the queue

The object stored in the delay queue is the corresponding delayed message. The so-called “delayed message” means that when the message is sent, consumers do not want to get the message immediately, but wait for a certain time before consumers can get the message for consumption.

There are many usage scenarios for delay queues, such as:

  • In an order system, a user usually has 30 minutes to make a payment after placing an order. If the payment is not made within 30 minutes, the order will be handled with an exception, and the delay queue can be used to process the order.
  • Users want to use their mobile phones to remotely control smart devices in their homes to work at designated times. At this time, user instructions can be sent to the delay queue, and when the time set by the instructions is up, the instructions can be pushed to the smart device.

RabbitMQ has no direct support for delay queues itself, but it can be simulated using the DLX and TTL described earlier.

Suppose you have an application where you need to set a 10-second delay for each message. You can create two sets of switches and queues: Regular exchange exchange.normal, regular queue queue.normal and dead-letter exchange exchange. DLX, dead-letter queue queue. DLX, then add dead-letter exchange exchange. DLX to queue.normal. The producer sends the message to Exchange.normal and sets the TTL to 10 seconds, while the consumer subscribed to Queue.dlx instead of Queue.normal. When the message expires in queue.normal and is stored in queue.dlx, the consumer happens to consume the message with a 10-second delay.

Priority queue

Priority queues. As the name implies, queues with high priority have high priority, and messages with high priority have the privilege of being consumed first.

Priority queues can be implemented by setting the x-max-priority option in the args parameter of the QueueDeclare method. However, this is simply configuring the maximum Priority of a queue, after which you need to set the Priority of the message at send time by the Priority attribute in the Publishing structure.

The default priority of the message is at least 0 and up to the maximum priority set by the queue. High-priority messages can be consumed first, but if the consumer consumes faster than the producer and there is no message build-up in the Broker, setting the priority on the sent message is of little use.

RPC implementation

RPC is short for Remote Procedure Call. It is a technique for requesting services from a remote computer over a network without the need to know the underlying network. The main purpose of RPC is to make it easier to build distributed computing without losing the semantic simplicity of local calls while providing powerful remote calling capabilities.

Implementing RPC in RabbitMQ is also simple. The client sends the request message, and in order to receive the response message, we need to send a callback queue in the request message. You can use the default queue:

err = ch.Publish(
    "",
    "rpc_queue",
    false,
    false,
    amqp.Publishing{
        ContentType:   "text/plain",
        CorrelationId: "",        
        ReplyTo:       "",
        Body:          []byte("rpc"),
    },
)

For the Publishing structure involved in the code, two properties are needed:

  • REPLYTO: Usually used to set up a callback queue.
  • Correlationid: Used to associate the request with its response after calling the RPC.

It would be very inefficient to create a callback queue for each RPC request, as in the code above. But luckily there is a general solution — you can create a single callback queue for each client.

We should set a unique correlationId for each request, for the callback queue, after it receives a reply message, it can match to the corresponding request based on this property. If the callback queue receives a reply message with an unknown correlationId, it can simply be discarded.

As shown in the figure, the processing flow of RPC is as follows:

  1. When the client starts, an anonymous callback queue is created.
  2. The client sets ReplyTo and Correlationid for the RPC request.
  3. The request is sent to the rpc_queue queue.
  4. The RPC server listens for requests in the rpc_queue, and when the request arrives, the server processes it and sends a message with the result to the client. The queue that is received is the callback queue that ReplyTo sets up.
  5. The client listens to the callback queue, when there is a message, checks the correlationId property, and if it matches the request, performs other callback processing.

In RabbitMQ’s example, the RPC client calls the server’s method to get the Fibonacci value: rpc_server.go rpc_client.go

Producer confirmation

With RabbitMQ, when the producer of the message sends the message, does the message arrive at the server correctly? RabbitMQ offers two solutions to this problem:

  • Transaction mechanism
  • Publisher Confirm mechanism

Transaction mechanism

There are three methods in the RabbitMQ client that are related to the transaction mechanism:

func (ch *Channel) Tx() error
func (ch *Channel) TxCommit() error
func (ch *Channel) TxRollback() error

Channel.Tx is used to set the current channel to transactional mode, Channel.TxCommit is used to commit the transaction, and Channel.TxRollback is used to roll back the transaction.

If the transaction is committed successfully, then the message must reach RabbitMQ. If an exception is thrown due to a RabbitMQ crash or other reason before the transaction is committed, then we can catch it. If the RabbitMQ exception is thrown before the transaction is committed, then we can catch it. The transaction is then rolled back by implementing the Channel.txRollback method.

If you want to send multiple messages, wrap methods such as Channel.publish and Channel.txCommit into the loop.

A transaction does solve the problem of message acknowledgement between the message sender and RabbitMQ. If the message is received successfully by RabbitMQ, the transaction can be committed successfully. Otherwise, the transaction can be rolled back after the exception is caught and the message can be retransmitted simultaneously. But using the transaction mechanism can seriously affect the performance of RabbitMQ, so is there a better way? RabbitMQ provides an improved solution, which is the sender acknowledgement mechanism.

Sender acknowledgement mechanism

A producer can set the channel to confirm mode. Once the channel is in confirm mode, all messages posted on the channel are assigned a unique ID (starting at 1). When the message is posted to all matching queues, RabbitMQ sends an ACK (containing the message ID) to the producer. This lets the producer know that the message has arrived at its destination correctly. If the message and queue are persistent, the acknowledgement message is sent after the message has been written to disk. The deliveryTag in the acknowledgement message that RabbitMQ sends back to the producer contains the sequence number of the acknowledgement message. In addition, RabbitMQ can set multiple in the ACK method to indicate that all messages up to this sequence number have been processed.

The transaction mechanism blocks the sender after a message has been sent, waiting for a response from RabbitMQ before proceeding to the next message. The sender to confirm the mechanism, by contrast, the biggest advantage is that it is asynchronous, once issued a message producer application can return waiting for channel confirmed at the same time continue to send a message, when a message after the final confirmation, producers application can through the callback method to process the confirmation message, If RabbitMQ loses messages due to its own internal error, it sends a Nack command, which can also be handled by the producer application in the callback method.

In confirm mode, all messages sent are either Ack or Nack once. There is no chance that a message will be both Ack and Nack, but RabbitMQ does not guarantee how quickly a message will be confirmed.

Here is a code example from the official website:

package main import ( "log" amqp "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { if err ! = nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.close () // initialize the channels receiving Confirmation := make(chan amqp.confirmation) // register the channels to listen for posted Confirmation messages Ch. NotifyPublisher (Confirms) go func() {for Confirms := Range Confirms {// Confirms {// Code when messages are made  is confirmed log.Printf("Confirmed") } else { // code when messages is nack-ed log.Printf("Nacked") } } }() // Confirm(false) failonError (err, "Failed to confirm") q, err := ch.queueClare ("", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") consume(ch, q.Name) publish(ch, q.Name, "hello") log.Printf(" [*] Waiting for messages. To exit press CTRL+C") forever := make(chan bool) <-forever } func consume(ch *amqp.Channel, qName string) { msgs, err := ch.Consume( qName, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() } func publish(ch *amqp.Channel, qName, text string) { err := ch.Publish("", qName, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(text), }) failOnError(err, "Failed to publish a message") }

Note: Transaction and Publisher Confirm mechanisms are mutually exclusive and cannot coexist.

Key points on the consumer side

Message delivery

When a RabbitMQ queue has multiple consumers, messages received by the queue are distributed to consumers in a round-robin manner. Each message will only be sent to one consumer in the subscription list. If the load is heavier now, you just need to create more consumers to consume the processing messages.

Many times, however, the distribution mechanism for polling is less elegant. If some consumers are too busy to consume as many messages, and some other consumers, for reasons such as simple business logic, superior machine performance, etc., quickly finish processing the assigned messages and the process is idle, then overall application throughput will decline.

This is where QoS is used, which limits the maximum number of unacknowledged messages that a consumer on the channel can hold:

func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
  • PREFETCHCOUNT: The upper limit on the number of unacknowledged messages from the consumer. A setting of 0 indicates no upper limit.
  • PREFETCHSIZE: The upper limit on the size of an unacknowledged message by the consumer, in B. A setting of 0 indicates no upper limit.
  • Global: True means yes. Global validation means that all consumers on the channel are restricted by prefetchCount and prefetchSize (otherwise only new consumers will be restricted).

Note: QoS method does not work for pull mode.

Message sequencing

RabbitMQ can guarantee message ordering without using any advanced features, without exceptions such as message loss, network failures, and with only one consumer, preferably only one producer. If more than one producer sends messages at the same time, there is no way to determine the order in which the messages arrive at the Broker, and therefore no way to verify the sequencing of the messages.

In many cases, this can result in the RabbitMQ message being misordered. To ensure the order of messages, the business side needs to do further processing after using RabbitMQ, such as adding a global ordered identifier to the message body to achieve this.