Why write this article

  • Demand in interviews keeps growing. Lately I have seen several BOSS Zhipin postings for full-stack Node.js developers that list "understands message queues and has applied them in a project" as a requirement.
  • Back-end developers generally know about message queues, but some front-end developers may not. You might still be wondering how features like ticket grabbing or flash sales are implemented — in fact it is not that advanced; take a look at the message queue.

Article map (what you can learn)

Koala is dedicated to sharing the complete Node.js technology stack, from JavaScript to Node.js to back-end databases, and hopes to help you become an excellent senior Node.js engineer. Author of [Programmer growth refers to north]; the GitHub blog open source project is at github.com/koala-codin…

What is a message queue

A message queue is a container that holds messages during their transmission.

Personal understanding: split the term into two words, message and queue. When a large number of clients generate a large number of network requests (messages) at the same time, there is a limit to what one server can handle, so it would be nice to have a container that queues the messages first. Fortunately, we have a data structure called a queue; passing messages to the server through a container with queue semantics (first in, first out) is exactly what we need. That great container is the message queue.

Once there is a message queue, producers and consumers become two essential concepts. In the understanding above, there are many producers and one consumer. This producer-consumer model will come up several times later in the article.

Message queue advantages

  • Application decoupling

    A message queue keeps consumers and producers from interfering with or affecting each other directly; each side only needs to send or receive messages through the queue. The processes on both sides can be extended or modified independently, as long as they follow the same interface convention. You could even have the producer written in Node.js and the consumer written in Python.

  • Flexibility and peak handling capability

    When client traffic surges and requests exceed the maximum peak the service can handle, the server may time out or even collapse under the load. A message queue solves this: by controlling the speed at which consumers process messages, any number of producers can keep writing into the queue, and the traffic peak is smoothed out.

  • Order guarantee

    A message queue can control the order in which data is processed, because the queue itself is a FIFO (first in, first out) data structure. In some scenarios the processing order matters, such as the order in which product orders are placed.

  • Asynchronous communication

    Some messages in the message queue do not need to be processed immediately. The message queue provides an asynchronous processing mechanism. You can put the messages in the queue and process them when necessary, or process them asynchronously and slowly.

  • Scalability

    As mentioned earlier, message queues decouple the two sides. If we want to change how often messages enter or leave the queue, we can usually just change the message queue configuration without touching the code — for example, limiting how many messages are sent to a consumer at a time.

Each advantage has its real-world application scenarios, and the corresponding scenarios for these advantages are covered later in the article.

Description of message queue types

Here is a brief introduction to several mainstream message queues on the market (extra background; feel free to skip).

  • Kafka: an open-source stream-processing platform developed by the Apache Software Foundation. Written in Scala and Java, it is a high-throughput distributed publish-subscribe messaging system that can handle millions of messages per second on a single machine. Kafka is focused on log-type workloads: it was designed for processing logs, so you can view it as an important component of a logging (messaging) system, with a very specific purpose. Replication has been supported since version 0.8, but transactions are not, so there are no strict guarantees against message duplication, loss, or error.
  • RocketMQ: Alibaba's open-source messaging middleware — low latency, highly reliable, scalable, and easy to use — whose design originated from Kafka. Its biggest problem is that the commercial version is paid and some features are not open.
  • RabbitMQ: an open-source message queue system written in Erlang (which offers latency as low as native sockets) and based on the AMQP protocol. It can guarantee message reliability, stability, security, and high concurrency.

RabbitMQ's high concurrency and high availability, of course, come from the naturally distributed nature of the Erlang language.

Note: this article mainly explains RabbitMQ, since it is the most common. In my opinion, most of what message queue middleware does can also be achieved with Redis.

Introduction to Message queues (simple application of message queues in Node.js)

Basic RabbitMQ installation

Installation on macOS

Install directly through Homebrew by executing the following command:

brew install rabbitmq

Start RabbitMQ

In the installation directory /usr/local/Cellar/rabbitmq/3.7.8, launch the server:

$ sbin/rabbitmq-server

Open http://localhost:15672/#/ — the default user name and password are both guest.

Basic schematic diagram after installation

Visual interface module functions:


For installation on other systems, please search online.

Port description

  • 5672: default communication port
  • 15672: default management console port
  • 25672: cluster communication port

Note: if RabbitMQ is installed on an Alibaba Cloud ECS server, the outside network cannot reach it until the ports are opened — this is a security-group issue, so open the corresponding ports in the ECS security group.

Basic RabbitMQ commands

The following lists some common terminal commands:

  • whereis rabbitmq: displays the installation location of RabbitMQ
  • whereis erlang: displays the installation location of Erlang
  • rabbitmqctl start_app: starts the application
  • rabbitmqctl stop_app: stops the application
  • rabbitmqctl status: shows node status
  • rabbitmqctl add_user username password: adds a user
  • rabbitmqctl list_users: lists all users
  • rabbitmqctl delete_user username: deletes a user
  • rabbitmqctl add_vhost vhostPath: creates a virtual host
  • rabbitmqctl list_vhosts: lists all virtual hosts
  • rabbitmqctl list_queues: lists all queues
  • rabbitmqctl -p vhostPath purge_queue blue: clears the messages in a queue

Note: if rabbitmqctl is not on your PATH, run it from the sbin directory under the RabbitMQ installation path.

Node.js implements a simple HelloWorld message queue

Below is a basic diagram of the HelloWorld message queue; it involves the following concepts.


A few concepts before we look at the code:

  • Producer: produces messages
  • Consumer: receives messages
  • Channel: after a connection is established, a channel is obtained from it
  • Exchange: messages must first be sent to an exchange; it is the first stop for a message (there are several types of exchange, more on that later)
  • Message queue: the place where messages are stored just before they reach the consumer; the exchange delivers messages to it
  • Ack: acknowledgement that a message has been consumed

Amqplib module

The npm module amqplib is recommended.

GitHub: github.com/squaremo/am…

$ npm install amqplib

Producer code product.js

const amqp = require('amqplib');

async function product() {
    // 1. Create the connection object
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. Obtain the channel
    const channel = await connection.createChannel();

    // 3. Declare parameters
    const routingKey = 'helloKoalaQueue';
    const msg = 'hello koala';

    for (let i = 0; i < 10000; i++) {
        // 4. Send messages ('' means the AMQP default exchange)
        await channel.publish('', routingKey, Buffer.from(`${msg} ${i} message`));
    }

    // 5. Close the channel
    await channel.close();
    // 6. Close the connection
    await connection.close();
}
product();

Producer code interpretation and running results

Execute: node product.js

The basic process is explained in the code comments, but I had some questions the first time I read it, and I think many of you will too, so let me explain:

  • Question 1

    When sending a message, the producer must specify an exchange. If the exchange is left empty, the AMQP default exchange is used. The default exchange routes a message by checking whether MQ has a queue whose name equals the routingKey. The code above uses this default exchange. The different exchange types are covered in a later section.

  • Question 2

    After the producer sends the message, the message is sent to the Exchange, but is the queue created?

    Answer: in the code we declare the route as a routingKey, but that does not create a helloKoalaQueue message queue; the message is only sent to the exchange. The queue screenshot after running the code proves this:

  • Explanation 1

    After the producer sends its messages, remember to close the channel and the connection. Once the messages are sent successfully, the connection can be closed; a consumer written in any language can then fetch the messages, which demonstrates the excellent decoupling property of message queues.

  • Instruction 2

    The node product.js producer code can be executed multiple times; the messages accumulate in the exchange and are not overwritten. Once a consumer has run and the corresponding message queue exists, messages are delivered from the exchange to that queue and stored there until the consumer consumes them.

The consumer code consumer.js

// Build the consumer
const amqp = require('amqplib');

async function consumer() {
    // 1. Create the connection object
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. Obtain the channel
    const channel = await connection.createChannel();

    // 3. Declare parameters
    const queueName = 'helloKoalaQueue';

    // 4. Declare the queue (the exchange defaults to the AMQP default exchange)
    await channel.assertQueue(queueName);

    // 5. Consume
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    });
}
consumer();

Consumer code interpretation and running results

Execute: node consumer.js
  • The execution result is displayed

  • Explanation 1

    I changed the queue name in the code to helloKoalaQueueHaHa and then checked the queue module in the RabbitMQ management UI: the queue had been created (assertQueue in the consumer declares it).

  • Instruction 2

    Once again, the consumer and the producer are not coupled to each other. Create a producer that sends messages to the helloKoalaQueueHaHa routing key, and this consumer will consume them and print the messages.

How do I release message queues

Delete message queues directly from the visual interface

  1. Access http://{rabbitmq installation IP}:15672 and log in.
  2. Click Queues to see all the queues you have created.
  3. Select a queue and a detail screen appears; at the bottom there is a Delete button to delete the queue and a Purge Messages button to clear its messages.

Disadvantage: this can only be done queue by queue, which is very slow if there are many queues or many messages.

Message queue release (delete) through code
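The snippet for this step is not shown above, so here is a minimal sketch using amqplib. It assumes the local connection and the helloKoalaQueue queue from the earlier examples; purgeQueue only empties the messages, while deleteQueue removes the queue itself.

const amqp = require('amqplib');

async function releaseQueue(queueName) {
    const connection = await amqp.connect('amqp://localhost:5672');
    const channel = await connection.createChannel();

    // Remove every message currently sitting in the queue
    await channel.purgeQueue(queueName);

    // Delete the queue itself; it disappears from the management UI
    await channel.deleteQueue(queueName);

    await channel.close();
    await connection.close();
}
releaseQueue('helloKoalaQueue');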

Message queue switch tutorial

Remember one thing

When sending a message, the producer must go through an exchange; messages never reach a message queue directly. The exchange routes each message to one or more queues (or discards it).

Now let's start the chapter on exchanges.

If the exchange is left empty, the AMQP default exchange is used; its routing rule matches a message to the queue on MQ whose name equals the routingKey, if such a queue exists.
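As a small illustration of this rule (a sketch of my own, not from the original article; it assumes a channel obtained as in the producer example above), publishing to the default exchange with the queue name as the routing key is what amqplib's sendToQueue helper does:

// Both calls deliver the message to the queue named 'helloKoalaQueue',
// because the default exchange routes on a routing key equal to a queue name.
await channel.publish('', 'helloKoalaQueue', Buffer.from('hello koala'));
await channel.sendToQueue('helloKoalaQueue', Buffer.from('hello koala'));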

Types of exchanges

Four commonly used types

  • fanout

  • direct

  • topic

  • headers

Whatever the exchange type, there is a binding operation; what differs between types is the routing/binding strategy (the part in the red box in the figure below).

fanout (literally: broadcast)

The routing rule of a fanout exchange is very simple: it routes every message sent to it to all of the queues bound to it; no routing key needs to be set.

In the figure above, all messages sent by the Producer to the Exchange (X) are routed to the two queues and ultimately consumed by the two consumers (Consumer1 and Consumer2).

All messages are routed to both queues — so do both consumers receive all of the same messages? The answer is yes: under normal circumstances the two consumers receive exactly the same messages. This type is often used for broadcast-style requirements, for example consumer 1 stores the logs while consumer 2 prints them.

Corresponding code implementation:

Producers:

const amqp = require('amqplib');

async function producer() {
    // Create the connection object
    const connection = await amqp.connect('amqp://localhost:5672');

    // Get the channel
    const channel = await connection.createChannel();

    // Declare parameters
    const exchangeName = 'fanout_koala_exchange';
    const routingKey = '';
    const msg = 'hello koala';

    // Declare the exchange
    await channel.assertExchange(exchangeName, 'fanout', {
        durable: true
    });

    // Send a message
    await channel.publish(exchangeName, routingKey, Buffer.from(msg));

    // Close the connection
    await channel.close();
    await connection.close();
}
producer();

Consumer:

const amqp = require('amqplib');

async function consumer() {
    // Create the connection object
    const connection = await amqp.connect('amqp://localhost:5672');

    // Get the channel
    const channel = await connection.createChannel();

    // Declare parameters
    const exchangeName = 'fanout_koala_exchange';
    const queueName = 'fanout_kaola_queue';
    const routingKey = '';

    // Declare the exchange
    await channel.assertExchange(exchangeName, 'fanout', { durable: true });

    // Declare the queue
    await channel.assertQueue(queueName);

    // Bind them together (queue, exchange, routing key)
    await channel.bindQueue(queueName, exchangeName, routingKey);

    // Consume
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    });

    console.log('Consumer started successfully!');
}
consumer();

Note: Other types of code are already on Github at github.com/koala-codin… Welcome to star.

direct

A direct exchange routes a message to the queues whose binding key exactly matches the message's routing key.

For example, following the bindings in the figure: if we send a message to the exchange with routingKey = "error", the message is routed to both Queue1 and Queue2. If we send a message with routingKey = "info" or routingKey = "warning", it is routed only to Queue2. Messages sent with any other routing key are routed to neither queue.
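The full code for the direct type is in the GitHub repository mentioned above; as a rough sketch of the binding side (the exchange and queue names here are made up for illustration, and a channel is assumed as in the fanout example):

// Declare a direct exchange and a queue, then bind the queue to two routing keys
const exchangeName = 'direct_koala_exchange'; // hypothetical name
const queueName = 'direct_koala_queue';       // hypothetical name

await channel.assertExchange(exchangeName, 'direct', { durable: true });
await channel.assertQueue(queueName);

// Only messages published with routingKey 'info' or 'warning' reach this queue
await channel.bindQueue(queueName, exchangeName, 'info');
await channel.bindQueue(queueName, exchangeName, 'warning');

// Producer side: this message is routed to the queue above...
await channel.publish(exchangeName, 'info', Buffer.from('info level message'));
// ...while this one matches no binding and is simply dropped
await channel.publish(exchangeName, 'debug', Buffer.from('debug level message'));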

topic

With a topic exchange, the producer-specified routing key is matched against the binding patterns specified by the consumers' queues, using two wildcards:

  • # : matches zero or more words
  • * : matches exactly one word
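A minimal sketch of the wildcard matching (the names are made up and a channel is assumed, as in the examples above):

const exchangeName = 'topic_koala_exchange';

await channel.assertExchange(exchangeName, 'topic', { durable: true });
await channel.assertQueue('all_logs_queue');
await channel.assertQueue('error_logs_queue');

// '#' matches zero or more words, so this queue receives every message
await channel.bindQueue('all_logs_queue', exchangeName, '#');
// '*' matches exactly one word: 'order.error' matches, 'order.payment.error' does not
await channel.bindQueue('error_logs_queue', exchangeName, '*.error');

await channel.publish(exchangeName, 'order.error', Buffer.from('order failed'));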

headers

A headers exchange is similar to a topic exchange, but instead of routing on the routing key, it routes on the message's header data. A topic exchange's routing key can only be a string, while a headers exchange can match on values such as integers or hashes (dictionaries).
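A minimal sketch of header-based routing with amqplib (exchange name, queue name, and header values are all made up; a channel is assumed as above):

const exchangeName = 'headers_koala_exchange';

await channel.assertExchange(exchangeName, 'headers', { durable: true });
await channel.assertQueue('report_queue');

// 'x-match: all' means every listed header must match ('any' would mean at least one)
await channel.bindQueue('report_queue', exchangeName, '', {
    'x-match': 'all',
    type: 'report',
    format: 'pdf'
});

// The routing key is ignored; only the headers decide where the message goes
await channel.publish(exchangeName, '', Buffer.from('monthly report'), {
    headers: { type: 'report', format: 'pdf' }
});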

Thinking and exploring message queue

Message queues implement RPC

(This short section is adapted from the Internet; see the reference articles at the end.)

RPC calls a method on a remote server. With MQ you can implement asynchronous RPC calls, based on a direct exchange:

  1. The client, both producer and consumer, sends RPC call messages to the RPC request queue and listens to the RPC response queue
  2. The server listens to the RPC request queue and executes the server’s methods upon receiving the message
  3. The server sends the result of the method execution to the RPC response queue

(Note that RPC is only mentioned here, because a single RPC is not enough to finish an article. If you are interested, try RPC with queues.)
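Since only the idea is mentioned here, the following is just an illustrative sketch of the three steps above using amqplib; the queue names and the correlationId/replyTo mechanism are my own choices, not code from the article:

const amqp = require('amqplib');

// Server: listens on the request queue and sends the result to the queue named in replyTo
async function rpcServer() {
    const connection = await amqp.connect('amqp://localhost:5672');
    const channel = await connection.createChannel();
    await channel.assertQueue('rpc_request_queue');

    await channel.consume('rpc_request_queue', msg => {
        const result = `processed: ${msg.content.toString()}`;
        channel.sendToQueue(msg.properties.replyTo, Buffer.from(result), {
            correlationId: msg.properties.correlationId // lets the client match the response to its call
        });
        channel.ack(msg);
    });
}

// Client: both producer (request) and consumer (response)
async function rpcClient() {
    const connection = await amqp.connect('amqp://localhost:5672');
    const channel = await connection.createChannel();
    const { queue: replyQueue } = await channel.assertQueue('', { exclusive: true });
    const correlationId = Date.now().toString();

    await channel.consume(replyQueue, msg => {
        if (msg.properties.correlationId === correlationId) {
            console.log('RPC result:', msg.content.toString());
        }
    }, { noAck: true });

    channel.sendToQueue('rpc_request_queue', Buffer.from('hello rpc'), {
        replyTo: replyQueue,
        correlationId
    });
}

rpcServer().then(rpcClient);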

Is message persistence necessary?

Message queues live in memory; if the broker fails, the messages in the queue are lost. So for some requirements persistence is essential. RabbitMQ can enable persistence, and the persistence parameters can be set from any of the supported client languages.

Take Node.js as an example (for other languages, please search on your own):

    await channel.assertExchange(exchangeName, 'direct', { durable: true });
    // Note the { durable: true } option: it makes the exchange durable; other objects have their own durability settings
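Exchange durability alone does not make the messages themselves survive a broker restart. As a rough sketch (the option names are amqplib's; the queue name is made up), the queue and each message need their own flags:

// Durable queue: the queue definition survives a broker restart
await channel.assertQueue('durable_koala_queue', { durable: true });

// Persistent message: the broker writes it to disk (only meaningful in a durable queue)
await channel.publish(exchangeName, 'routingKey', Buffer.from('important message'), {
    persistent: true
});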

I also recommend a good article on persistence: juejin.cn/post/684490…

Is a message acknowledgement necessary after the consumer finishes?

Message acknowledgement simply means that the consumer notifies the message queue after it finishes consuming a message.

I think it is necessary to turn this on. While a consumer is handling a task from the queue, it may fail or crash. If RabbitMQ removes a message from memory as soon as it has been sent to the consumer, and the consumer's worker process then fails or crashes, the message being processed is lost — along with all the other unprocessed messages already sent to that process.

To ensure a message is never lost, RabbitMQ supports message acknowledgements: when a message has been received and processed, the consumer sends an ack back, and only then does RabbitMQ delete the message.

If a consumer dies (its channel or connection closes, or the TCP connection is lost) without sending an ack, RabbitMQ assumes the message was not fully processed and re-queues it. This way messages are never lost, even if a worker process dies unexpectedly.

Whether acknowledgements are required is controlled by the auto-ack option; pass { noAck: false } when consuming so that explicit acks are expected.

Take Node.js as an example (for other languages, please search on your own):

// The consumer's consume call
await channel.consume(queueName, msg => {
    console.log('koala:', msg.content.toString());
    // ... business logic goes here; send the acknowledgement after the work is done
    channel.ack(msg); // message acknowledgement
}, { noAck: false });

How to achieve fair scheduling?

Set the prefetch count to 1. This tells RabbitMQ not to give a consumer more than one message at a time — in other words, no new message is sent to that consumer until the previous one has been processed and acknowledged. Instead, the next message is dispatched to the next idle consumer or worker process.

Taking Node.js as an example, the amqplib library provides the prefetch method for this rate limiting.

Prefetch parameter description:

  • count: the number N of messages pushed to a consumer at a time. If those N messages have not been acked, the broker will not push more until they are consumed.
  • global: the level at which the limit applies — true applies it to the channel, false to each consumer. The default value is false.
// Set the limiting parameters when creating the consumer
await channel.prefetch(1, false);

How to make an exchange send messages to multiple consumers in turn — which exchange type should you choose?

Suppose there is one producer and two consumers. The producer sends a message; I want the queue to deliver a message to consumer 1, then consumer 1 sends a message that goes to consumer 2, and so on — a sequential interaction. Which exchange should be used? Note the word interaction. Think about it after you finish reading. Also: is it necessary for the consumer to manually acknowledge completion back to the message queue? Is message persistence necessary, and what are its benefits?

(After looking at how message queues pass messages, you can also think about how messages from the producer are put into the queue, then taken out of the queue and consumed by the consumer.)

Application scenarios of message queues

  1. Double 11 flash sales / ticket grabbing

    During Double 11, when we buy a lot of goods in the early hours of the morning and then check out, the interface now politely asks us to wait, with friendly images and text reminders — rather than, as a few years ago, simply freezing or showing errors.

    Note: before reading on, if you have worked on e-commerce, think about how you implemented this — we can discuss it together. This section only talks about the role of the message queue, not the final optimised design (for example, Redis caching of the total stock, and so on).

    Here, when an order is generated there is no need to hit the database IO directly: stock is pre-deducted to guarantee nothing is oversold, and the user's order is then generated asynchronously through a consumer queue, so the response to the user is much faster. But this must also hold up at high sales volume — what if the user gets the order and never pays? We all know that orders now have an expiry time, so another message queue is used to detect order payment timeouts. For example, if the user does not pay within five minutes, the order becomes invalid and the reserved stock is released again. This is how many online retailers now guarantee high sales volumes. With small order volumes, order generation is very fast and users hardly queue at all. (A sketch of this timeout pattern appears after this list.)

  2. Points redemption (points can be used on multiple platforms)

    The points exchange module is used by multiple departments in a company and can be decoupled through a message queue. Each department's system does its own thing, yet they can all use the points system to redeem goods and so on; the other modules are completely decoupled from the points module.

  3. Sending emails, user data analysis, and other synchronous-to-asynchronous functions

    There is more to say about this one; let's start from a platform's user registration flow.

    • The user registers
    • During registration the user picks several interest tags; recommended content then needs to be computed from the user's attributes (user analysis)
    • An email may need to be sent to the user after registration
    • A system notification containing instructions is sent to the user
    • and so on

    Normal registration, no high concurrency:

    From the user's point of view, they just want to register and use the software; as long as the server saves the account information in the database, they can log in and do what they want. The user does not care about the other steps, so the server can put those operations into the corresponding message queues, return the result to the user immediately, and let the message queues carry out those operations asynchronously.

    If a large number of users register and high concurrency occurs:

    If the mail interface is overwhelmed, or the CPU is overloaded by heavy computation while analysing the data, the user record will still be written to the database quickly, but the flow gets stuck on sending emails or on the analysis, and the response time grows dramatically or even times out — which is not worth it. In this case these operations are usually put into a message queue (the producer-consumer model): the queue works through them slowly while the registration request itself completes quickly, without affecting the user's use of other features.

  4. Node.js communicating with Python or other languages through RabbitMQ

    This also takes advantage of RabbitMQ's decoupling property: Node.js can communicate not only with Python but with many other languages too.
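For the order-timeout idea in scenario 1, a common way to implement "pay within N minutes or the order expires" on RabbitMQ is a TTL queue plus a dead-letter exchange. The sketch below only illustrates that pattern (all names are made up, and a channel is assumed as in the earlier examples); it is not code from the original article:

// New orders wait here; unpaid ones expire after 5 minutes and are dead-lettered
await channel.assertExchange('order_delay_exchange', 'direct', { durable: true });
await channel.assertExchange('order_expire_exchange', 'direct', { durable: true });

await channel.assertQueue('order_wait_queue', {
    durable: true,
    messageTtl: 5 * 60 * 1000,                   // order expires after 5 minutes
    deadLetterExchange: 'order_expire_exchange',  // expired messages are re-published here
    deadLetterRoutingKey: 'order.expired'
});
await channel.bindQueue('order_wait_queue', 'order_delay_exchange', 'order.created');

// A consumer on the expired-order queue invalidates the order and restores the stock
await channel.assertQueue('order_expired_queue', { durable: true });
await channel.bindQueue('order_expired_queue', 'order_expire_exchange', 'order.expired');
await channel.consume('order_expired_queue', msg => {
    console.log('Order expired, restoring stock:', msg.content.toString());
    channel.ack(msg);
});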

conclusion

Dear reader, don't just read — try it! Just start the service and install RabbitMQ; it's fun, and even a HelloWorld lets you experiment with a lot. Much of what this article describes can also be done with Redis, so you can read my Redis article as well. By the way, design patterns and data structures are two things I appreciate more and more.

Reference articles

www.cnblogs.com/baidawei/p/…

www.sojson.com/blog/48.htm…

www.zhihu.com/question/34…

bbs.csdn.net/topics/3921…

www.imooc.com/article/293…

mp.weixin.qq.com/s/wTkwJXlNr…

Node series original articles

Learn more about processes and threads in Node.js

To learn Node.js, you need to know streams first

The difference between exports and module.exports

Understand the Events module thoroughly

Node.js advanced fs file module learning

Follow me

  • Welcome to add me on WeChat (coder_qi); I'll pull you into a technology group for long-term exchange and learning…
  • Welcome to follow "Programmer growth refers to north", a public account dedicated to helping you grow…