Research on delay queue technology

I. Project Background

A delay queue is a message queue with a delay capability. We currently have several application scenarios that require delayed processing.

II. Optional technical references

1. Kafka

Premise: since most of the project's code and business interaction already uses Kafka, I wanted to build a delay queue on top of Kafka and provide the delay capability directly, which would be the most convenient to use.

The design idea of the RocketMQ delay queue is borrowed: create a number of topics to handle messages with different delays; for example, tasks delayed by one minute are handled by the topic delay-minutes-1.

  1. A delayed message is not sent directly to the target topic, but to a topic dedicated to delayed messages, such as delay-minutes-1.
  2. A separate piece of code periodically pulls messages from delay-minutes-1 and sends them to the actual target topic.

Flow chart:

Problem to solve: how can a delayed message be made to wait for some time before it is sent to the real topic?

A: KafkaConsumer provides pause and resume APIs. When the consumer finds that a message's consumption time has not yet arrived, it can first pause the consumer and move the consumption offset back to that message's position.
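A minimal sketch of this pause/seek idea, assuming the delay-minutes-1 topic from above and a fixed one-minute delay; the bookkeeping around resuming is simplified:

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: consume delay-minutes-1; if the head record is not yet due,
    // rewind the offset to it and pause the partition.
    public class DelayTopicPoller {
        private static final long DELAY_MS = 60_000L;

        public static void pollOnce(KafkaConsumer<String, String> consumer) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
            for (ConsumerRecord<String, String> record : records) {
                long dueAt = record.timestamp() + DELAY_MS;
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                if (System.currentTimeMillis() < dueAt) {
                    consumer.seek(tp, record.offset());          // move the offset back to this record
                    consumer.pause(Collections.singleton(tp));   // stop fetching until it is due
                    break;
                }
                // Due: forward record.value() to the real target topic with a producer here.
            }
            // A separate timer tracks the due times and calls consumer.resume(...) when they arrive.
        }
    }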

Disadvantages: the changes built around Kafka are fairly complex; extra health checks are needed so that paused consumers can be detected, alerted on, or restarted if their state is wrong. In addition, the delay time cannot be set flexibly, since each delay level needs its own topic.

2. RocketMQ

Premise: the underlying code is already fully encapsulated and can be used directly without caring about the internals, so it is decoupled from the business.

General principle:

  1. RocketMQ divides delays into 18 fixed levels. When sending a message, you only need to set delayLevel, and all messages with the same delay level are put into the same internal queue (see the sketch after this list).

  2. A timer polls these queues to check whether the messages in them are due.
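For illustration, a minimal producer sketch using a delay level (with RocketMQ's default level table, level 5 corresponds to 1 minute); the group name, nameserver address, and topic are made-up values:

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

    public class RocketMqDelayDemo {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("delay_demo_group"); // made-up group
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();

            Message msg = new Message("demo_topic", "some business payload".getBytes());
            // Default delay levels: 1s 5s 10s 30s 1m 2m ...; level 5 = 1 minute
            msg.setDelayTimeLevel(5);

            SendResult result = producer.send(msg);
            System.out.println(result.getSendStatus());
            producer.shutdown();
        }
    }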

Flow chart:

Disadvantages:

  1. When using middleware, you should read the underlying source code as much as possible, so that later problems can be tracked down and located quickly and suitable extension points can be found.

  2. The timer runs in a single thread. If there are many delayed messages, some may not be sent promptly even after they have expired.

3. Redis

Premise:

Redisson provides a delay queue and has already wrapped the Redis code, so it can be used directly via redisson.getBlockingQueue() and redisson.getDelayedQueue().
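A minimal usage sketch, assuming a local Redis instance and an arbitrary queue name:

    import java.util.concurrent.TimeUnit;
    import org.redisson.Redisson;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;

    public class RedissonDelayDemo {
        public static void main(String[] args) throws InterruptedException {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379");
            RedissonClient redisson = Redisson.create(config);

            RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("demo_delay_queue");
            RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);

            // The element becomes visible in the blocking queue after 10 seconds
            delayedQueue.offer("job-1", 10, TimeUnit.SECONDS);

            // Consumer side: blocks until an expired element is transferred
            String job = blockingQueue.take();
            System.out.println("due job: " + job);

            delayedQueue.destroy();
            redisson.shutdown();
        }
    }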

General principle:

zhuanlan.zhihu.com/p/343811173

Three core structures:

Delay queue: the queue into which data is first put.

Target blocking queue: holds expired data waiting to be consumed.

TimeoutSet (expiration-time zset): the score is the timeout, used to determine whether an element has expired.

How the timer is implemented:

Redis pub/sub is used. When data is put in, it is first added to the zset, and at the same time a message is published on a subscription key, with the data's expiration timeout as the content. The client then starts a HashedWheelTimer, fetches expired data from the zset page by page, and places it into the blocking queue.
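Roughly, the structures above map onto plain Redis commands as follows; this is an illustrative sketch with made-up key names (using Jedis), not Redisson's actual key layout:

    import redis.clients.jedis.Jedis;

    public class RedisDelayModel {
        // Producer side: store the job and record its expiration time
        public static void put(Jedis jedis, String jobId, String payload, long delayMs) {
            long expireAt = System.currentTimeMillis() + delayMs;
            jedis.hset("delay:job_pool", jobId, payload);              // raw job data
            jedis.zadd("delay:timeout_set", expireAt, jobId);          // score = expiration time
            jedis.publish("delay:channel", String.valueOf(expireAt));  // notify clients of the new timeout
        }

        // Timer side: move expired job ids into the target blocking queue
        public static void moveExpired(Jedis jedis) {
            long now = System.currentTimeMillis();
            for (String jobId : jedis.zrangeByScore("delay:timeout_set", 0, now)) {
                jedis.rpush("delay:ready_queue", jobId);               // target blocking queue
                jedis.zrem("delay:timeout_set", jobId);
            }
        }
    }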

Disadvantages:

  1. With sub/pub, multiple clients may start the delayed task at roughly the same time and execute it repeatedly. There are also concurrency-safety issues, because adding the data to the blocking queue and removing it from the zset are separate operations.

  2. The default setup is reasonably stable when the data volume is small; with a large data volume a cluster mode is needed, which we would have to develop ourselves.

III. Redesign based on the Redisson scheme

Youzan delay queue

tech.youzan.com/queuing_del…

Implementation logic diagram

Meanings of each component:

Job: a task that needs to be processed asynchronously; the most basic unit. Its attributes include a user-defined unique jobId, topic (task type), delayTime (when the task should be executed), ttrTime (execution timeout, time-to-run), and message (the message content).

Job Pool: A map structure that stores the original job information

Delay Bucket: a group of time-ordered queues that store only job ids; each bucket is a Redis zset. Multiple buckets are used to improve scanning speed and reduce message delay.

Timer: scans the buckets in real time and moves jobs whose delay time is earlier than or equal to the current time into the Ready Queue.

* In our own implementation, the Ready Queue is replaced by a common Kafka exit topic that stores ready jobs for the client consuming applications; the timer sends due jobs directly to Kafka.
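For reference, the Job described above as a plain Java object might look like this (a sketch; field names simply follow the description):

    public class Job {
        private String jobId;      // user-defined unique id
        private String topic;      // task type / real target topic
        private long delayTime;    // when the task should be executed
        private long ttrTime;      // time-to-run: execution timeout, used for redelivery
        private String message;    // message content, usually a JSON string
        // getters and setters omitted
    }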

Change points compared with Redisson

  1. The sub/pub mechanism of the original Redisson delay queue is removed; instead, the head of the zset is polled to judge whether its expiration time has been reached.

  2. A thread pool is introduced to speed up message processing and reduce the timing error of delayed messages.

  3. In cluster mode, the Redis SETNX command can be used to implement a simple distributed lock, ensuring that only one timer thread in the cluster runs at a time (a combined sketch follows this list).
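A sketch combining these three change points: one polling thread per bucket, the zset head checked against the current time, and a Redisson lock standing in for the SETNX-style distributed lock. The bucket key names and the Kafka send are illustrative assumptions only:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.redisson.api.RLock;
    import org.redisson.api.RScoredSortedSet;
    import org.redisson.api.RedissonClient;

    public class BucketTimer {
        private final RedissonClient redisson;
        private final ScheduledExecutorService pool;
        private final int bucketCount;

        public BucketTimer(RedissonClient redisson, int bucketCount) {
            this.redisson = redisson;
            this.bucketCount = bucketCount;
            this.pool = Executors.newScheduledThreadPool(bucketCount); // one thread per bucket
        }

        public void start() {
            for (int i = 0; i < bucketCount; i++) {
                final String bucket = "delay:bucket:" + i; // made-up key pattern
                pool.scheduleWithFixedDelay(() -> pollBucket(bucket), 0, 500, TimeUnit.MILLISECONDS);
            }
        }

        private void pollBucket(String bucket) {
            RLock lock = redisson.getLock(bucket + ":lock"); // only one timer in the cluster per bucket
            if (!lock.tryLock()) {
                return;
            }
            try {
                RScoredSortedSet<String> zset = redisson.getScoredSortedSet(bucket);
                long now = System.currentTimeMillis();
                // Head of the zset: job ids whose score (due time) is <= now
                for (String jobId : zset.valueRange(0, true, now, true)) {
                    // kafkaProducer.send(new ProducerRecord<>("delay_exit_topic", jobId)); // sketch only
                    zset.remove(jobId);
                }
            } finally {
                lock.unlock();
            }
        }
    }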

Personal change points

  1. Build it as a general-purpose service that provides a unified push (entry) topic and a unified pull (exit) topic.

Overall execution process:

  1. Business parties send tasks to the entry topic; the service generates deferred tasks and puts them into the buckets.

  2. The timer continuously polls each bucket; when a task is due, it sends the message task to Kafka.

  3. Consumers pull tasks from the common Kafka exit topic and run the corresponding business logic.

  4. After a message from the exit topic is consumed, an acknowledgement is replied once to ensure the message is not lost.

 

Overall architecture diagram of microservice delay queue

Example:

Kafka common entry topic: delay_entrance_topic

| Attribute | Type | Required | Meaning |
| --- | --- | --- | --- |
| realTopicName | string | yes | Business type; the actual topic the message is finally delivered to |
| delayTime | long | yes | Task delay time |
| message | string | yes | Message content, a JSON string |
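For illustration, a business side might produce a record like the following to the entry topic; the topic name order_close_topic and the payload are made up, and delayTime is assumed here to be a relative delay in milliseconds:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class EntranceProducerDemo {
        // Ask the delay service to deliver a payload to "order_close_topic" after 30 minutes
        public static void send(KafkaProducer<String, String> producer) {
            String body = "{\"realTopicName\":\"order_close_topic\","
                        + "\"delayTime\":1800000,"
                        + "\"message\":\"{\\\"orderId\\\":123}\"}";
            producer.send(new ProducerRecord<>("delay_entrance_topic", body));
        }
    }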

 

Kafka common exit topic: delay_exit_topic

| Attribute | Type | Required | Meaning |
| --- | --- | --- | --- |
| delayJobId | long | yes | Generated when the delay service sends to Kafka; the business side only uses it to acknowledge successful consumption |
| realTopicName | string | yes | Business type; the actual topic delivered to, used by each business for filtering |
| message | string | yes | Message content, a JSON string |
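On the consuming side, each business filters the common exit topic by realTopicName; a sketch assuming Jackson for the JSON parsing and the same made-up business topic as above:

    import java.time.Duration;
    import java.util.Collections;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ExitConsumerDemo {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static void run(KafkaConsumer<String, String> consumer) throws Exception {
            consumer.subscribe(Collections.singleton("delay_exit_topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    JsonNode node = MAPPER.readTree(record.value());
                    if (!"order_close_topic".equals(node.get("realTopicName").asText())) {
                        continue; // a message for another business type
                    }
                    // Handle node.get("message"), then acknowledge with delayJobId so the
                    // delay service knows consumption succeeded and does not redeliver.
                }
            }
        }
    }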

IV. Extension points

  1. Reduce the delay-time error: use a thread pool to speed up checking whether tasks have expired.
  2. Cluster mode for high availability, in case one of the servers goes down and becomes unusable; timers are used to maintain the routing.
  3. In cluster mode, the timer logic needs a distributed lock to prevent multiple servers from executing it at the same time.
  4. Message reliability: guarantee at-least-once consumption; if consumption is unsuccessful and no acknowledgement is replied, the message is redelivered.

V. Possible problems

Message persistence: this relies on Redis's own persistence. If Redis loses data, the delayed messages are lost, but master/standby and cluster deployments can mitigate that. A later optimization could also persist messages to MongoDB.

Other delay queue ideas

Netty time wheel

The flow chart of HashedWheelTimer

 

tickDuration: the duration of one slot, i.e., how long each tick (one advance of the wheel) takes.

ticksPerWheel: the size of the time-wheel array (the number of slots).

HashedWheelBucket: an element of the array; it records the head and tail of a linked list of timeouts.

HashedWheelTimeout: the carrier of a delayed task placed in a bucket, with attributes such as prev/next pointers and the number of remaining rounds.

If the time wheel is viewed as a map, ticksPerWheel is the size of the map. When the wheel starts, a startTime is recorded, so each task's delay determines the tick it belongs to; that tick index is the key of the map, and the value is the bucket.

In the core code, the worker thread loops: it checks whether the current tick's deadline has been reached; if so, it processes the bucket's task list immediately and advances to tick + 1; if not, it sleeps for a while.

      
    // Netty HashedWheelTimer worker: wait until the deadline of the next tick
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        // Relative time since the timer started
        final long currentTime = System.nanoTime() - startTime;

        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        // <= 0 means the deadline has been reached and the clock can advance a tick
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        // Workaround for Windows platform compatibility (sleep in multiples of 10 ms)
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
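Using the timer from application code is straightforward; a minimal sketch with Netty's public HashedWheelTimer API (tickDuration of 100 ms, 512 slots):

    import java.util.concurrent.TimeUnit;
    import io.netty.util.HashedWheelTimer;
    import io.netty.util.Timeout;

    public class WheelTimerDemo {
        public static void main(String[] args) {
            HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);

            // Schedule a task 5 seconds into the future; it lands in the bucket
            // derived from the deadline's tick index modulo ticksPerWheel.
            Timeout timeout = timer.newTimeout(
                    t -> System.out.println("task fired"), 5, TimeUnit.SECONDS);

            System.out.println("cancelled? " + timeout.isCancelled());
            // timer.stop(); // call once no more timeouts will be scheduled
        }
    }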

 

Kafka time wheel

On top of the ordinary time wheel, Kafka trades space for time: every bucket is stored in a DelayQueue, which internally uses a PriorityQueue ordered by each bucket's expiration time. If the head bucket has expired it can be taken out and processed directly; if not, the poll blocks. The loop keeps draining expired buckets in turn.

The blocking goes through Condition.awaitNanos(), which ends up in LockSupport.parkNanos().
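Before the Kafka Scala code below, a tiny java.util.concurrent.DelayQueue sketch of the same blocking idea, where each element stands in for a bucket with an expiration time:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class BucketQueueDemo {
        static class Bucket implements Delayed {
            final long expireAt; // absolute expiration time in milliseconds
            Bucket(long delayMs) { this.expireAt = System.currentTimeMillis() + delayMs; }

            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(expireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed other) {
                return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
            }
        }

        public static void main(String[] args) throws InterruptedException {
            DelayQueue<Bucket> queue = new DelayQueue<>();
            queue.put(new Bucket(2000));
            // take() blocks (Condition.awaitNanos -> parkNanos) until the head bucket expires
            Bucket due = queue.take();
            System.out.println("bucket expired at " + due.expireAt);
        }
    }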

    private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

    /*
     * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
     * waits up to timeoutMs before giving up.
     */
    def advanceClock(timeoutMs: Long): Boolean = {
      var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
      if (bucket != null) {
        writeLock.lock()
        try {
          while (bucket != null) {
            // Drive the time wheel forward to this bucket's expiration time
            timingWheel.advanceClock(bucket.getExpiration())
            // Flush the bucket: each task in its list is either executed (if due)
            // or re-inserted, which may move it down to a lower-level wheel
            bucket.flush(reinsert)
            bucket = delayQueue.poll()
          }
        } finally {
          writeLock.unlock()
        }
        true
      } else {
        false
      }
    }

 

 

A small problem

Why, when it comes to timing, is it always recommended to drop down to the API provided by the underlying layer (for example parkNanos) rather than implementing the wait further up? The principle looks the same, so why is it better? If you know, please tell the author in the comments section, thanks!

 

XXL-JOB

There are two main threads. scheduleThread scans the DB for tasks due within the next 5 seconds and puts them into the time-wheel container.

ringThread takes each due task out of the time wheel, routes the trigger to the fast or slow trigger thread pool, and makes an RPC call to the executor to run it.

It is distributed task scheduling with multiple executors. Tasks are persisted: they are first stored uniformly in the database, and a traditional time wheel is still used for the delay.

 

Conclusion

Two very central questions:

  1. All delayed tasks must first be sorted (by execution time).

  2. Comparing times: a task is taken out when its execution time arrives.

|  | Sorting | Finding the due job |
| --- | --- | --- |
| RocketMQ | Fixed delay levels, similar to a bucket sort | for loop |
| HashedWheelTimer | Array, bucket sort | for loop |
| Kafka time wheel | Heap sort (PriorityQueue) | Condition.awaitNanos() -> parkNanos() |
| Redisson delay queue | Zset (skip-list implementation) | Sub/pub first; the client then fetches due data from the zset with a HashedWheelTimer |
| Youzan-style delay queue (this design) | Zset (skip-list implementation) | for loop polling, with multiple threads, one per bucket |

 

Therefore, if you want to design a delay queue yourself, the key is to decide how to solve these two core problems; the rest can be adjusted according to your own business scenario.

Standing on the shoulders of giants

juejin.cn/post/684516…

juejin.cn/post/691006…

juejin.cn/post/697641…