1. Install RabbitMQ

  • Window Single-node installation guide
  • Linux Single-server Installation Guide

2. RabbitMQ introduction

website

Rabbit Technologies developed RabbitMQ 1.0 based on AMQP in 2007. Why use Erlang? Erlang is a development language Matthias is good at. The second is that Erlang is a language written for telephone switches, which are naturally distributed and highly concurrent.

Why the name Rabbit Technologies? Because rabbits run fast and breed like crazy.

Initially used in the financial industry, RabbitMQ is now used in companies all over the world. Most of the big companies in China use RabbitMQ, including Toutiao, Meituan, Didi (TMD), Qunar, elong and Taobao.

RabbitMQ and the Spring family belong to the same company :Pivotal.

Of course RabbitMQ supports many protocols besides AMQP, STOMP, MQTT, HTTP, WebSockets.

2.1. Working model and components

Let’s start with the picture below

2.1.1, the Broker

The RabbitMQ server (default port 5672) is a RabbitMQ server.

2.1.2, Connection

Either a producer sending a message or a consumer receiving a message must establish a connection with the Broker, which is a TCP long connection.

2.1.3, Channel

If all producers sent messages and consumers received messages, TCP long connections were created and released directly, it would be a huge performance drain and time drain on the Broker.

Therefore, the concept of Channel is introduced into AMQP, which is a virtual connection. We translate it as a channel, or message channel. In this way, we can create and release channels within the remaining TCP connections, greatly reducing resource consumption.

Different channels are isolated from each other, and each Channel has its own number. For each client thread, there is no need to share the Channel, each using its own Channel.

Another important thing to note is that channels are the most important programming interface in RabbitMQ’s native API, meaning that we define switches, queues, bindings, send messages, consume messages, and call methods on the Channel interface.

2.1.4, Queue

A Queue is the object that RabbitMQ uses to store messages. RabbitMQ actually stores messages in a database, which like RabbitMQ is developed in Erlang and is called Mnesia. Mnesia’s storage path can be found on disk.

C:\Users\ username \AppData\RoamingRabbitMQ\db\ rabbit@username -mnesia

Queues are also the link between producers and consumers. Messages sent by producers arrive in queues and are stored in queues. Consumers consume messages from the queue.

2.1.5, Consumer

It means consumer. RabbitMQ provides two modes of consumption

  • Pull mode, using method BasicGet
  • Push mode, corresponding to method BasicConsume

Pull mode, the corresponding method is basicGet. Messages are stored on the server and can only be retrieved by consumers. If a message is fetched every once in a while, the real-time nature of the message degrades. But the advantage is that you can decide how often to get messages based on your spending power.

In Push mode, the corresponding method is basicConsume. As long as the producer sends the message to the server, it will immediately Push it to the consumer. The message is stored in the client with high real-time performance. Spring AMQP is push and listens to the queue through an event mechanism. Whenever a message arrives on the queue, it triggers a method to consume the message.

2.1.6, Exchange

Exchange is an Exchange. It is used to distribute messages to the Queue according to rules. Therefore, Exchange and the queues that need to receive messages must establish a binding relationship and assign a special identity to each queue.

Exchange and queues have a many-to-many binding, meaning that messages from one switch are routed to multiple queues, and a queue can receive messages from multiple switches.

Once the binding is established, the producer sends a message to the Exchange, which also carries a special identity. When the tag matches the bound tag, messages are sent to one or more queues that match the rule.

2.1.7, Vhost

A Vhost can be understood as a virtual host and its relationship to rabbitMQ is similar to that of a virtual machine and a physical host. Multiple Vhosts can be created for the same RabbitMQ server, which are independent of each other and have their own switches, queues, bindings, and permissions. We install RabbitMQ with a default VHOST named /.

Introduction to creation

2.2 Routing mode

There are four types of switches in RabbitMQ: Direct, Topic, Fanout, and Headers. Headers is not commonly used. The type of switch can be specified at creation time, either on a web page or in code.

2.2.1. Direct Connection

A message can be routed to a queue only when its routing key and binding key match exactly.

Eg: channel.basicPublish(“MY_DIRECT_EXCHANGE “,” spring “,” MSg1 “) Only the first queue can receive messages.

2.2.2 Topic Topic

The message’s routing key matches the binding key and can be routed to multiple queues.

