Scheduled tasks are a very common requirement in business development. At the code level, Node.js can cover part of this need with built-in functions such as setTimeout and setInterval, or with libraries such as node-schedule. With external services, you can use Redis Keyspace Notifications or Linux's crontab to run scheduled tasks. RabbitMQ, as messaging middleware, can also implement scheduled tasks by means of its dead-letter queues.
This article uses Node.js as the demo language and amqplib for RabbitMQ.
Dead-letter queue
RabbitMQ has a feature called the dead letter exchange, abbreviated DLX. When a message in a queue becomes a dead letter, it is republished to another exchange, the DLX. Queues bound to the DLX are called dead-letter queues. A message becomes a dead letter in any of the following situations:
- The message is rejected with the requeue parameter set to false
- The message expires
- The queue reaches its maximum length
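As a rough illustration of how these three triggers map onto amqplib, the sketch below builds queue options for the expiry and length limits and rejects a message without requeueing. The helper names buildQueueOptions and rejectWithoutRequeue are hypothetical; messageTtl, maxLength, deadLetterExchange and channel.nack are amqplib's own names.

```javascript
// Hypothetical helper: queue options that enable dead-lettering by
// expiry (messageTtl, in ms) and by overflow (maxLength, in messages).
function buildQueueOptions(deadLetterExchange, limits = {}) {
  const options = { deadLetterExchange };
  if (limits.ttlMs !== undefined) options.messageTtl = limits.ttlMs;
  if (limits.maxLength !== undefined) options.maxLength = limits.maxLength;
  return options;
}

// Hypothetical helper: reject a single message with requeue = false.
// amqplib signature: channel.nack(message, allUpTo, requeue)
function rejectWithoutRequeue(channel, msg) {
  channel.nack(msg, false, false);
}
```

The options object would be passed as the second argument of channel.assertQueue, and rejectWithoutRequeue would be called from a consumer callback that was registered without noAck.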
The DLX is an ordinary exchange, no different from any other, and it can be specified on any queue; configuring it amounts to setting attributes on that queue. When a message in the queue becomes a dead letter, RabbitMQ automatically republishes it to the configured DLX, which then routes it to another queue, the dead-letter queue. To add a DLX to a queue, set the deadLetterExchange and deadLetterRoutingKey parameters when creating the queue. The deadLetterRoutingKey parameter is optional and specifies the routing key used when the message is republished to the DLX. If it is not specified, the message's original routing key is used.
const amqp = require('amqplib');

const myNormalEx = 'my_normal_exchange';
const myNormalQueue = 'my_normal_queue';
const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';

let connection, channel;

amqp.connect('amqp://localhost')
  .then((conn) => {
    connection = conn;
    return conn.createChannel();
  })
  .then((ch) => {
    channel = ch;
    ch.assertExchange(myNormalEx, 'direct', { durable: false });
    // Attach the DLX to the queue when declaring it
    return ch.assertQueue(myNormalQueue, {
      exclusive: false,
      deadLetterExchange: myDeadLetterEx,
      deadLetterRoutingKey: myDeadLetterRoutingKey,
    });
  })
  .then((ok) => {
    // Bind with an explicit binding key (here, the queue name)
    channel.bindQueue(ok.queue, myNormalEx, ok.queue);
    // sendToQueue publishes through the default exchange, directly to the queue
    channel.sendToQueue(ok.queue, Buffer.from('hello'));
    // Give the broker a moment to receive the message, then exit
    setTimeout(function () { connection.close(); process.exit(0); }, 500);
  })
  .catch(console.error);
The code above declares an exchange, myNormalEx, and a queue, myNormalQueue, and attaches a DLX to the queue by setting the deadLetterExchange parameter when declaring it. When a message in myNormalQueue becomes a dead letter, it is published to myDeadLetterEx.
Expiration Time (TTL)
In RabbitMQ, an expiration time can be set on both messages and queues. When it is set through a queue property, all messages in the queue share the same expiration time. When it is set on individual messages, each message can have a different TTL. If both are set, the smaller of the two values applies. Once a message has stayed in the queue longer than its TTL, it becomes a dead letter and consumers can no longer receive it.
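The "smaller value wins" rule can be written down as a tiny helper. This is only a sketch of the rule, not broker code: effectiveTtl is a hypothetical name, while messageTtl (queue level) and expiration (message level) are the option names amqplib uses.

```javascript
// Hypothetical helper: the TTL that actually applies to a message, given an
// optional queue-level TTL and an optional per-message TTL (both in ms).
function effectiveTtl(queueTtlMs, messageTtlMs) {
  const candidates = [queueTtlMs, messageTtlMs].filter((v) => v !== undefined);
  // No TTL configured anywhere: the message never expires
  return candidates.length === 0 ? Infinity : Math.min(...candidates);
}

// Queue-level TTL in amqplib: pass messageTtl when declaring the queue
const queueOptions = { messageTtl: 10000 };
// Per-message TTL in amqplib: pass expiration when publishing
const publishOptions = { expiration: '4000' };

console.log(effectiveTtl(queueOptions.messageTtl, Number(publishOptions.expiration))); // 4000
```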
To set a per-message TTL, pass the expiration parameter, in milliseconds, when sending the message.
const amqp = require('amqplib');

const myNormalEx = 'my_normal_exchange';
const myNormalQueue = 'my_normal_queue';
const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';

let connection, channel;

amqp.connect('amqp://localhost')
  .then((conn) => {
    connection = conn;
    return conn.createChannel();
  })
  .then((ch) => {
    channel = ch;
    ch.assertExchange(myNormalEx, 'direct', { durable: false });
    return ch.assertQueue(myNormalQueue, {
      exclusive: false,
      deadLetterExchange: myDeadLetterEx,
      deadLetterRoutingKey: myDeadLetterRoutingKey,
    });
  })
  .then((ok) => {
    // Bind with an explicit binding key (here, the queue name)
    channel.bindQueue(ok.queue, myNormalEx, ok.queue);
    // expiration is the per-message TTL in milliseconds
    channel.sendToQueue(ok.queue, Buffer.from('hello'), { expiration: '4000' });
    // Give the broker a moment to receive the message, then exit
    setTimeout(function () { connection.close(); process.exit(0); }, 500);
  })
  .catch(console.error);
The code above passes { expiration: '4000' }, which sets this message's expiration time to 4 seconds. A message with a 4-second TTL is not necessarily discarded or dead-lettered exactly 4 seconds later: expiry is checked only when the message reaches the head of the queue and is about to be delivered. If it has not expired at that point, it is delivered to the consumer; if it has, it is deleted or becomes a dead letter.
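The head-of-queue behavior is easy to underestimate, so here is a simplified model of it. This is not RabbitMQ code: it assumes every message is published at time 0 to a queue with no consumers, and deadLetterTimes is a hypothetical name.

```javascript
// Simplified model: a message leaves the queue (is dropped or dead-lettered)
// only once it is at the head AND its own TTL has elapsed.
function deadLetterTimes(messages) {
  let headFreeAt = 0; // time at which the previous message left the queue
  return messages.map(({ id, ttlMs }) => {
    const leavesAt = Math.max(ttlMs, headFreeAt);
    headFreeAt = leavesAt;
    return { id, leavesAt };
  });
}

console.log(deadLetterTimes([
  { id: 'slow', ttlMs: 10000 },
  { id: 'fast', ttlMs: 1000 },
]));
// 'fast' leaves at 10000, not 1000, because it waits behind 'slow'
```

In this model, a short TTL queued behind a long one is delayed until the long one expires. One possible way around this is to keep messages with different delays in separate queues, so that every message in a given queue has the same TTL.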
Scheduled task
Since a message in a queue becomes a dead letter when it expires, and dead letters are published to the DLX of the queue that held the message, a task can be run after a delay by setting an expiration time on a message and then consuming the queue bound to that DLX. Suppose there is a queue queue1 whose DLX is deadEx1, and deadEx1 is bound to a queue deadQueue1. When a message in queue1 becomes a dead letter because it expired, it is published to deadEx1. Consuming messages from deadQueue1 therefore means consuming the expired messages of queue1.
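On the publishing side, scheduling a task then comes down to sending a message with the desired delay as its expiration. The sketch below assumes channel is an amqplib channel and that the target queue was declared with a deadLetterExchange and has no consumers; scheduleTask and the JSON payload format are illustrative choices, not part of amqplib.

```javascript
// Hypothetical helper: publish a task that should surface in the
// dead-letter queue after roughly delayMs milliseconds.
function scheduleTask(channel, delayQueue, task, delayMs) {
  const body = Buffer.from(JSON.stringify(task));
  // expiration is the per-message TTL in milliseconds
  return channel.sendToQueue(delayQueue, body, { expiration: String(delayMs) });
}
```

For example, scheduleTask(channel, 'queue1', { job: 'send-reminder' }, 4000) would place a message in queue1 that is expected to reach deadQueue1 about 4 seconds later, subject to the head-of-queue rule described earlier.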
The code that consumes the dead-letter queue is as follows:
const amqp = require('amqplib');

const myDeadLetterEx = 'my_dead_letter_exchange';
const myDeadLetterQueue = 'my_dead_letter_queue';
const myDeadLetterRoutingKey = 'my_dead_letter_routing_key';

let channel;

amqp.connect('amqp://localhost')
  .then((conn) => {
    return conn.createChannel();
  })
  .then((ch) => {
    channel = ch;
    ch.assertExchange(myDeadLetterEx, 'direct', { durable: false });
    return ch.assertQueue(myDeadLetterQueue, { exclusive: false });
  })
  .then((ok) => {
    // The binding key must match the deadLetterRoutingKey set on the normal queue
    channel.bindQueue(ok.queue, myDeadLetterEx, myDeadLetterRoutingKey);
    // Each message received here is an expired (dead-lettered) message
    return channel.consume(ok.queue, (msg) => {
      console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
    }, { noAck: true });
  })
  .catch(console.error);
Note: if myDeadLetterEx is a direct exchange, you must specify the binding key (myDeadLetterRoutingKey) when binding the queue. If you do not want to specify a binding key, declare myDeadLetterEx as a fanout exchange.
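To see why the exchange type matters, the routing decision can be modeled in a few lines. This is a simplified model covering only direct and fanout exchanges, and both function names are hypothetical.

```javascript
// Routing key a dead letter carries when it is republished to the DLX:
// deadLetterRoutingKey, when set on the queue, replaces the original key.
function resolveDeadLetterKey(originalRoutingKey, deadLetterRoutingKey) {
  return deadLetterRoutingKey !== undefined ? deadLetterRoutingKey : originalRoutingKey;
}

// Whether an exchange of the given type routes a message to a binding.
function isRouted(exchangeType, bindingKey, routingKey) {
  if (exchangeType === 'fanout') return true; // binding key is ignored
  if (exchangeType === 'direct') return bindingKey === routingKey;
  throw new Error(`exchange type not modeled: ${exchangeType}`);
}

const dlKey = resolveDeadLetterKey('my_normal_queue', 'my_dead_letter_routing_key');
console.log(isRouted('direct', 'my_dead_letter_routing_key', dlKey)); // true
console.log(isRouted('direct', '', dlKey));                           // false
console.log(isRouted('fanout', '', dlKey));                           // true
```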