1 overview

1.1 meaning

MQ(Message Quene) : Translated as Message queue. Through the typical producer and consumer model, producers continuously produce messages to the Message queue, and consumers continuously fetch messages from the queue. Because message production and consumption are asynchronous, and only care about message sending and receiving, no intrusion of business logic, easy to achieve decoupling between systems.

1.2 features

Peak cutting and valley filling asynchronous decoupling

1.3 closer agreement

Producer, consumer, switch, queue, virtual host, server RabbitMQ is a message queue that strictly implements the AMQP protocol, which is the RabbitMQ architecture

2 use

Through the ARCHITECTURE of MQ, the producer sends messages to the Server’s switch, the switch sends messages to the queue through certain matching rules, and the queue sends messages to the consumer, so that a variety of different uses can be planned, called the message model

2.1 Direct Connection Mode

In the model above, there are the following concepts:

  • P: the producer, that is, the program to send the message
  • C: Consumer: The recipient of the message, waiting for the message to arrive.
  • Queue: Message queue, shown in red. Like a mailbox, it can cache messages; Producers post messages to them, and consumers retrieve messages from them.

Features: P and C directly connect to queues without switching

1. The producers

  // Create a connection factory
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  // Create channel
  Channel channel = connection.createChannel();
  // Parameter 1: persistent parameter 2: exclusive queue parameter 3: automatically delete parameter 4: other attributes
  channel.queueDeclare("hello".true.false.false.null);
  channel.basicPublish(""."hello".null."hello rabbitmq".getBytes());
  channel.close();
  connection.close();
Copy the code

2. Consumers

 // Create a connection factory
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare("hello".true.false.false.null);
  channel.basicConsume("hello".true.new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      System.out.println(newString(body)); }});Copy the code

3 Parameter Description

 channel.queueDeclare("hello".true.false.false.null);
  Parameters' 1 ': used to declare the queue corresponding to the channelParameters' 2 ': specifies whether to persist the queueParameters' 3 ': Specifies whether the queue is exclusiveParameters' 4 ': Specifies whether to automatically delete the queueParameters' 5 ': Additional configuration for queuesCopy the code

2.2 Task Mode

When message processing is time-consuming, messages may be produced faster than they are consumed. In the long run, messages pile up and can’t be processed in a timely manner. This is where you can use the Work model: multiple consumers are bound to a queue and collectively consume messages in the queue. Once the messages in the queue are consumed, they disappear, so the task is not repeated.

Features: Producers send data to a queue, and multiple consumers consume at the same time

1. The producers

channel.queueDeclare("hello".true.false.false.null);
for (int i = 0; i < 10; i++) {
  channel.basicPublish(""."hello".null, (i+"====>: I am the message").getBytes());
}
Copy the code

2. Consumer 1