The routing rules to be described are as follows

  • *No more, no lessA word
  • #On behalf ofZero or more words

Eg. Enter the queue that can be routed to according to the following information.

routing key queue
junior.abc.jvm JUNIOR_QUEUE
senior.netty NETTY_QUEUE, SENIOR_QUEUE

2.2.3 Fanout broadcast

When a broadcast switch is bound to a queue, you do not need to specify a binding key. So producers send messages to broadcast switches without carrying routing keys. When a message reaches the switch, all queues bound to it receive copies of the same message.

BasicPublish (” MY_FANOUT_EXCHANGE”,””, “MSG 4”) all three queues receive MSG 4.

2.3. Delayed message implementation

2.3.1, scene

Suppose you have a business scenario where orders that have not been paid for more than 30 minutes are automatically closed. How should this function be implemented?

There are two schemes as follows:

  • Use RabbitMQ’s Dead Letter Queue.
  • Use rabbitmq-delayed-message-exchange

2.3.2 Use RabbitMQ dead-letter queue

After expiration, the message is delivered to DLX (dead letter switch), routed to DLQ (dead letter queue), monitored DLQ, and realized delay queue.

2.3.2.1 Message flow process:

Producer — > original switch — > Original queue (after TTL is exceeded) — > dead letter switch — > Dead letter queue — > final consumer

2.3.2.2 Message expiration Settings

There are two Settings to implement message expiration

1. Set the queue propertiesx-message-ttl

When this property is set, all messages in the queue expire when they are not consumed. It doesn’t matter whose package it is.

@Bean("ttlQueue")
public Queue queue(a) {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("x-message-ttl".11000); // Messages in the queue expire 11 seconds after they are not consumed
    return new Queue("TTL_QUEUE".true.false.false, map);
}
Copy the code
2. Set message properties

Through MessageProperties. SetExpiration (” 4000 “) method set message expiration time.

MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("4000"); // The expiration attribute of the message, in ms
Message message = new Message("This message expires in 4 seconds.".getBytes(), messageProperties);
rabbitTemplate.send("TTL_EXCHANGE"."test.ttl", message);
Copy the code

Note: If the expiration time is set for both, the expiration time takes effect first.

2.3.2.3 Where do dead letters go?

When creating a queue, you can specify a Dead Letter Exchange (DLX). The Queue bound to a Dead Letter switch is called a Dead Letter Queue (DLQ). The DLX is a common switch, and the DLQ is also a common Queue (for example, substitute players are also common players).

That is, if a message is expired and the queue specifies a DLX, it will be sent to the DLX. If DLX is bound to DLQ, it will route to DLQ. After routing to DLQ, we can consume.

2.3.2.4 How to use the dead-letter queue?

Let’s use an example to demonstrate the use of a dead letter queue.

  1. Declare the original switch (ORI_USE_EXCHANGE) and the original queue (ORI_uSE_QUEUE) bound to each other. Specifies the dead letter exchange (DEAD_LETTER_EXCHANGE) of the original queue.
  2. Declare DEAD_LETTER_EXCHANGE, DEAD_LETTER_QUEUE, and bind with “#” to represent unconditional routing.
  3. The end consumer listens to the dead-letter queue, where the logic of checking the order status is implemented.
  4. The producer sends a message test, setting the message to expire in 10 seconds.

Code link

 // Specifies the queued dead letter switch
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("x-dead-letter-exchange"."GP_DEAD_LETTER_EXCHANGE");
// arguments.put("x-expires",9000L); // Set the TTL of the queue
// arguments.put("x-max-length", 4); // If the maximum queue length is set, the first queued messages will be sent to the DLX

// Declare queues (default switch AMQP default, Direct)
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare("GP_ORI_USE_QUEUE".false.false.false, arguments);

// Declare a dead letter switch
channel.exchangeDeclare("GP_DEAD_LETTER_EXCHANGE"."topic".false.false.false.null);
// Declare a dead-letter queue
channel.queueDeclare("GP_DEAD_LETTER_QUEUE".false.false.false.null);
// Bind the Dead letter routing key to #
channel.queueBind("GP_DEAD_LETTER_QUEUE"."GP_DEAD_LETTER_EXCHANGE"."#");
System.out.println(" Waiting for message....");
Copy the code

2.3.3,rabbitmq-delayed-message-exchangeimplementation

