About Apache Pulsar

Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed messaging and streaming platform that integrates messaging, storage, and lightweight function computing. Its architecture separates compute from storage, and it supports multi-tenancy, persistent storage, and cross-region data replication across multiple data centers. It offers strong consistency, high throughput, low latency, and high scalability.

GitHub address: github.com/apache/puls…

Apache Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. It supports multi-tenancy, low latency, read/write separation, cross-region replication, rapid scaling, and flexible fault tolerance. The MQ team of Tencent's Data Platform Department has studied Pulsar in depth and made many performance and stability optimizations. Pulsar is now available on Tencent Cloud's message queue service, TDMQ.

This article introduces how Pulsar implements delayed message delivery, and we hope it starts a useful discussion.

What is delayed message delivery

Delayed message delivery is very common in MQ applications. It means that a message is not delivered immediately after it is sent to the MQ server; instead, it is delivered to the consumer at a later time determined by attributes in the message. There are two kinds of such messages, timed messages and delayed messages:

  • Timed messages: The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, the message is delivered to a Consumer at a specified point in time in the future.
  • Delayed messages: The Producer sends a message to the MQ server but does not want it delivered immediately. Instead, delivery to the Consumer is postponed by a specified amount of time.

In the industry, Tencent Cloud’s CMQ and Alibaba Cloud’s RocketMQ also support delayed message delivery:

  • CMQ: The delay period is referred to as the “in-flight” state. You set the delay with DelaySeconds, whose value ranges from 0 to 3600 seconds; that is, a message can remain invisible for at most 1 hour.
  • RocketMQ: In the open-source version, delayed messages are temporarily stored in an internal topic, and only specific delay levels are supported, such as 5s, 10s, and 1m. The commercial version supports delays of arbitrary precision.

The open-source NSQ, RabbitMQ, ActiveMQ, and Pulsar also have built-in delayed message processing. Although each MQ project exposes and implements it differently, the core idea is the same: the Producer sends a delayed message to a Topic, the Broker places the message in temporary storage, and a Delayed Tracker Service checks whether the message is due and, once it is, delivers it.

Usage scenarios for delayed message delivery

Delayed message delivery postpones the processing of a message and triggers its delivery at some point in the future. It has many practical applications, such as retrying after errors, cancelling orders that time out, and sending appointment reminders.

  • If a service request fails, put the failed request into a separate queue and retry it five minutes later.
  • If a user places an order but has not paid yet, remind the user to pay at regular intervals, and close the order once it times out.
  • For a scheduled interview or meeting, send a reminder notice half an hour before it starts.

In a recent case where a service uses Pulsar delayed messages, the service needs to correlate the log messages of two systems. One system may time out or fail when querying HBase; in that case, the failed correlation task has to be rescheduled to run when the cluster is idle.

How to use Pulsar delayed message delivery

Pulsar introduced delayed message delivery in version 2.4.0. With Pulsar delayed messages, you can specify the delivery time precisely, using one of two methods: deliverAt specifies an absolute timestamp, while deliverAfter specifies a delay relative to the current time. Both work the same way underneath: the Client computes the delivery timestamp and sends it to the Broker.
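Because both methods reduce to a timestamp computed on the client, deliverAfter(delay, unit) amounts to deliverAt(now + delay). The plain-Java sketch below illustrates only that arithmetic and the "is it due yet?" check. DeliverTime, deliverAtFromDelay, and isDue are hypothetical names, not Pulsar client APIs, and the sketch assumes timestamps are Unix epoch milliseconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, NOT part of the Pulsar client API: shows how a relative
// delay (deliverAfter) reduces to an absolute timestamp (deliverAt).
public class DeliverTime {

    // Convert "deliver after `delay` units" into an absolute epoch-millisecond timestamp.
    static long deliverAtFromDelay(long nowMillis, long delay, TimeUnit unit) {
        return nowMillis + unit.toMillis(delay);
    }

    // A message becomes deliverable once the current time reaches its timestamp.
    static boolean isDue(long deliverAtMillis, long nowMillis) {
        return nowMillis >= deliverAtMillis;
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L; // fixed "current time" so the example is repeatable
        long at = deliverAtFromDelay(now, 5, TimeUnit.MINUTES);
        System.out.println(at - now);                  // 300000 (5 minutes in ms)
        System.out.println(isDue(at, now));            // false
        System.out.println(isDue(at, now + 300_000L)); // true
    }
}
```

Keeping only an absolute timestamp on the wire means the Broker never needs to know when the message was sent; it just compares the stored timestamp with its own clock.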

1. Sending with deliverAfter

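import java.util.concurrent.TimeUnit;
// `producer` is an existing org.apache.pulsar.client.api.Producer
producer.newMessage()
	.deliverAfter(3, TimeUnit.MINUTES) // deliver 3 minutes from now
	.send();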

2. Sending with deliverAt

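// deliverAt takes an absolute Unix timestamp in milliseconds; here, 10 minutes from now
producer.newMessage()
	.deliverAt(System.currentTimeMillis() + 10 * 60 * 1000L)
	.send();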

Pulsar supports delayed messages with long spans, such as one month or six months, and a single Topic can carry both delayed and non-delayed messages. The following figure shows how delayed messages flow through Pulsar:

Messages M1, M3, M4, and M5 sent by the producer have different delay times, while M2 is a normal message that needs no delayed delivery. Consumers receive and acknowledge each message according to its delay time.

Implementation principle of Pulsar delayed message delivery

As the usage above shows, Pulsar supports delayed message delivery with second-level precision, unlike open-source RocketMQ, which supports only fixed delay levels.

Pulsar's delayed message delivery mechanism is relatively simple: the Delayed Message Tracker records an index entry for every delayed message. Each index consists of three parts, timestamp | LedgerID | EntryID. LedgerID | EntryID locates the message, while the timestamp records the delivery time and also determines the ordering of the delayed index priority queue.

The Delayed Message Tracker maintains the delayed index priority queue in off-heap memory. The queue is a heap ordered by delivery time: the entry with the earliest delivery time sits at the top, and entries with longer delays sit further down. When dispatching messages to consumers, the Broker first checks the Delayed Message Tracker for delayed messages that are due. If there are any, it takes the corresponding indexes from the Tracker and uses them to fetch the messages for consumption. If no delayed message is due, normal messages are dispatched directly.

If a Broker crashes or topic ownership is transferred to another Broker, Pulsar rebuilds the delayed index queue so that delayed messages continue to be delivered correctly.
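The index layout, the due-check, and the rebuild described above can be modeled in a few lines. This is a simplified on-heap sketch, not Pulsar's tracker: Pulsar keeps the queue off-heap, and here a java.util.PriorityQueue stands in for it. The class and method names (DelayedTrackerSketch, onRead, pollDue, rebuild) are hypothetical, the delay values for M1 to M5 are made up to loosely mirror the earlier figure, and records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified sketch of a delayed-index tracker; all names are hypothetical.
public class DelayedTrackerSketch {

    // One index entry: timestamp | ledgerId | entryId (three longs).
    record DelayedIndex(long deliverAt, long ledgerId, long entryId) {}

    // Min-heap: earliest delivery time on top; ties broken by message position.
    private final PriorityQueue<DelayedIndex> queue = new PriorityQueue<>(
            Comparator.comparingLong(DelayedIndex::deliverAt)
                    .thenComparingLong(DelayedIndex::ledgerId)
                    .thenComparingLong(DelayedIndex::entryId));

    // Called for each message read from the topic. Returns true if it can be
    // dispatched now; otherwise only its index is kept in the tracker.
    // A deliverAt <= 0 marks a normal (non-delayed) message.
    boolean onRead(long deliverAt, long ledgerId, long entryId, long now) {
        if (deliverAt <= now) {
            return true;
        }
        queue.add(new DelayedIndex(deliverAt, ledgerId, entryId));
        return false;
    }

    // Indexes whose delivery time has arrived, earliest first; these are served
    // before further normal messages are read.
    List<DelayedIndex> pollDue(long now) {
        List<DelayedIndex> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().deliverAt() <= now) {
            due.add(queue.poll());
        }
        return due;
    }

    int size() {
        return queue.size();
    }

    // After a Broker crash or ownership transfer the in-memory queue is gone, so
    // the backlog is re-read; messages already due need no index.
    static DelayedTrackerSketch rebuild(List<DelayedIndex> backlog, long now) {
        DelayedTrackerSketch tracker = new DelayedTrackerSketch();
        for (DelayedIndex m : backlog) {
            tracker.onRead(m.deliverAt(), m.ledgerId(), m.entryId(), now);
        }
        return tracker;
    }

    public static void main(String[] args) {
        DelayedTrackerSketch t = new DelayedTrackerSketch();
        long now = 0;
        // M1..M5 stored at entries 0..4 of ledger 1; delivery times are made up.
        long[] deliverAt = {30, 0, 420, 360, 10}; // M2 (index 1) is a normal message
        for (int i = 0; i < deliverAt.length; i++) {
            if (t.onRead(deliverAt[i], 1, i, now)) {
                System.out.println("dispatch M" + (i + 1) + " immediately");
            }
        }
        System.out.println(t.pollDue(40)); // M5 (entry 4), then M1 (entry 0)
        System.out.println(t.size());      // 2: M4 and M3 are still waiting
    }
}
```

Note that the rebuild has to touch every message in the backlog again, which is why its cost grows with the number and span of delayed messages.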

Challenges of Pulsar delayed message delivery

As the implementation principle above shows, this approach is simple and efficient, has little impact on the Pulsar core, and can delay a message to an arbitrary time. In practice, however, we found that it cannot support large-scale use of delayed messages, mainly for the following two reasons:

1. The delayed index queue is limited by memory

The index of each delayed message consists of three longs (24 bytes), so the memory overhead is negligible when there are few delayed messages. However, index queues are maintained per subscription: a topic partition keeps one index queue for each of its subscriptions. Moreover, the more messages are delayed, and the longer they are delayed, the more memory the index queues occupy.
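To make the per-subscription cost concrete, the arithmetic below estimates raw index memory for one topic partition. At steady state, the number of delayed messages waiting in a tracker is roughly the delayed publish rate multiplied by the delay (Little's law), which captures both "more messages" and "longer delays". The rate (1,000 messages/s), delay (30 days), and subscription count (2) are hypothetical inputs, and the estimate counts only the 24 bytes per index, ignoring any queue or allocator overhead.

```java
// Back-of-the-envelope estimate of raw delayed-index memory (hypothetical inputs).
public class IndexMemoryEstimate {
    // Each index entry is three longs: timestamp | ledgerId | entryId.
    static final long BYTES_PER_INDEX = 3L * Long.BYTES; // 24 bytes

    // Messages waiting at steady state ~= publish rate x delay (Little's law).
    static long outstandingMessages(long delayedMsgsPerSecond, long delaySeconds) {
        return delayedMsgsPerSecond * delaySeconds;
    }

    // Raw index bytes for one partition: every subscription keeps its own queue.
    static long indexBytes(long outstanding, int subscriptions) {
        return BYTES_PER_INDEX * outstanding * subscriptions;
    }

    public static void main(String[] args) {
        long outstanding = outstandingMessages(1_000, 30L * 24 * 3600); // 30-day delay
        System.out.println(outstanding);                // 2592000000
        System.out.println(indexBytes(outstanding, 2)); // 124416000000 (about 124 GB)
    }
}
```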

2. The cost of rebuilding the delayed index queue

As mentioned above, Pulsar rebuilds the delayed index queue if a Broker crashes or topic ownership is transferred. With a large volume of delayed messages spanning a long time, the rebuild can take hours. Assigning more partitions to the topic increases rebuild parallelism and shortens the rebuild, but it does not fully eliminate the rebuild cost.
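A rough model shows why more partitions help without removing the cost. It assumes the rebuild re-reads the delayed backlog, that the backlog is spread evenly across partitions, and that partitions are replayed in parallel at a fixed per-partition rate. The backlog of 2,592,000,000 messages and the 50,000 messages/s replay rate are hypothetical figures, not measurements.

```java
// Rough rebuild-time model under stated assumptions (hypothetical inputs).
public class RebuildEstimate {
    // Wall-clock seconds when the backlog is split evenly over `partitions`
    // that are replayed in parallel, each at `msgsPerSecondPerPartition`.
    static long rebuildSeconds(long backlogMessages, int partitions, long msgsPerSecondPerPartition) {
        long perPartition = (backlogMessages + partitions - 1) / partitions; // ceiling division
        return (perPartition + msgsPerSecondPerPartition - 1) / msgsPerSecondPerPartition;
    }

    public static void main(String[] args) {
        long backlog = 2_592_000_000L;
        for (int p : new int[] {1, 8, 32}) {
            // The total number of messages re-read is the same for every p;
            // only the wall-clock time shrinks.
            System.out.println(p + " partition(s): " + rebuildSeconds(backlog, p, 50_000) + " s");
        }
    }
}
```

Under these assumptions, one partition needs 51,840 s (14.4 hours), 8 partitions 6,480 s (1.8 hours), and 32 partitions 1,620 s (27 minutes), while the total replay work stays constant and keeps growing with the backlog.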

Future work on Pulsar delayed message delivery

Pulsar’s current delayed message delivery scheme is simple and efficient, but it still carries risks when handling large volumes of delayed messages. The Pulsar community and the MQ team at Tencent’s Data Platform Department will next focus on supporting massive numbers of delayed messages. The scheme currently under discussion partitions the delayed index queue by time: the Broker loads only the delayed indexes of the current and near-future time slices into memory, and persists the indexes of the remaining time slices to disk. An example is shown in the figure below:

In the figure above, the delayed index queue is partitioned into 5-minute intervals. M5 and M1 fall into time partition 1, which is kept in memory because its delivery times are the nearest. M4 and M3 fall into time partition 2, whose delivery times are later, so their indexes are stored on disk. This scheme reduces both the time needed to rebuild the delayed index queue and the dependence on memory.
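The time-partitioning idea can be sketched as follows. This is a hypothetical illustration of the scheme under discussion, not Pulsar code: a TreeMap keyed by slice number stands in for persistent disk storage, the class and method names are invented, and the example times place M5 and M1 in the first 5-minute slice and M4 and M3 in the second, as in the figure. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical sketch of a time-partitioned delayed index.
public class TimeSlicedIndex {
    static final long SLICE_MILLIS = 5 * 60 * 1000L; // 5-minute time partitions

    record Index(long deliverAt, long ledgerId, long entryId) {}

    private final long horizonSlices; // slices beyond the current one kept in memory
    private final PriorityQueue<Index> inMemory =
            new PriorityQueue<>((a, b) -> Long.compare(a.deliverAt(), b.deliverAt()));
    // Stand-in for disk: slice number -> indexes whose delivery time falls in that slice.
    private final TreeMap<Long, List<Index>> onDisk = new TreeMap<>();
    private long loadedUpToSlice; // highest slice whose indexes are in memory

    TimeSlicedIndex(long nowMillis, long horizonSlices) {
        this.horizonSlices = horizonSlices;
        this.loadedUpToSlice = slice(nowMillis) + horizonSlices;
    }

    static long slice(long millis) {
        return millis / SLICE_MILLIS;
    }

    // Near-term indexes go to the in-memory heap; later ones are stored by slice.
    void add(Index idx) {
        long s = slice(idx.deliverAt());
        if (s <= loadedUpToSlice) {
            inMemory.add(idx);
        } else {
            onDisk.computeIfAbsent(s, k -> new ArrayList<>()).add(idx);
        }
    }

    // As time advances, load slices that entered the horizon, then pop due indexes.
    List<Index> pollDue(long nowMillis) {
        long target = slice(nowMillis) + horizonSlices;
        while (!onDisk.isEmpty() && onDisk.firstKey() <= target) {
            inMemory.addAll(onDisk.pollFirstEntry().getValue());
        }
        loadedUpToSlice = Math.max(loadedUpToSlice, target);
        List<Index> due = new ArrayList<>();
        while (!inMemory.isEmpty() && inMemory.peek().deliverAt() <= nowMillis) {
            due.add(inMemory.poll());
        }
        return due;
    }

    int inMemoryCount() {
        return inMemory.size();
    }

    int onDiskCount() {
        return onDisk.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        TimeSlicedIndex idx = new TimeSlicedIndex(0L, 0); // only the current slice in memory
        idx.add(new Index(60_000L, 1, 4));  // M5: due in 1 min -> slice 0
        idx.add(new Index(120_000L, 1, 0)); // M1: due in 2 min -> slice 0
        idx.add(new Index(420_000L, 1, 3)); // M4: due in 7 min -> slice 1
        idx.add(new Index(480_000L, 1, 2)); // M3: due in 8 min -> slice 1
        System.out.println(idx.inMemoryCount() + " in memory, " + idx.onDiskCount() + " on disk"); // 2 in memory, 2 on disk
        System.out.println(idx.pollDue(180_000L).size()); // 2 (M5, M1)
    }
}
```

Only the in-memory slices would need to be reconstructed quickly after a failover; the persisted slices can be loaded lazily as time reaches them, which is where the savings in rebuild time and memory come from.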

Conclusion

This article introduced the concept and use cases of delayed message delivery, and explained in detail how Apache Pulsar implements it. Pulsar’s current scheme is simple and efficient and supports delayed message delivery with second-level precision, but it has limitations when handling delayed messages at large scale. Next, the Pulsar community and the MQ team at Tencent’s Data Platform Department will focus on supporting massive numbers of delayed messages.

This article was written by Zhang Chao and originally published on the “Tencent Cloud Middleware” WeChat official account. It is reprinted with permission, with minor adjustments.

Follow the Apache Pulsar Chinese community’s WeChat official account, ApachePulsar, for the latest Apache Pulsar news and technical articles.