Preface

Before looking at distributed messaging middleware, it helps to understand two basic concepts: what a distributed system is and what middleware is.

Distributed system

“A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messages.” – Distributed Systems: Concepts and Design

Two characteristics of distributed systems can be inferred from this definition: components are distributed across networked computers, and components coordinate their actions by passing messages.

Middleware

“Middleware is computer software that provides services to software applications beyond those available from the operating system. It can be described as "software glue". Middleware makes it easier for software developers to implement communication and input/output, so they can focus on the specific purpose of their application.” – Wikipedia

In other words, middleware provides applications with services beyond those of the operating system, simplifies the development of communication and input/output, and lets applications focus on their own business logic. The Wikipedia definition is a little abstract, but it becomes clearer from a "spatial" perspective: middleware is a "middle tier" component that bridges the application layer above and the service layer below (for example, database middleware sits between an application above and a database service below). It can also bridge applications with each other (for example, distributed service components).

Distributed messaging middleware

“Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems.” – Wikipedia

Wikipedia defines message-oriented middleware as the hardware or software infrastructure (for the scope of this discussion, always software) that supports sending and receiving messages between distributed systems.

"Distributed messaging middleware" therefore means messaging middleware that is itself also a distributed system.

What can messaging middleware do?

Every kind of middleware solves a problem in a specific domain. Messaging middleware solves the problem of passing messages between distributed systems, which is something no distributed system can avoid.

The application scenarios of message-oriented middleware are summarized as follows:

  • Business decoupling: the trading system does not need to know about the SMS notification service; it only needs to publish messages
  • Peak clipping: when the upstream system's throughput is higher than the downstream system's, the downstream system may be overwhelmed at traffic peaks. The messaging middleware can accumulate messages during the peak, and the downstream system can consume them at its own pace
  • Event-driven: business flows can be driven from system to system by passing messages and processed in a streaming model
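
The peak clipping scenario can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions: the class name PeakShavingSketch, the "order-N" messages and the idea of a fixed per-tick consumption rate are all hypothetical, and a plain java.util.concurrent queue stands in for the messaging middleware.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PeakShavingSketch {

    /**
     * Simulates a burst of messages arriving at once while the downstream
     * system drains at most perTick messages per tick.
     * Returns the backlog left in the queue after each tick.
     */
    public static List<Integer> backlogPerTick(int burst, int perTick) {
        // The queue plays the role of the middleware: it absorbs the peak.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < burst; i++) {
            queue.offer("order-" + i); // upstream publishes without waiting
        }

        List<Integer> backlog = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Downstream consumes at its own, slower pace.
            for (int i = 0; i < perTick && !queue.isEmpty(); i++) {
                queue.poll();
            }
            backlog.add(queue.size());
        }
        return backlog;
    }

    public static void main(String[] args) {
        // A burst of 10 messages, consumed 4 per tick.
        System.out.println(backlogPerTick(10, 4)); // prints [6, 2, 0]
    }
}
```

The upstream side never waits for the downstream side; the backlog simply shrinks tick by tick until the queue is empty.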

What does distributed messaging middleware look like?

An abstract understanding of distributed messaging middleware might look something like this:

  • There is an SDK that provides an interface for the business system to send and consume messages
  • A number of Server nodes are used to receive and store messages and send them to downstream systems for consumption when appropriate

RabbitMQ, RocketMQ and Kafka are among the most popular messaging middleware products. There is not enough space in this series to cover everything, so I will pick the most important topics, which should give readers who have not used these tools a starting point.


All right, without further ado, here we go!


Besides being very fast, RabbitMQ has these features:

  • Open source, with excellent performance and stability
  • Provides reliable message delivery through confirm mode and return mode
  • Integrates well with Spring AMQP and offers a rich API
  • Rich cluster modes: expression-based configuration, HA mode, mirror queue model
  • Designed for high reliability and availability without data loss

RabbitMQ message queue application

1. RabbitMQ introduction

RabbitMQ is an open source message queue component written in Erlang and based on AMQP (Advanced Message Queuing Protocol). It consists of two parts, the server and the client, and client libraries are available for many languages, such as .NET, Java and Erlang. Performance comparisons between RabbitMQ and other message queue components are not covered here; plenty of information is available online.


2. RabbitMQ components

RabbitMQ is divided into the RabbitMQ Server and the RabbitMQ Client. The Server can be understood as a message broker (an intermediary for messages), while the Client is divided into message Producers and message Consumers.

2.1 Message Producer: produces messages and sends them over TCP to the RabbitMQ Server through a Connection and a Channel.

2.2 RabbitMQ Server: handles message routing, distribution, enqueueing, caching, and dequeueing. It involves three parts: Exchange, RoutingKey, and Queue.

(1) Exchange: receives the messages sent by message producers. The three commonly used Exchange types are direct, fanout and topic (a headers type also exists). Each type implements a different routing algorithm.

A. Direct exchange: pushes a message to the queues whose binding key exactly matches the message's routing key. Every queue is also bound automatically to the default (nameless) exchange with a routing key equal to the queue name.

B. Fanout exchange: broadcast mode. The routing key is ignored and the message goes to every bound queue.


C. Topic exchange: routes by matching the routing key against a pattern. For example, a binding key of abc matches only the routing key abc, while a binding key of abc.# matches abc, abc.xx, abc.xx.one and so on, because # matches zero or more dot-separated words. The wildcard * matches exactly one word.
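
The topic matching rule can be sketched in a few lines of plain Java. This is an illustration of the wildcard semantics only, not the broker's actual implementation; the class and method names are hypothetical.

```java
public class TopicMatchSketch {

    /** Returns true if routingKey matches the topic-style bindingKey. */
    public static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it is a match only if all words were consumed.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possibility.
            for (int next = j; next <= words.length; next++) {
                if (match(pattern, i + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        // '*' matches exactly one word; anything else must match literally.
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("abc.#", "abc"));            // true
        System.out.println(matches("abc.#", "abc.xx.one"));     // true
        System.out.println(matches("abc", "abc.xx"));           // false
        System.out.println(matches("item.*", "item.update"));   // true
        System.out.println(matches("item.#", "error.update"));  // false
    }
}
```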

(2) RoutingKey: the rule RabbitMQ uses to route messages. The Exchange uses it to decide which queues a message is pushed to.

(3) Queue: the message queue. Multiple queues can be defined as required, and queue attributes such as message removal, message caching and the callback mechanism can be set to control how messages reach the Consumer.

2.3 Message Consumer: consumes messages from a Queue. It also uses TCP, receiving messages from the Queue through a Connection and a Channel. A queue can have multiple consumers, and each message is delivered to one of them.

2.4 Key terms: Connection, Channel, Binding, etc.

(1) Connection: the TCP connection between the client and the server.

(2) Channel: a communication channel established on top of a Connection. Because establishing a TCP connection is costly in both communication and performance, multiple Channels should share one Connection once it is established, which reduces cost and improves performance.

(3) Binding: ties an Exchange and a Queue together and defines the routing key rule between them.


3. Install and deploy RabbitMQ

After this brief introduction to RabbitMQ, let’s put the message queue service into practice. The RabbitMQ server runs on Windows, Linux and Mac, and clients exist for multiple languages. Here we deploy it on CentOS 7.

3.1 Installing the Erlang runtime environment

Since RabbitMQ is written in Erlang, the Erlang runtime must be installed before the message queue service can be installed.

(1) Enable the system to access the public network and set the default gateway

`route add default gw 192.168.1.1`

(2) Install Erlang

`su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm'`
`sudo yum install erlang`

(3) Check whether Erlang is installed successfully

`erl`

(4) If the Erlang shell starts, the installation is successful

    

3.2 Installing the RabbitMQ Server

(1) Download the installation package

`wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm`

(2) Install and configure the RabbitMQ server, version 3.6.0:

`rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc`
`yum install rabbitmq-server-3.6.0-1.noarch.rpm`

(3) Enable the Web management plug-in

`rabbitmq-plugins enable rabbitmq_management`

(4) Start RabbitMQ

`chkconfig rabbitmq-server on`
`/sbin/service rabbitmq-server start`

(5) Firewall opening port

`# firewall-cmd --permanent --zone=public --add-port=5672/tcp`
`# firewall-cmd --permanent --zone=public --add-port=15672/tcp`
`# firewall-cmd --reload`

(6) RabbitMQ creates a guest account by default, but it can only log in to the management page from localhost. To log in from another machine, create a new administrator account (here named test):

http://localhost:15672/

`rabbitmqctl add_user test test`
`rabbitmqctl set_user_tags test administrator`
`rabbitmqctl set_permissions -p / test ".*" ".*" ".*"`

    

RabbitMQ administrator page.


4. RabbitMQ applications

This section shows how logs produced by a web application are sent through RabbitMQ and how a log service receives them from the message queue.

    

The example uses the official RabbitMQ .NET client, referenced through NuGet.

  

4.1 Web Application Production Service Logs

```csharp
[HttpPost]
public ActionResult Create()
{
    this.HttpContext.Session["mysession"] = DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss");
    var txt = Request.Form["txtSite"].ToString();

    // RabbitMQHelper is the article's own helper class (not shown here)
    RabbitMQHelper helper = new RabbitMQHelper();
    helper.SendMsg(txt + ", time: " + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));

    return RedirectToAction("Index");
}
```

  

Page rendering.

4.2 The Log Service Receives log Messages

Develop a log processing service based on Windows Forms that prints the received messages.

```csharp
private void btnReceive_Click(object sender, EventArgs e)
{
    isConnected = true;
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("MyLog", false, false, false, null);

        // QueueingBasicConsumer belongs to older versions of the .NET client
        var consumer = new QueueingBasicConsumer(channel);
        channel.BasicConsume("MyLog", true, consumer);

        while (isConnected)
        {
            // Blocks until the next message arrives
            var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            txtMsg.Text += message + "\r\n";
        }
    }
}
```

4.3 RabbitMQ Page Monitoring

RabbitMQ comes with a page monitoring tool to monitor MQ:

  


RabbitMQ message confirmation mechanism

1. Confirm mechanism on the producer side

Message confirmation means that after a producer sends a message, the Broker sends back a reply once it has received the message. The producer receives this reply and uses it to determine whether the message reached the Broker properly. This is the core guarantee for reliable message delivery.

Confirm mechanism flow chart

2. How to implement the Confirm message?

  • Step 1: Enable confirm mode on the channel: channel.confirmSelect()
  • Step 2: Add a listener to the channel: channel.addConfirmListener(ConfirmListener listener), and listen for the success and failure callbacks. Depending on the result, resend the message, log it, or do other follow-up processing.
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class ConfirmProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "item.update";

        // Enable confirm mode on the channel
        channel.confirmSelect();

        final long start = System.currentTimeMillis();

        // Register the confirm listener before publishing so that no confirm is missed.
        // The connection is deliberately left open so the callbacks can arrive.
        channel.addConfirmListener(new ConfirmListener() {
            /** Called when the broker acknowledges one or more messages. */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
                System.out.println(multiple);
                System.out.println("duration: " + (System.currentTimeMillis() - start) + "ms");
            }

            /** Called when the broker negatively acknowledges one or more messages. */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("defeat ack");
                System.out.println("duration: " + (System.currentTimeMillis() - start) + "ms");
            }
        });

        // Send the messages
        for (int i = 0; i < 5; i++) {
            String msg = "this is confirm msg ";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            System.out.println("Send message : " + msg);
        }
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConfirmConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String queueName = "test_confirm_queue";
        String routingKey = "item.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        // autoAck = true: messages are acknowledged automatically
        channel.basicConsume(queueName, true, consumer);
    }
}
```

Here we look only at the producer's output:

```
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
succuss ack
true
duration: 3ms
succuss ack
true
duration: 4ms
```

Notes

  • We use asynchronous confirm mode here: the producer provides callback methods, and the client library invokes them after the server confirms one or more messages. There are also single confirm and batch confirm modes. They are used less often in real scenarios, so we do not introduce them here; if you are interested, refer to the official documentation.

  • When we run the producer, the results differ from run to run. This is because the Broker optimizes confirmations: sometimes it confirms several messages in one batch (multiple = true) and sometimes one at a time (multiple = false).

```
succuss ack
true
duration: 3ms
succuss ack
false
duration: 4ms
```

or

```
succuss ack
true
duration: 3ms
```
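
Because one ack with multiple = true can confirm several messages at once, a producer that wants to resend unconfirmed messages usually keeps track of what is still outstanding. The following is a minimal sketch of that bookkeeping in plain Java, without the RabbitMQ client; the class ConfirmTrackerSketch and its methods are hypothetical names. In a real producer the sequence number would come from channel.getNextPublishSeqNo() before each publish, and handleAck would be called from the ConfirmListener.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConfirmTrackerSketch {

    // Messages that have been published but not yet confirmed, keyed by sequence number.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Remember a message at publish time. */
    public void onPublish(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Mirrors ConfirmListener.handleAck(deliveryTag, multiple). */
    public void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed.
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            // Only this single message is confirmed.
            outstanding.remove(deliveryTag);
        }
    }

    /** Number of messages still waiting for a confirm. */
    public int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.onPublish(seq, "this is confirm msg ");
        }
        tracker.handleAck(3, true);   // confirms 1, 2 and 3 in one batch
        System.out.println(tracker.outstandingCount()); // prints 2
        tracker.handleAck(4, false);  // confirms only 4
        System.out.println(tracker.outstandingCount()); // prints 1
    }
}
```

Whatever remains in the map after a timeout, or is reported through handleNack, is a candidate for resending or logging.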

3. Return message mechanism

  • A Return Listener is used to handle messages that cannot be routed.

  • Normally the message producer specifies an Exchange and a routing key, the message is delivered to a queue, and our consumers listen on that queue to consume it.

  • In some cases the message cannot be routed, for example when no queue is bound to the exchange with a matching routing key. To learn about such unreachable messages we need a Return Listener.

  • mandatory: if true, the unreachable message is returned to the producer, whose listener receives it and can process it. If false, the Broker silently discards the message.
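
The effect of the mandatory flag can be summarized as a small decision function. This is a simplified sketch in plain Java, assuming exact-match binding keys for brevity; MandatorySketch and its publish method are hypothetical names and do not belong to the RabbitMQ client.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class MandatorySketch {

    /**
     * Decides what happens to a published message.
     * boundKeys models the binding keys of the queues bound to the exchange
     * (exact match only, to keep the sketch short).
     */
    public static String publish(Set<String> boundKeys, String routingKey, boolean mandatory) {
        if (boundKeys.contains(routingKey)) {
            return "routed";      // at least one queue receives the message
        }
        // No queue matches: the mandatory flag decides the outcome.
        return mandatory
                ? "returned"      // handed back to the producer's Return Listener
                : "dropped";      // silently discarded by the broker
    }

    public static void main(String[] args) {
        Set<String> bound = new HashSet<>(Arrays.asList("item.update"));
        System.out.println(publish(bound, "item.update", true));   // routed
        System.out.println(publish(bound, "error.update", true));  // returned
        System.out.println(publish(bound, "error.update", false)); // dropped
    }
}
```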

Flow chart of Return message mechanism

Return message example

  • First we send three messages and deliberately give message 0 a wrong routing key so that it cannot be routed to the consumer.

  • mandatory is set to true, so unreachable messages are returned to the listener instead of being deleted automatically: channel.basicPublish(exchangeName, errRoutingKey, true, null, msg.getBytes());

  • Finally, we add a listener for messages that cannot be routed: channel.addReturnListener(ReturnListener r)

```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "item.update";
        String errRoutingKey = "error.update";

        // Enable confirm mode on the channel
        channel.confirmSelect();

        // Register both listeners before publishing so that no callback is missed.
        // The connection is deliberately left open so the callbacks can arrive.
        channel.addConfirmListener(new ConfirmListener() {
            /** Called when the broker acknowledges one or more messages. */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("defeat ack");
            }
        });

        // Return listener: receives messages that could not be routed
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("return relyCode: " + replyCode);
                System.out.println("return replyText: " + replyText);
                System.out.println("return exchange: " + exchange);
                System.out.println("return routingKey: " + routingKey);
                System.out.println("return properties: " + properties);
                System.out.println("return body: " + new String(body));
            }
        });

        // Send the messages
        for (int i = 0; i < 3; i++) {
            String msg = "this is return -- listening msg ";
            // mandatory = true: unroutable messages are returned to the listener
            if (i == 0) {
                // Message 0 uses a routing key that no queue is bound to
                channel.basicPublish(exchangeName, errRoutingKey, true, null, msg.getBytes());
            } else {
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
            System.out.println("Send message : " + msg);
        }
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningConsumer {
    public static void main(String[] args) throws Exception {
        // 1. Create a ConnectionFactory and configure it
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        // 2. Create a Connection
        Connection connection = factory.newConnection();

        // 3. Create a Channel
        Channel channel = connection.createChannel();

        // 4. Declare the exchange and the queue
        String exchangeName = "test_return_exchange";
        String queueName = "test_return_queue";
        String routingKey = "item.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        // autoAck = true: messages are acknowledged automatically
        channel.basicConsume(queueName, true, consumer);
    }
}
```

We look only at the producer's output here. The consumer receives only two messages.

```
Send message : this is return -- listening msg
Send message : this is return -- listening msg
Send message : this is return -- listening msg
return relyCode: 312
return replyText: NO_ROUTE
return exchange: test_return_exchange
return routingKey: error.update
return properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
return body: this is return -- listening msg
succuss ack
succuss ack
```

4. Ack and Nack mechanisms on the consumer side

When the consumer processes a message and a business exception occurs, we can log it and compensate later. For serious problems such as a server crash, we need manual ACK to guarantee that a message counts as consumed only when it really was processed successfully. Requeue means that a message that was not processed successfully is passed back to the Broker and placed in the queue again. In practice requeue is usually turned off, that is, set to false.

API reference

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicAck(long deliveryTag, boolean multiple) throws IOException;

How to set up manual Ack, Nack, and requeue

  • First we send five messages and put each message's loop index i into the message properties (header num) as a marker, so that we can identify it later in the callback.

  • Next, we consume with channel.basicConsume(queueName, false, consumer); which sets autoAck to false. If it were set to true, the five messages would simply be output once each.

  • We use Thread.sleep(2000) to delay two seconds so the results are easier to see. Then channel.basicNack(envelope.getDeliveryTag(), false, true); nacks message 0, meaning consumption failed, with requeue set to true, meaning the failed message goes back to the end of the queue.

```java
import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;

public class AckAndNackProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String routingKey = "item.update";
        String msg = "this is ack msg";

        for (int i = 0; i < 5; i++) {
            // Store the loop index in the headers so the consumer can identify the message
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 2 = persistent
                    .headers(headers)
                    .build();

            String tem = msg + ":" + i;
            channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());
            System.out.println("Send message : " + msg);
        }

        channel.close();
        connection.close();
    }
}
```
```java
import com.rabbitmq.client.*;

import java.io.IOException;

public class AckAndNackConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "item.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        // Bind the queue to the exchange
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                // Slow down so the output is easier to follow
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if ((Integer) properties.getHeaders().get("num") == 0) {
                    // Message 0: consumption failed, requeue = true
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // autoAck = false: acknowledge manually
        channel.basicConsume(queueName, false, consumer);
    }
}
```

Here we look only at the consumer's output: message 0 fails, goes back to the end of the queue, and is consumed again and again.

```
 [x] Received 'this is ack msg:1'
 [x] Received 'this is ack msg:2'
 [x] Received 'this is ack msg:3'
 [x] Received 'this is ack msg:4'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
```
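
The endless redelivery of message 0 is easy to reproduce in a small simulation, which also shows why requeue is usually turned off in practice. This is a sketch under stated assumptions: it models the requeued message as going to the tail of the queue, as described in this article (an actual broker may place it differently), and the class RequeueSketch and the maxDeliveries cut-off are hypothetical additions so that the loop terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RequeueSketch {

    /**
     * Simulates a consumer that nacks one message with requeue = true
     * and acks all others. Returns the order in which message numbers
     * are delivered, stopping after maxDeliveries deliveries.
     */
    public static List<Integer> consume(int messageCount, int failingNum, int maxDeliveries) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            queue.addLast(i);
        }

        List<Integer> deliveries = new ArrayList<>();
        while (!queue.isEmpty() && deliveries.size() < maxDeliveries) {
            int num = queue.pollFirst();
            deliveries.add(num);
            if (num == failingNum) {
                // basicNack(tag, false, true): the message goes back to the queue
                queue.addLast(num);
            }
            // Otherwise basicAck: the message is removed for good.
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // Five messages, message 0 always fails; stop after 9 deliveries.
        System.out.println(consume(5, 0, 9)); // prints [0, 1, 2, 3, 4, 0, 0, 0, 0]
    }
}
```

Without the cut-off the loop would never end, which is exactly the behavior seen in the consumer output: a message that always fails keeps coming back.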

RabbitMQ mirror queue

1. Set the mirror queue

To configure a mirror queue, add a policy with the following command:

rabbitmqctl  set_policy  [-p Vhost]  Name  Pattern  Definition  [Priority]

-p Vhost: optional; the vhost whose queues the policy applies to

Name: the name of the policy

Pattern: the queue matching pattern (a regular expression)

Definition: the mirror definition, which has three parts: ha-mode, ha-params, and ha-sync-mode

ha-mode: the mode of the mirror queue. Valid values are all, exactly, and nodes

  • all mirrors the queue on all nodes in the cluster
  • exactly mirrors the queue on a specified number of nodes; the number is given by ha-params
  • nodes mirrors the queue on the specified nodes; the node names are given by ha-params

ha-params: the parameters required by the chosen ha-mode

ha-sync-mode: how messages in the mirror queue are synchronized. Valid values are automatic and manual

Priority: optional; the priority of the policy

For example, to mirror all queues whose names start with hello on two nodes in the cluster, set the policy as follows:

rabbitmqctl  set_policy  hello-ha  "^hello"  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

2. Rough implementation of the mirror queue

2.1 Overview

Generally, a queue consists of two parts. One is amqqueue_process, which handles protocol-related message processing: receiving messages from producers, delivering messages to consumers, processing confirms and acknowledgements, and so on. The other is backing_queue, which provides the interface that amqqueue_process calls to store messages and, where required, persist them.

A mirror queue also consists of these two parts:

  • amqqueue_process still handles protocol-related message processing
  • backing_queue is a special backing_queue made up of a master node and slave nodes
  • Both the master and the slave nodes consist of a pair of processes: a gm process responsible for broadcasting messages, and a process that handles the callbacks for broadcasts gm receives
  • On the master node, the callback process is the coordinator (mirror_queue_coordinator)
  • On a slave node, it is mirror_queue_slave, which contains the ordinary backing_queue used to store messages
  • On the master node, the backing_queue is contained in mirror_queue_master and is called by amqqueue_process

Note: messages are published and consumed through the master node. When the master processes a message, it broadcasts the action to all slave nodes through gm. When the gm on a slave node receives the broadcast, it hands it to mirror_queue_slave through a callback for the actual processing.

2.2 gm (Guaranteed Multicast)

In the traditional master-slave replication mode, the master node sends messages to all slave nodes to be replicated. During the replication process, if a slave node is abnormal, the master node takes appropriate actions. If the fault occurs on the master node, the slave nodes may communicate with each other to decide whether to continue the replication. Of course, logging is inevitable throughout the process in order to handle exceptions.

RabbitMQ does not use this method. Instead, all nodes form a circular linked list. Each node monitors the nodes to its left and right. When a new node joins, its neighbours make sure that broadcast messages are copied to it; when a node fails, its neighbours take over to ensure that the broadcast still reaches all nodes.

The gm processes on the master and slave nodes form a group, and the group’s information is recorded in Mnesia. Different mirror queues form different groups.

A message sent from the master's gm is passed from node to node along the linked list. Because the nodes form a ring, the master's gm eventually receives the message it sent itself, at which point the master knows the message has been copied to all slave nodes.

2.3 Important table structures

The rabbit_queue table records information about queues:

```erlang
-record(amqqueue, {
    name,             %% queue name
    durable,          %% whether the queue is durable
    auto_delete,      %% whether the queue is deleted automatically
    exclusive_owner,  %% whether the queue is exclusive
    arguments,        %% queue creation arguments
    pid,              %% pid of the amqqueue_process
    slave_pids,       %% pids of the mirror_queue_slave processes
    sync_slave_pids,  %% pids of the slaves that are already synchronized
    policy,           %% queue policy set through set_policy, otherwise undefined
    gm_pids,          %% pids of {gm, mirror_queue_coordinator} and {gm, mirror_queue_slave}
    decorators
}).
```

Note: slave_pids is stored in the order in which the slaves joined, so that if the master fails, the “oldest” slave is promoted to the new master.
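
The promotion rule that follows from this ordering can be sketched as follows. This is an illustration in plain Java, not the Erlang implementation; the class PromotionSketch, the node names and the explicit "alive" set are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class PromotionSketch {

    /**
     * slavesByJoinOrder is ordered oldest first, like slave_pids.
     * Returns the oldest slave that is still alive, if there is one.
     */
    public static Optional<String> nextMaster(List<String> slavesByJoinOrder, Set<String> alive) {
        return slavesByJoinOrder.stream()
                .filter(alive::contains)
                .findFirst();
    }

    public static void main(String[] args) {
        List<String> slaves = Arrays.asList("B", "C", "D"); // B joined first
        Set<String> alive = new HashSet<>(Arrays.asList("C", "D")); // B has failed too
        System.out.println(nextMaster(slaves, alive)); // prints Optional[C]
    }
}
```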

The gm_group table records information about the groups formed by gm:

```erlang
-record(gm_group, {
    name,     %% group name, the same as the queue name
    version,  %% group version, incremented when a node joins or fails
    members   %% group member list, ordered as in the node linked list
}).
```

3. Some details about mirroring queues

3.1 Adding a Node

A new slave node first obtains the member list of the corresponding group from gm_group, then picks one member at random and sends it a join request. That member updates the gm_group record and notifies its left and right neighbours to update their neighbour information (adjusting which nodes they monitor) and to pass on the messages currently being broadcast. Once the new node has joined the group and received the response, it updates rabbit_queue and synchronizes messages as required.

3.2 Broadcast of messages

A message is sent from the master node and travels along the node list. During this time every slave node caches the message. When the message arrives back at the master node, the master broadcasts an ACK message. The ACK also passes through all slave nodes along the node list and tells them that the cached message can be cleared. When the ACK returns to the master node, the lifetime of the broadcast message ends.
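
The two passes around the ring can be traced with a short simulation. This is a sketch of the message flow only, with hypothetical names (RingBroadcastSketch, broadcast); it models a healthy ring and ignores node failure, caching details and everything else gm does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RingBroadcastSketch {

    /**
     * ring.get(0) is the master. Returns a log of every hop:
     * first the published message travels once around the ring,
     * then the ack does the same.
     */
    public static List<String> broadcast(List<String> ring, String payload) {
        List<String> log = new ArrayList<>();
        // Pass 1: the message circulates; each slave caches it on the way.
        circulate(ring, "pub " + payload, log);
        // The master has now received its own message back, so all slaves have a copy.
        // Pass 2: the ack circulates; each slave clears its cached copy.
        circulate(ring, "ack " + payload, log);
        return log;
    }

    private static void circulate(List<String> ring, String message, List<String> log) {
        for (int i = 0; i < ring.size(); i++) {
            String from = ring.get(i);
            String to = ring.get((i + 1) % ring.size()); // right-hand neighbour, wrapping around
            log.add(from + "->" + to + " " + message);
        }
    }

    public static void main(String[] args) {
        for (String hop : broadcast(Arrays.asList("A", "B", "C"), "test")) {
            System.out.println(hop);
        }
        // A->B pub test
        // B->C pub test
        // C->A pub test
        // A->B ack test
        // B->C ack test
        // C->A ack test
    }
}
```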

A simple diagram is shown below. Node A is the master node and broadcasts a message with the content "test". The "1" indicates that this is the first broadcast message, and "id=A" indicates that node A sent it. On the right is the status information recorded by the slave nodes.

Why do all nodes need to cache a published message?

The message sent by the master passes through all slave nodes in sequence. At any time during this period a node may fail, or a new node may join, and the neighbouring nodes may need to resend the message. For example, take the ring A->B->C->D->A with A as the master. The broadcast message reaches node B, which sends it to node C. If node C fails after receiving the message from B but before sending it to D, node B needs to send the message to node D itself. Similarly, if a node E joins between B and C after B has sent the message to C, node B needs to send the message to node E.

gm state record:

```erlang
-record(state, {
    self,              %% this gm's own id
    left,              %% left neighbour
    right,             %% right neighbour
    group_name,        %% group name, the same as the queue name
    module,            %% callback module: rabbit_mirror_queue_slave or
                       %% rabbit_mirror_queue_coordinator
    view,              %% group member view: the id of each member and
                       %% each member's left and right neighbours
    pub_count,         %% count of messages published so far
    members_state,     %% member state list, [#member{}]
    callback_args,     %% callback arguments: the pid of the
                       %% rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator process
    confirms,          %% confirm list
    broadcast_buffer,  %% buffer of messages waiting to be broadcast
    broadcast_timer,   %% timer for the broadcast buffer
    txn_executor
}).

-record(member, {
    pending_ack,  %% messages waiting for an ack
    last_pub,     %% count of the last published message
    last_ack      %% count of the last acknowledged message
}).
```

3.3 Node Failure

When a slave node fails, the failure is detected only by its neighbors, which reset their neighbor information and update the rabbit_queue and gm_group records. If the master node fails, the “oldest” slave node is promoted to master. The promoted slave creates a new coordinator and tells GM to switch its callback handling to that coordinator. The original mirror_queue_slave then acts as amqqueue_process: it handles messages published by producers, delivers messages to consumers, and so on.

As mentioned above, the failure of a slave node is detected by its neighbors. Is the failure of the master node also detected only by its neighbors? If so, and the neighbor is not the “oldest” node, how is the “oldest” node told to promote itself to the new master?

In fact, when each slave node joins the group, its mirror_queue_slave process starts monitoring the master node's amqqueue_process (or mirror_queue_slave, if the master is itself a promoted slave). If the master node fails, mirror_queue_slave senses this and broadcasts it through GM, so that all nodes eventually learn that the master has failed. Of course, only the “oldest” node promotes itself to the new master.
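The "everyone learns, only the oldest promotes itself" rule can be sketched in Python like this. It is a minimal model, assuming age is tracked by a hypothetical `joined_at` counter; RabbitMQ itself derives the order from the queue's recorded slave list, and `Mirror` and `handle_master_death` are names invented for the example.

```python
from dataclasses import dataclass


@dataclass
class Mirror:
    name: str
    joined_at: int          # smaller value = joined earlier = "older" (assumption)
    role: str = "slave"


def handle_master_death(me, live_members):
    """Run by every surviving slave once it learns the master is gone."""
    oldest = min(live_members, key=lambda m: m.joined_at)
    if oldest is me:
        me.role = "master"   # only the oldest slave promotes itself
    return me.role


members = [Mirror("B", joined_at=2), Mirror("C", joined_at=1), Mirror("D", joined_at=3)]
# The death notice reaches every survivor; each one makes the same decision locally.
for m in members:
    handle_master_death(m, members)
print([(m.name, m.role) for m in members])
```

Because every survivor evaluates the same rule on the same membership list, exactly one node (C here) changes its role, regardless of which node noticed the failure first.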

In addition, when a slave is promoted to master, mirror_queue_slave performs an internal “switch”: messages that used to be dispatched to the handle_call/handle_info/handle_cast callbacks of mirror_queue_slave are now dispatched to the handle_call/handle_info/handle_cast callbacks of amqqueue_process. In this way the mirror_queue_slave process takes on the role of amqqueue_process and handles protocol-related messages.

    %% rabbit_mirror_queue_slave.erl
    handle_call({gm_deaths, LiveGMPids}, From,
                State = #state{q = Q = #amqqueue{name = QName, pid = MPid}}) ->
        Self = self(),
        case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of
            {ok, Pid, DeadPids} ->
                case Pid of
                    MPid ->
                        %% master hasn't changed
                        gen_server2:reply(From, ok),
                        noreply(State);
                    Self ->
                        %% we've become master
                        QueueState = promote_me(From, State),
                        {become, rabbit_amqqueue_process, QueueState, hibernate};
                    ...

    %% gen_server2.erl
    handle_common_reply(Reply, Msg,
                        GS2State = #gs2_state{name = Name, debug = Debug}) ->
        case Reply of
            ...
            {become, Mod, NState, Time1} ->
                %% swap the callback module and state, then keep looping
                Debug1 = common_become(Name, Mod, NState, Debug),
                loop(find_prioritisers(
                       GS2State#gs2_state{mod = Mod, state = NState,
                                          time = Time1, debug = Debug1}));
            ...

    handle_msg({'$gen_call', From, Msg},
               GS2State = #gs2_state{mod = Mod, state = State,
                                     name = Name, debug = Debug}) ->
        case catch Mod:handle_call(Msg, From, State) of
            ...
    handle_msg(Msg, GS2State = #gs2_state{mod = Mod, state = State}) ->
        Reply = (catch dispatch(Msg, Mod, State)),
        handle_common_reply(Reply, Msg, GS2State).

    dispatch({'$gen_cast', Msg}, Mod, State) ->
        Mod:handle_cast(Msg, State);
    dispatch(Info, Mod, State) ->
        Mod:handle_info(Info, State).
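The effect of the `{become, Mod, NState, ...}` return value can be mimicked in a few lines of Python. This is a rough analogy, not a port of gen_server2: `Server`, `SlaveCallbacks`, and `MasterCallbacks` are hypothetical stand-ins for the server loop and the two callback modules.

```python
class SlaveCallbacks:
    @staticmethod
    def handle_call(msg, state):
        if msg == "gm_deaths":
            # Ask the loop to swap callback modules, like {become, Mod, NState, ...}.
            new_state = {"role": "master", "handled": state["handled"]}
            return ("become", MasterCallbacks, new_state)
        state["handled"].append(("slave", msg))
        return ("reply", "ok", state)


class MasterCallbacks:
    @staticmethod
    def handle_call(msg, state):
        state["handled"].append(("master", msg))
        return ("reply", "ok", state)


class Server:
    """A tiny stand-in for a generic server loop that dispatches to `mod`."""

    def __init__(self, mod, state):
        self.mod, self.state = mod, state

    def call(self, msg):
        result = self.mod.handle_call(msg, self.state)
        if result[0] == "become":
            # Same server object (same "process"), new callbacks and state.
            _, self.mod, self.state = result
            return "ok"
        _, reply, self.state = result
        return reply


server = Server(SlaveCallbacks, {"role": "slave", "handled": []})
for msg in ["publish", "gm_deaths", "publish"]:
    server.call(msg)
print(server.state["handled"])
```

The caller keeps talking to the same server throughout; only the module that the loop dispatches to changes, which is why the promoted slave can carry on as the queue process without a new process being started.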

4. Message synchronization

What is ha-sync-mode for when configuring mirrored queues?

After a new node joins the group, it can at most obtain the messages currently being broadcast from the node on its left. It cannot obtain messages that were broadcast before it joined. If the master node then fails and the new node becomes the new master, all messages broadcast before it joined the group are lost.

Note: The messages here are specifically those that were published and replicated to all slave nodes before the new node joined, and that have not yet been consumed or acknowledged by consumers. If all broadcast messages have been consumed and acknowledged before the new node joins, the master node has already deleted them and told the slave nodes to do the same. That is equivalent to no messages having been published before the new node joined.

The solution to this problem is to synchronize messages to the new slave node. When ha-sync-mode is set to automatic, messages are synchronized automatically when a new node joins the group. If it is set to manual, you need to trigger synchronization yourself.
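The difference between the two modes can be illustrated with a small Python model. It is a simplified sketch, not RabbitMQ's behavior in full: `MirroredQueue` and its methods are hypothetical, and only the values "manual" and "automatic" come from the ha-sync-mode setting discussed above.

```python
class MirroredQueue:
    def __init__(self, ha_sync_mode="manual"):
        self.ha_sync_mode = ha_sync_mode
        self.master = []        # unconsumed messages held by the master
        self.mirrors = {}       # mirror name -> messages that mirror holds

    def publish(self, msg):
        self.master.append(msg)
        for held in self.mirrors.values():
            held.append(msg)    # mirrors already in the group get new broadcasts

    def add_mirror(self, name):
        self.mirrors[name] = []          # a new mirror starts without the backlog
        if self.ha_sync_mode == "automatic":
            self.sync(name)

    def sync(self, name):
        """Copy the master's backlog to one mirror (the explicit step in manual mode)."""
        self.mirrors[name] = list(self.master)

    def fail_over_to(self, name):
        """Promote a mirror to master and return the messages that were lost."""
        lost = [m for m in self.master if m not in self.mirrors[name]]
        self.master = self.mirrors.pop(name)
        return lost


for mode in ("manual", "automatic"):
    q = MirroredQueue(ha_sync_mode=mode)
    q.publish("m1")
    q.publish("m2")
    q.add_mirror("new")     # joins after m1 and m2 were broadcast
    q.publish("m3")
    print(mode, "lost:", q.fail_over_to("new"))
```

In manual mode the unsynchronized mirror only holds m3, so promoting it loses m1 and m2. In a real deployment the manual step is typically triggered with `rabbitmqctl sync_queue`.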


For those interested in RabbitMQ, I will follow up with two more articles, on RocketMQ and Kafka.