Disadvantages of using dead-letter queues for delayed messages:

  • If a queue is used to set the TTL of messages, when the gradient is very large, such as 1 minute, 2 minutes, 5 minutes, 10 minutes, 20 minutes, 30 minutes… … . Many switches and queues need to be created to route messages.
  • If you set the TTL for messages separately, you may block messages in the queue — the first message is not queued (not consumed), and subsequent messages cannot be delivered (for example, the first message expired with a TTL of 30 minutes and the second with a TTL of 10 minutes). After 10 minutes, the second message cannot be delivered because the first message has not been queued, even though the second message should be delivered).
  • There may be some time error.

In RabbitMQ 3.5.7: and later a plugin (Rabbitmq-delayed -message-exchange) is provided for delayed queuing (available on Linux and Windows). The plugin also relies on Erlang/OPT 18.0 and above.

Plugin source code address

Plug-in download address

2.3.3.1 Plug-in installation

1. Go to the plug-in directory

Whereis the rabbitmq CD/usr/lib/rabbitmq/lib/rabbitmq_server - 3.6.12 / pluginsCopy the code

2. Upload rabbitmq_delayed_message_exchange-3.8.0.ez to this plugin

3. Enable the plug-in

#Start the plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#Stop using the plug-in
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
Copy the code

4. Use plug-ins

Use the delayed- Messaging feature by declaring an Exchange of type X-delayed – Message. X-delayed -message is the type provided by the plugin, not rabbitMQ itself (distinct from Direct, topic, fanout, headers).

2.3.3.2 Use code examples

Example: Declare delayed Exchange

@Bean("delayExchange")
public TopicExchange exchange() {
    Map<String, Object> argss = new HashMap<String, Object>();
    argss.put("x-delayed-type", "direct");
    return new TopicExchange("DELAY_EXCHANGE", true, false, argss);
}
Copy the code

Producer: Specifies the X-delay parameter in the message property.

The sample

MessageProperties messageProperties = new MessageProperties();
// The delay interval, the target time minus the current time
messageProperties.setHeader("x-delay", delayTime.getTime() - now.getTime());
Message message = new Message(msg.getBytes(), messageProperties);

// Cannot be tested locally, messages must be sent to the server where the plug-in is installed
rabbitTemplate.send("DELAY_EXCHANGE"."#", message);
Copy the code

Ps: Besides being out of date, under what circumstances can a message become a dead-letter message?

  1. Message was blocked by the consumers and is not set to return to the queue (NACK | | Reject) && requeue = = false
  2. When the queue reaches its maximum length, exceeding Max Length or Max Length bytes, the first queued message is sent to the DLX.

2.4. Message storage restrictions

Scenario: If RabbitMQ produces MQ messages faster than it consumes them, a large amount of messages will accumulate, occupying system resources and degrading machine performance. We want to control the number of messages received by the server. How do we do that?

Flow control we can control from several aspects, one is the server side, one is the consumer side.

2.4.1 Server control

2.4.1.1. Queue length

Queues have two properties that control their length:

  • x-max-length: in the queueThe maximum number of messages to storeWhen the number exceeds this, the header’s messages will be discarded.
  • x-max-length-bytes: Stored in the queueMaximum message capacityIf the number of bytes exceeds this value,Team leader messages are discarded.

Note that setting the queue length is only meaningful in the case of message accumulation, and will delete the messages that are queued first, and does not truly implement server-side traffic limiting.

2.4.1.2. Memory control

RabbitMQ checks the machine’s physical memory at startup. By default, WHEN MQ consumes more than 40% of memory, MQ proactively throws a memory warning and blocks all Connections. The memory threshold can be adjusted by modifying the rabbitmq.config file, which defaults to 0.4, as shown below: Windows default configuration file advanced. Config, C:\Users\ username \AppData\RoamingRabbitMQ\

[{rabbit, [] {vm_memory_high_watermark, 0.4}}].Copy the code

If set to 0, all messages cannot be published.

Rabbitmqctl set_vm_memory_high_watermark 0.3Copy the code

2.4.1.3 Disk control

Related configuration parameters, reference documentation, www.rabbitmq.com/configure.h…

Another way is to control the release of messages via disk. When the available disk space is lower than the specified value (50MB by default), the flow control measure is triggered.