channel.queueDeclare("hello".true.false.false.null);
channel.basicConsume("hello".true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1:"+newString(body)); }});Copy the code

3. Consumer 2

channel.queueDeclare("hello".true.false.false.null);
channel.basicConsume("hello".true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    try {
      Thread.sleep(1000);   // Process a message one second slower
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Consumer 2:"+newString(body)); }});Copy the code

Results 4.

The queue will be sent to each consumer in order, regardless of the consumer’s consumption speed, just to deliver

5. Automatic confirmation mechanism

The above scheme does not take into account the consumption speed of consumers, which is not reasonable. It can be optimized by setting the confirmation of consumption before receiving new tasks

channel.basicQos(1);Accept only one unconfirmed message at a time
// Parameter 2: Disables automatic confirmation
channel.basicConsume("hello".false.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1:"+new String(body));
    channel.basicAck(envelope.getDeliveryTag(),false);// Confirm the message manually}});Copy the code

2.3 Broadcast Mode

In task mode, a message can be sent to only one consumer, while in broadcast mode, messages can be sent to multiple queues through the switch

In broadcast mode, the message sending process looks like this:

  • You can have multiple consumers
  • Each consumer has its own queue.
  • Every queue has to be bound to Exchange
  • A message sent by a producer can only be sent to the switch. The switch decides which queue to send the message to, but the producer cannot decide.
  • The switch sends messages to all queues that are bound
  • All consumers in the queue get the message. Implement a message to be consumed by multiple consumers

1. The producers

// Declare a switch
channel.exchangeDeclare("logs"."fanout");// Broadcast a message to multiple consumers simultaneously
// Publish the message
channel.basicPublish("logs"."".null."hello".getBytes());
Copy the code

2. Consumers

// Bind the switch
channel.exchangeDeclare("logs"."fanout");
// Create temporary queues
String queue = channel.queueDeclare().getQueue();
// Bind temporary queues to Exchange
channel.queueBind(queue,"logs"."");
// Process the message
channel.basicConsume(queue,true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1:"+newString(body)); }});Copy the code
// Bind the switch
channel.exchangeDeclare("logs"."fanout");
// Create temporary queues
String queue = channel.queueDeclare().getQueue();
// Bind temporary queues to Exchange
channel.queueBind(queue,"logs"."");
// Process the message
channel.basicConsume(queue,true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 2:"+newString(body)); }});Copy the code
// Bind the switch
channel.exchangeDeclare("logs"."fanout");
// Create temporary queues
String queue = channel.queueDeclare().getQueue();
// Bind temporary queues to Exchange
channel.queueBind(queue,"logs"."");
// Process the message
channel.basicConsume(queue,true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 3:"+newString(body)); }});Copy the code

Three temporary queues were created, bound to the same broadcast switch, and all three queues could receive messages

2.4 Route Matching

Features: Matches based on routes

Under the Direct model:

  • The binding of the queue to the switch cannot be arbitrary, but must be specifiedRoutingKey(Route Key)
  • The sender of a message must also specify the message when sending a message to ExchangeRoutingKey.
  • Exchange no longer delivers messages to each bound queue, but to each bound queueRouting KeyTo make a judgment, only queueRoutingkeyWith the message ofRouting keyThe message is received only when it is exactly the same

1. The producers

// Declare switch parameters 1: switch name parameter 2: switch type Command-based Routing key forwarding
channel.exchangeDeclare("logs_direct"."direct");
String key = "";
// Publish the message
channel.basicPublish("logs_direct",key,null, ("Specified route key"+key+"The news").getBytes());
Copy the code

2. Consumers

 // Declare a switch
channel.exchangeDeclare("logs_direct"."direct");
// Create temporary queues
String queue = channel.queueDeclare().getQueue();
// Bind queues and switches
channel.queueBind(queue,"logs_direct"."error");
channel.queueBind(queue,"logs_direct"."info");
channel.queueBind(queue,"logs_direct"."warn");

// Consume messages
channel.basicConsume(queue,true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1:"+newString(body)); }});Copy the code
// Declare a switch
channel.exchangeDeclare("logs_direct"."direct");
// Create temporary queues
String queue = channel.queueDeclare().getQueue();
// Bind queues and switches
channel.queueBind(queue,"logs_direct"."error");
// Consume messages
channel.basicConsume(queue,true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 2:"+newString(body)); }});Copy the code

Send a message with an error key, which can be received by both upper and lower consumers

Send a message whose key is info and, depending on the match, only the top consumer can consume it

2.4 Route Matching 2

The last route match was just a wildcard

* (star) can substitute for exactly one word. Hash can substitute for zero or more words. Match one or more words # such as: audit.# Match audit. Corporate or auditCopy the code

1. The producers

// Health switches and switch type Topics use dynamic routing (wildcard mode)
channel.exchangeDeclare("topics"."topic");
String routekey = "user.save";// Dynamic routing key
// Publish the message
channel.basicPublish("topics",routekey,null, ("This is the dynamic subscription model in routing,route key: ["+routekey+"]").getBytes());
Copy the code

2. Consumers

 // Declare a switch
channel.exchangeDeclare("topics"."topic");
// Create temporary queues
String queue = channel.queueDeclare().getQueue();
// Bind the queue to the switch and set to obtain the dynamic route in the switch
channel.queueBind(queue,"topics"."user.*");

// Consume messages
channel.basicConsume(queue,true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1:"+newString(body)); }});Copy the code
// Declare a switch
channel.exchangeDeclare("topics"."topic");
// Create temporary queues
String queue = channel.queueDeclare().getQueue();
// Bind the queue to the switch and set to obtain the dynamic route in the switch
channel.queueBind(queue,"topics"."user.#");

// Consume messages
channel.basicConsume(queue,true.new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 2:"+newString(body)); }});Copy the code

Consumer 1 matches use.*

Consumer 2 matches uESR.#

The key of the sent data is user.sava so it all matches

3 integrated Springboot

3.0 Setting up the Initial Environment

1. Introduce dependencies
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
2. Configure the configuration file
spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    host: 10.15. 09.
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems
Copy the code

RabbitTemplate is used to simplify operations and can be injected directly into a project

3.1 Use of the first Hello World model

Development producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testHello(a){
  rabbitTemplate.convertAndSend("hello"."hello world");
}
Copy the code
Develop consumers
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {

    @RabbitHandler
    public void receive1(String message){
        System.out.println("message = "+ message); }}Copy the code

3.2 Use of the second work model

Development producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testWork(a){
  for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("work"."hello work!"); }}Copy the code
Develop consumers
@Component
public class WorkCustomer {
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
        System.out.println("work message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
        System.out.println("work message2 = "+ message); }}Copy the code

Note: By default, Spring AMQP implementation works this way is fair scheduling, if you need to achieve more Work requires additional configuration

3.3 Fanout broadcast model

Development producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testFanout(a) throws InterruptedException {
  rabbitTemplate.convertAndSend("logs".""."This is a log broadcast");
}
Copy the code
Develop consumers
@Component
public class FanoutCustomer {

    @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(name="logs",type = "fanout") ))
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = @QueueBinding( value = @Queue, Exchange = @exchange (name="logs",type =" fanout"))
    public void receive2(String message){
        System.out.println("message2 = "+ message); }}Copy the code

3.4 Route Routing Model

Development producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testDirect(a){
  rabbitTemplate.convertAndSend("directs"."error"."Error log information");
}
Copy the code
Develop consumers
@Component
public class DirectCustomer {

    @RabbitListener(bindings ={ @QueueBinding( value = @Queue(), key={"info","error"}, exchange = @Exchange(type = "direct",name="directs") )})
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings ={ @QueueBinding( value = @Queue(), key={"error"}, exchange = @Exchange(type = "direct",name="directs") )})
    public void receive2(String message){
        System.out.println("message2 = "+ message); }}Copy the code

3.5 Topic Subscription Model (Dynamic Routing Model)

Development producer
@Autowired
private RabbitTemplate rabbitTemplate;

//topic
@Test
public void testTopic(a){
  rabbitTemplate.convertAndSend("topics"."user.save.findAll"."User.save.findall message");
}
Copy the code
Develop consumers
@Component
public class TopCustomer {
    @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.*"}, exchange = @Exchange(type = "topic",name = "topics") ) })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.#"}, exchange = @Exchange(type = "topic",name = "topics") ) })
    public void receive2(String message){
        System.out.println("message2 = "+ message); }}Copy the code

4 Application Scenarios

4.1 the asynchronous

Decoupling 4.2

4.3 peak elimination

5. The cluster

Clusters fall into two main types:

Normal clustering: Replicates some metadata and does not guarantee high availability

Mirror cluster: Replicates all data

5.1 Common Cluster

0. Plan the cluster

Node1:10.15.0.3 MQ1 Master Master node Node2:10.15.0.4 MQ2 REPL1 replica node Node3:10.15.0.5 MQ3 Repl2 replica nodeCopy the code

1. Clone the host name and IP address mapping of the three hosts

Vim /etc/hosts Added: 10.15.0.3 MQ1 10.15.0.4 MQ2 10.15.0.5 MQ3 node1: vim /etc/hostname Added: mq1 node2: Vim /etc/hostname Add: mq2 node3: vim /etc/hostname Add: mq3Copy the code

2. Install rabbitMQ on three machines and synchronize cookie files on node1:

scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/
Copy the code

3. Check whether the cookies are consistent:

node1: cat /var/lib/rabbitmq/.erlang.cookie 
node2: cat /var/lib/rabbitmq/.erlang.cookie 
node3: cat /var/lib/rabbitmq/.erlang.cookie 
Copy the code

4. Start rabbitMQ in the background. Run the following command on all nodes to access the management page:

rabbitmq-server -detached 
Copy the code

5. Run the following commands to join the cluster on node2 and node3:

1. Stop rabbitmqctl stop_app. 2. Add rabbitmqctl join_cluster rabbit@mq1 3. Start service rabbitmqctl start_appCopy the code

6. Check the cluster status and run the following command on any node:

rabbitmqctl cluster_status
Copy the code

7. If the following information is displayed, the cluster is successfully set up:

Cluster status of node rabbit@mq3 ...
[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
{cluster_name,<<"rabbit@mq1">>},
{partitions,[]},
{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]
Copy the code

8. Log in to the management page and the following information is displayed:

9. Create a queue on node1

10. View node2 and node3 nodes:

11. Shut down node1 and run the following command to view node2 and node3:

rabbitmqctl stop_app
Copy the code

5.2 Image Cluster

Can achieve high availability, and then add a rule on the web side can be specific baidu

6 Advanced Functions

6.1 Reliable delivery

The message delivery process is producer > RabbitMQ broker > Exchange > Queue > Consumer

Messages sent from the producer to the Exchange return an confirmCallback. Failure to deliver a message from Exchange –> Queue returns a returnCallback. We will use these two callbacks to control the reliable delivery of messages

1. Set ConnectionFactory to publisher- Confirm =”true”

Using rabbitTemplate setConfirmCallback set the callback function. The confirm method is called back when the message is sent to Exchange. If the ack value is true, the packet is successfully sent. If the ACK value is false, the packet fails to be sent.

2. Set Publisher -returns=”true” to ConnectionFactory to enable the return mode

Using rabbitTemplate setReturnCallback set back function, when the message from the exchange routing to the failure of the queue, if installed rabbitTemplate setMandatory (true) parameters, The message is returned to the producer. The callback function returnedMessage is executed.

6.2 Reliable consumption of messages

Ack means Acknowledge. Indicates the confirmation mode of the consumer after receiving the message.

There are three ways to confirm:

Automatic acknowledge: acknowledge=” None”

Manual acknowledgement: acknowledge=”manual”

Acknowledge anomalies with: acknowledge=”auto”, (this is cumbersome and does not explain)

Automatic acknowledgement means that when a message is received by a Consumer, it is automatically acknowledged and removed from RabbitMQ’s message cache. But in a real business process, it is likely that the message will be received, the business process will fail, and the message will be lost. If the manual confirmation mode is set, you need to call channel.basicack () after the service processing is successful to manually collect the message. If an exception occurs, call channel.basicnack () to make the message automatically resend.

6.3 Reliability Summary

Exchange is persisted

Queue is persisted

Message is persisted

The manufacturer confirms Confirm plus return

Consumer confirms Ack

The Broker high availability

6.4 a dead letter

Rabbitmq has only dead letter switches, which can be used to implement a dead letter queue

summary

  1. Dead letter switches and dead letter queues are the same as normal ones
  2. When a message becomes dead letter, if the queue is bound to a dead letter switch, the message is rerouted to the dead letter queue by the dead letter switch
  3. There are three ways a message can become a dead letter
    1. Queue message length reached limit;
    2. Consumers reject messages and do not return to the queue;
    3. The original queue has message expiration Settings, and the message expiration time is not consumed.

6.5 Delay Queue

The ability to make a purchase after a delay

This can be implemented through a dead-letter switch. When a message is stored for a certain period of time, it is thrown into the dead-letter and then consumed

6.6 Sequential Consumption

One queue to one consumer, natural implementation

If it’s RocketMQ, I’ll talk about it later

6.6 Distributed Transactions

Distributed transactions can be implemented through Rabbtimq + local message tables

Confirm + RETURN + ACK + high availability deployment can not achieve distributed transaction reliability?

So there’s a situation where you have a local transaction, and there’s no delivery to the message queue, and the machine goes down, and the local transaction is finished, and the message is not delivered, so you need a local message table, and you put the local transaction and the local message table in the same transaction, like Mysql, A thread is then started to periodically poll the local message table for undelivered messages

Reference:

B site above the rabbitMQ course md documentation, appropriately summarized