Abstract: Using RabbitMQ message queues can effectively increase the peak processing capacity of the system.

Introduction to RabbitMQ

RabbitMQ is a message broker that supports several asynchronous messaging patterns. The most common are:

  • Work Queue: Messages are buffered in a queue. By default, RabbitMQ distributes them to multiple workers in round-robin order, and each message is delivered to only one worker.
  • Publish/Subscribe: Every subscribed consumer receives a copy of each message, so a message is typically delivered to multiple workers, each of which processes it differently.
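
The difference between the two patterns can be illustrated with a toy in-memory model. This is only a sketch for intuition, not how RabbitMQ is implemented; the function names dispatchRoundRobin and dispatchFanout are made up for this example.

```javascript
// Toy in-memory model of the two delivery patterns (not RabbitMQ itself).

// Work Queue: each message goes to exactly one worker, in round-robin order.
function dispatchRoundRobin(messages, workerCount) {
    const assigned = Array.from({ length: workerCount }, () => []);
    messages.forEach((message, index) => {
        assigned[index % workerCount].push(message);
    });
    return assigned;
}

// Publish/Subscribe: every subscriber receives its own copy of each message.
function dispatchFanout(messages, subscriberCount) {
    return Array.from({ length: subscriberCount }, () => messages.slice());
}

const messages = ["m1", "m2", "m3", "m4", "m5"];
console.log(dispatchRoundRobin(messages, 2)); // [ [ 'm1', 'm3', 'm5' ], [ 'm2', 'm4' ] ]
console.log(dispatchFanout(messages, 2));     // two full copies of the list
```

With two workers, the work queue splits the five messages between them, while publish/subscribe hands all five to each subscriber.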

RabbitMQ also supports Routing, Topics, and Remote Procedure Calls (RPC).

RabbitMQ is the intermediate node between the producer and consumer of messages, responsible for caching and distributing messages. RabbitMQ receives messages from producers, caches them in memory, and distributes them to consumers in different ways. RabbitMQ can also write messages to disk for persistence, so that if RabbitMQ crashes unexpectedly, the message data is not completely lost.

Why RabbitMQ?

Put simply, RabbitMQ supports several messaging patterns, such as Work Queue, that suit different business scenarios. At Fundebug, we use only RabbitMQ’s Work Queue, that is, a message queue.

Using message queues, less urgent but resource-intensive computing tasks can be queued into RabbitMQ as messages, which can then be processed by multiple processing modules.

The biggest benefit is a higher peak processing capacity. Messages that cannot be processed in time are buffered in RabbitMQ, which prevents the system from crashing under too many simultaneous computations. The backlog is then worked off gradually after the peak has passed.
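
To make the buffering effect concrete, here is a small simulation of a traffic peak. It is a sketch under simplifying assumptions (a constant worker capacity per second and made-up arrival numbers), and the function name simulateBacklog is hypothetical; it does not talk to RabbitMQ.

```javascript
// Toy simulation of a burst being absorbed by a queue (not RabbitMQ itself).
// arrivals[i] is the number of messages produced in second i;
// capacity is how many messages the workers can finish per second (assumed constant).
function simulateBacklog(arrivals, capacity) {
    const backlog = [];
    let queued = 0;
    for (const arrived of arrivals) {
        queued += arrived;                    // new messages enter the queue
        queued -= Math.min(queued, capacity); // workers drain what they can
        backlog.push(queued);                 // messages still waiting
    }
    return backlog;
}

// A 3-second peak of 50 msg/s against workers that handle 20 msg/s.
const arrivals = [10, 50, 50, 50, 10, 10, 10, 10, 10, 10, 10, 10, 10];
console.log(simulateBacklog(arrivals, 20));
```

In this run the backlog climbs to 90 messages during the peak and then shrinks by 10 per second until it reaches 0, which is the "processed slowly after the peak" behavior described above. Without a queue, the workers would have had to handle 50 messages per second directly.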

Another benefit is decoupling. The producer only needs to send the message to RabbitMQ, so the time it takes to process the message does not affect the producer’s response time.


Install and run RabbitMQ

Running RabbitMQ with Docker is as simple as executing a single command:

sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7

For those who are not familiar with Docker, let me explain the command options:

  • -d: Run the container in the background
  • --name rabbitmq: Set the container name to rabbitmq
  • -h rabbitmq: Set the host name of the container to rabbitmq. This is required if you want RabbitMQ message data to be persisted on the local disk, because RabbitMQ names its data directory after the host name
  • -p 5672:5672: Map port 5672 of the container to port 5672 of the local host, so that RabbitMQ can be accessed through port 5672 of the local host
  • -v /var/lib/rabbitmq:/var/lib/rabbitmq: Map the /var/lib/rabbitmq directory of the container to the /var/lib/rabbitmq directory of the local host. This persists RabbitMQ message data on the local disk, so it survives even if the RabbitMQ container is deleted.

Docker officially provides a registry mirror that speeds up image downloads, so the RabbitMQ image in the command is named registry.docker-cn.com/library/rabbitmq:3.7.

If you are not familiar with Docker, I recommend learning it. If you would rather not, you can install RabbitMQ on Ubuntu 14.04 with the following commands:

echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server

Start RabbitMQ:

sudo service rabbitmq-server start

Message queue code example

Next, we use Node.js to implement a simple message queue.

Message producer: sender.js

const amqp = require("amqplib");

const queue = "demo";

async function main()
{
    // Open one connection and channel and reuse them for every message
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);

    setInterval(function()
    {
        // Buffer.from replaces the deprecated new Buffer()
        channel.sendToQueue(queue, Buffer.from("Hello, Fundebug!"),
        {
            // Persistent messages are written to disk and survive a RabbitMQ restart
            persistent: true
        });
    }, 1000);
}

main().catch(console.error);
  • The sender keeps sending “Hello, Fundebug!” to the message queue.

Message consumer: receiver.js

const amqp = require("amqplib");

const queue = "demo";

async function receiveMessage()
{
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();
    await channel.assertQueue(queue);
    await channel.consume(queue, function(message)
    {
        // message is null if RabbitMQ cancels the consumer
        if (message === null)
        {
            return;
        }
        console.log(message.content.toString());
        // Acknowledge the message so that RabbitMQ can remove it from the queue
        channel.ack(message);
    });
}

receiveMessage().catch(console.error);
  • The receiver reads messages from the message queue and prints them.

We use the amqplib module to communicate with RabbitMQ. See its documentation for details of the interface.
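
The receiver acknowledges every message right after printing it. When the work can fail, one possible refinement is to acknowledge only on success and reject the message otherwise. The following is a minimal sketch of that idea, not part of the original demo: createConsumer and handleTask are hypothetical names, and the choice not to requeue failed messages is an assumption that may not suit every application.

```javascript
// Sketch: acknowledge a message only after its task succeeds.
// `channel` is expected to be an amqplib channel; `handleTask` is a
// hypothetical function supplied by the application.
function createConsumer(channel, handleTask) {
    return async function onMessage(message) {
        if (message === null) {
            return; // amqplib passes null when RabbitMQ cancels the consumer
        }
        try {
            await handleTask(message.content.toString());
            channel.ack(message); // done: RabbitMQ may discard the message
        } catch (err) {
            console.error("task failed:", err.message);
            // allUpTo = false, requeue = false: reject only this message and
            // do not put it back on the queue, to avoid an endless retry loop
            channel.nack(message, false, false);
        }
    };
}

// Usage with a real channel (assumes the "demo" queue from receiver.js):
//   await channel.prefetch(1); // at most one unacknowledged message per consumer
//   await channel.consume("demo", createConsumer(channel, handleTask));
```

Passing the channel in as a parameter keeps the handler independent of the connection code, so it can be exercised with a stub object instead of a running broker.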

Setting persistent to true when calling sendToQueue tells RabbitMQ to write the messages to disk, so they are still available after RabbitMQ restarts. This is easy to test:

  • Stop the receiver
  • Start the sender to send messages to RabbitMQ
  • Restart RabbitMQ (sudo docker restart rabbitmq)
  • Start the receiver; it receives the messages that the sender sent before RabbitMQ restarted

Because the RabbitMQ container mounts its data directory (/var/lib/rabbitmq) as a data volume on the local host, the result is the same even if the RabbitMQ container is deleted (sudo docker rm -f rabbitmq) and run again.
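
For a message to survive a restart, the queue itself has to be durable as well as the message being persistent. The demo relies on amqplib declaring queues as durable by default; the sketch below simply spells out both settings in one place. The helper name publishPersistent is hypothetical.

```javascript
// Sketch: declare a durable queue and publish a persistent message to it.
// `channel` is expected to be an amqplib channel.
async function publishPersistent(channel, queue, text) {
    // durable: true makes the queue definition survive a broker restart
    // (amqplib already defaults to this; it is spelled out here for clarity)
    await channel.assertQueue(queue, { durable: true });
    // persistent: true asks RabbitMQ to write the message to disk
    return channel.sendToQueue(queue, Buffer.from(text), { persistent: true });
}

// Usage with a real channel:
//   await publishPersistent(channel, "demo", "Hello, Fundebug!");
```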

In addition, the code uses async/await, the latest way of writing asynchronous code in Node.js, which keeps it very concise. Interested readers may want to look into it.

Running the demo is simple:

  • Run the RabbitMQ container
sudo ./start_rabbitmq.sh
  • Send messages
node ./sender.js
  • Receive messages
node ./receiver.js

On the receiver, you can see “Hello, Fundebug!” printed repeatedly.

The repository address is Fundebug/rabbitmq-demo

Automatic reconnection code example

In production, RabbitMQ restarts are inevitable, for example when a disk is replaced or the server crashes under high load. Because RabbitMQ can write messages to disk, the data is “safe”. However, the code must implement automatic reconnection; otherwise the Node.js application crashes when RabbitMQ stops. Here is an example of automatic reconnection code for reference:

Message producer: sender_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// Connect to RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        channel.sendToQueue(queue, Buffer.from("Hello, Fundebug!"),
        {
            // Persistent messages are written to disk and survive a RabbitMQ restart
            persistent: true
        });

        connection.on("error", function(err)
        {
            // amqplib emits "close" after "error", so only log here
            // to avoid scheduling two reconnections
            console.error(err);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitMQ closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

Message consumer: receiver_reconnect.js

const amqp = require("amqplib");

const queue = "demo";

var connection;

// Connect to RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.consume(queue, async function(message)
        {
            console.log(message.content.toString());
            // Acknowledge the message so that RabbitMQ can remove it from the queue
            channel.ack(message);
        });

        connection.on("error", function(err)
        {
            // amqplib emits "close" after "error", so only log here
            // to avoid scheduling two reconnections
            console.error(err);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitMQ closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

This way, even if RabbitMQ restarts, the sender and receiver automatically reconnect to it. If you want to monitor RabbitMQ for errors, you can use Fundebug’s Node.js error monitoring service to send an alert when an “error” or “close” event is emitted, so that developers can locate and fix the problem promptly.
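
The examples above retry at a fixed 10-second interval. A common alternative is exponential backoff, which retries quickly at first and then waits longer. The following is a rough sketch of that variation, not the approach used in the demo; backoffDelay, sleep, connectWithRetry, and the default values are all assumptions chosen for illustration.

```javascript
// Sketch: retry a connect function with exponential backoff instead of a
// fixed delay. `connect` is any function returning a promise,
// for example () => amqp.connect("amqp://localhost").

// Delay before retry number `attempt` (0-based): 1s, 2s, 4s, ... capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
    return Math.min(baseMs * 2 ** attempt, maxMs);
}

function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

async function connectWithRetry(connect, maxAttempts = 10, baseMs = 1000) {
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
        try {
            return await connect();
        } catch (err) {
            console.error(`attempt ${attempt + 1} failed: ${err.message}`);
            if (attempt < maxAttempts - 1) {
                await sleep(backoffDelay(attempt, baseMs));
            }
        }
    }
    throw new Error(`could not connect after ${maxAttempts} attempts`);
}

console.log([0, 1, 2, 3, 4, 5].map((n) => backoffDelay(n)));
// [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

The cap keeps the wait bounded during a long outage, and the attempt limit lets the application give up and report the failure instead of retrying forever.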

References

  • AMQP library (RabbitMQ) – async/await
  • RabbitMQ documentation: Work Queues (JavaScript)
  • Won’t persist data
  • How to build reconnect logic for amqplib

About Fundebug

Fundebug focuses on real-time bug monitoring for JavaScript, WeChat mini programs, WeChat mini games, Alipay mini programs, React Native, Node.js, and Java online applications. Since its launch on November 11, 2016, Fundebug has handled more than 1 billion error events in total, and its paying customers include Google, 360, Kingsoft, Minming.com, and many other brands. You are welcome to try it for free!

Copyright statement

When reprinting, please credit the author, Fundebug, and include this article’s address: blog.fundebug.com/2018/04/20/…