# Specify 30% of the disk to store messages
disk_free_limit.relative = 3.0
# Specify 2GB of disk to store messages
disk_free_limit.absolute = 2GB
Copy the code

In another case, although Broker messages can be stored, consumers cannot consume them under the push model, and traffic should be controlled in this case.

2.4.2 Control at the consumer end

www.rabbitmq.com/consumer-pr…

By default, RabbitMQ will send queued messages to consumers as quickly as possible without configuration. Because consumers cache messages locally, if the number of messages is too large, it may cause OOM or affect the normal running of other processes.

In the case that consumers’ ability to process messages is limited, for example, the number of consumers is too small, or the processing time of a single message is too long, if we want to stop pushing messages until a certain number of messages are consumed, we need to use traffic restriction measures on the consumer side.

You can set prefetch Count based on Consumer or channel, meaning the maximum number of unacked messages on the Consumer end. When messages above this value are not acknowledged, RabbitMQ stops sending new messages to the consumer.

channel.basicQos(2); If more than two messages are not ACK sent, the current consumer no longer accepts the queued message
channel.basicConsume(QUEUE_NAME, false, consumer); // Set this to manual submission
Copy the code

Code examples:

Start two consumers, one of which Consumer2 is slow to consume and has qos set to 2. At most two messages are sent to it at a time, and all other messages are received by Consumer1. It’s called the best of the best.

3. Use RabbitMQ

3.1. Springboot integrates RabbitMQ

Reference code: gitee.com/fanger8848/…

Maven rely on

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code

RabbitConfig.java

@Configuration
public class RabbitConfig {

    // Two switches
    @Bean("topicExchange")
    public TopicExchange getTopicExchange(a){
        return new TopicExchange("TOPIC_EXCHANGE");
    }

    @Bean("fanoutExchange")
    public FanoutExchange getFanoutExchange(a){
        return  new FanoutExchange("FANOUT_EXCHANGE");
    }

    // Three queues
    @Bean("firstQueue")
    public Queue getFirstQueue(a){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl".6000);
        Queue queue = new Queue("FIRST_QUEUE".false.false.true, args);
        return queue;
    }

    @Bean("secondQueue")
    public Queue getSecondQueue(a){
        return new Queue("SECOND_QUEUE");
    }

    @Bean("thirdQueue")
    public Queue getThirdQueue(a){
        return new Queue("THIRD_QUEUE");
    }

    // Two bindings
    @Bean
    public Binding bindSecond(@Qualifier("secondQueue") Queue queue,@Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("#.gupao.#");
    }

    @Bean
    public Binding bindThird(@Qualifier("thirdQueue") Queue queue,@Qualifier("fanoutExchange") FanoutExchange exchange){
        returnBindingBuilder.bind(queue).to(exchange); }}Copy the code

consumers

@Component
@RabbitListener(queues = "FIRST_QUEUE")
public class FirstConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println(" first queue received msg : "+ msg); }}Copy the code

Message is sent

@Component
public class MyProvider {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send(a){
        // Send four messages

        amqpTemplate.convertAndSend(""."FIRST_QUEUE"."-------- a direct msg");

        amqpTemplate.convertAndSend("TOPIC_EXCHANGE"."shanghai.teacher"."-------- a topic msg : shanghai.teacher");
        amqpTemplate.convertAndSend("TOPIC_EXCHANGE"."changsha.student"."-------- a topic msg : changsha.student");

        amqpTemplate.convertAndSend("FANOUT_EXCHANGE".""."-------- a fanout msg"); }}Copy the code

3.2 Springboot Parameter configuration

Springboot address for querying Springboot configurations docs. Spring. IO /spring-boot…

4. Feature summary

  • Support for multiple clients: Client implementations for major development languages (Python, Java, Ruby, PHP, C#, JavaScript, Go, Elixir, Objective-C, Swift, etc.).
  • Flexible routing: Flexible routing of messages through exchanges.
  • Rights management: Manages rights for users and VMS.
  • Plug-in system: support a variety of rich plug-in extensions, but also support custom plug-ins.
  • Integration with Spring: Spring encapsulates AMQP.
  • High reliability: RabbitMQ offers a variety of features that allow you to make trade-offs between reliability and performance, including persistence, send and reply, release confirmation, and high availability.
  • Clustering and scalability: Multiple nodes form a logical server to support load.
  • Ha queue: Replicates data in a queue through mirroring queues.