Research on delay queue technology
I. Project Background
A delay queue is a message queue with a delay capability. We currently have several application scenarios that require delayed processing.
II. Optional technical references
1. Kafka
Premise: most of the project code and business interaction already goes through Kafka, so the idea is to build a delay queue on top of Kafka and expose the delay capability directly, which makes it more convenient to use.
Borrowing the design idea of the RocketMQ delay queue, create a number of topics for handling messages with different delays; for example, task messages delayed by one minute are handled by the topic delay-minutes-1.
- Delayed messages are not sent directly to the target topic, but to a topic dedicated to delayed messages, such as delay-minutes-1
- A piece of code periodically pulls messages from delay-minutes-1 and sends them to the actual target topic.
Flow chart:
Problem to solve: how do we make a delayed message wait for some time before it is sent to the real topic?
A: KafkaConsumer provides pause and resume APIs. When the consumer finds that a message does not yet meet the consumption-time condition, it can pause the partition and move the consumption offset back to the previous position.
Disadvantages: the internal changes around Kafka are fairly complex, and extra health checks are needed so that a paused consumer can be reported or restarted if its status is abnormal. In addition, the delay time cannot be set flexibly.
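A minimal sketch of this pause/seek idea, assuming a fixed 60-second delay topic named delay-minutes-1, that the real target topic is carried in the record key, and that a forwarding KafkaProducer is available; the resume bookkeeping is simplified for illustration:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class DelayTopicForwarder {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", "delay-minutes-1-forwarder");
        cp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        long delayMs = 60_000L;                               // fixed delay of this topic
        Map<TopicPartition, Long> resumeAt = new HashMap<>(); // when each paused partition matures

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(Collections.singleton("delay-minutes-1"));
            while (true) {
                // Resume any partition whose earliest message has matured
                long now = System.currentTimeMillis();
                resumeAt.entrySet().removeIf(e -> {
                    if (now >= e.getValue()) {
                        consumer.resume(Collections.singleton(e.getKey()));
                        return true;
                    }
                    return false;
                });
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(200))) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    if (resumeAt.containsKey(tp)) {
                        continue; // partition paused in this batch; the record will be re-fetched later
                    }
                    long readyTime = record.timestamp() + delayMs;
                    if (System.currentTimeMillis() >= readyTime) {
                        // Due: forward to the real target topic (assumed to be carried in the record key)
                        producer.send(new ProducerRecord<>(record.key(), record.value()));
                    } else {
                        consumer.seek(tp, record.offset());          // rewind so nothing is skipped
                        consumer.pause(Collections.singleton(tp));   // stop fetching until it matures
                        resumeAt.put(tp, readyTime);
                    }
                }
            }
        }
    }
}
```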
2. RocketMQ
Premise: the underlying code is fully encapsulated and can be used directly without caring about its internals, which decouples it from the business.
General principle:
- RocketMQ divides delay-queue latency into 18 fixed levels. When sending an MQ message you only need to set delayLevel; messages with the same delay level are put into the same queue
- A timer polls these queues to check whether messages are due
Flow chart:
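For reference, sending a RocketMQ delayed message only requires setting the delay level on the message via the standard producer API; the producer group, name server address and topic name below are placeholders:

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketDelayProducerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TargetTopic", "order-timeout-check".getBytes());
        // Level 5 corresponds to 1 minute in the default 18-level table:
        // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        msg.setDelayTimeLevel(5);

        SendResult result = producer.send(msg);
        System.out.println(result.getSendStatus());
        producer.shutdown();
    }
}
```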
Disadvantages:
- When using middleware, read the underlying source code as much as possible, so that later problems can be tracked down and located quickly and suitable extension points can be found.
- The timer runs on a single thread. If the number of delayed messages is large, messages may not be sent on time even after they expire.
3. Redis
Premise:
Redisson provides a delay queue that already wraps the Redis details and can be used directly, via redisson.getBlockingQueue() and redisson.getDelayedQueue().
General principle:
zhuanlan.zhihu.com/p/343811173
Three core structures:
Delay queue: the queue where data waits for its delay to elapse
Target blocking queue: holds expired data waiting to be consumed
TimeoutSet (expiration-time Zset): the score is the timeout, used to decide whether an element has expired.
Timer implementation:
Redis pub/sub is used. When data is added, it is first put into the Zset and a message is published on a subscribed key, with the data's expiration timeout as the content. The client then starts a HashedWheelTimer, fetches expired data from the Zset, and places it into the blocking queue.
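A minimal usage sketch of the Redisson delayed queue described above; the Redis address and queue name are placeholders:

```java
import java.util.concurrent.TimeUnit;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedissonDelayDemo {
    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);

        RBlockingQueue<String> target = redisson.getBlockingQueue("demo_delay_queue");
        RDelayedQueue<String> delayed = redisson.getDelayedQueue(target);

        // Producer side: the element becomes visible in the target queue after 10 seconds
        delayed.offer("task-1", 10, TimeUnit.SECONDS);

        // Consumer side: blocks until an expired element has been transferred to the target queue
        String task = target.take();
        System.out.println("got " + task);

        delayed.destroy();
        redisson.shutdown();
    }
}
```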
Disadvantages:
- With pub/sub, multiple clients may start the delayed-transfer task at the same time for a while and execute it repeatedly. There are also concurrency-safety issues, because the operation involves both adding the data to the blocking queue and removing it from the Zset.
- It is fairly stable by default when the data volume is small; when the data volume is large, a cluster mode has to be built, which we would need to develop ourselves.
III. Reconstruction idea based on the Redisson scheme
Youzan delay queue
tech.youzan.com/queuing_del…
Implementation logic diagram
Meanings of each component:
Job: a task that needs to be processed asynchronously; the most basic unit. Its attributes include a user-defined unique jobId, the topic (task type), delayTime (task execution time), ttrTime (execution timeout), and message (the message content).
Job Pool: a map structure that stores the original job information
Delay Bucket: a group of time-ordered queues (storing only job ids). The data structure of a bucket is a Redis Zset. Multiple buckets are used to improve scanning speed and reduce message delay
Timer: scans the buckets in real time and moves jobs whose delay time is earlier than or equal to the current time into the Ready Queue.
* In our own implementation, the Ready Queue is replaced by a common Kafka exit topic that stores ready jobs for client applications to consume; the timer sends due jobs directly to Kafka (see the scan sketch below)
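A sketch of one timer pass over a single bucket, assuming Redisson is used for the Zset and job pool and a KafkaProducer for the exit topic; the key names and the exit topic name are placeholders:

```java
import java.util.Collection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;

public class BucketTimer {
    private final RedissonClient redisson;
    private final KafkaProducer<String, String> producer;

    public BucketTimer(RedissonClient redisson, KafkaProducer<String, String> producer) {
        this.redisson = redisson;
        this.producer = producer;
    }

    /** Pop every job whose score (execution timestamp) is <= now and push it to the Kafka exit topic. */
    public void scanBucket(int bucketIndex) {
        RScoredSortedSet<String> bucket = redisson.getScoredSortedSet("delay:bucket:" + bucketIndex);
        RMap<String, String> jobPool = redisson.getMap("delay:job_pool");
        long now = System.currentTimeMillis();

        Collection<String> dueJobIds = bucket.valueRange(0, true, now, true);
        for (String jobId : dueJobIds) {
            String jobJson = jobPool.get(jobId);   // the original job carries realTopicName and message
            if (jobJson != null) {
                producer.send(new ProducerRecord<>("delay_exit_topic", jobId, jobJson));
            }
            bucket.remove(jobId);                  // remove only after the send has been handed to Kafka
        }
    }
}
```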
Change points compared with Redisson
- The pub/sub part of the original Redisson delay queue is removed; the timer instead polls the head node of the Zset and checks whether its expiration time has been reached.
- A thread pool is added to speed up message processing and reduce the timing error of delayed messages.
- In cluster mode, the Redis SETNX command can be used to implement a simple distributed lock, so that only one timer thread in the cluster executes at a time (a sketch follows this list).
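A simple sketch of such a SETNX-based lock, assuming the Jedis client; the key name and TTL are placeholders:

```java
import java.util.Collections;
import java.util.UUID;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class TimerLock {
    private static final String LOCK_KEY = "delay_queue:timer_lock";
    private static final String UNLOCK_LUA =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    private final Jedis jedis;
    private final String token = UUID.randomUUID().toString();

    public TimerLock(Jedis jedis) { this.jedis = jedis; }

    /** Try to become the active timer; the lock expires automatically if this node dies. */
    public boolean tryLock(long ttlMillis) {
        return "OK".equals(jedis.set(LOCK_KEY, token, SetParams.setParams().nx().px(ttlMillis)));
    }

    /** Release only if we still own the lock (compare-and-delete done atomically in Lua). */
    public void unlock() {
        jedis.eval(UNLOCK_LUA, Collections.singletonList(LOCK_KEY), Collections.singletonList(token));
    }
}
```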
My own change points
- Build it as a universal service, providing a unified push (entry) topic and a unified pull (exit) topic
Overall execution process:
- Business parties send tasks to the entry topic; a deferred task is generated and put into a bucket
- The timer continuously polls each bucket and, when a task is due, sends the message task to Kafka
- Consumers pull tasks from the common Kafka exit topic and run the corresponding business logic
- After a message from the exit topic is received and processed, an acknowledgement is replied once, so that the message is not lost
Overall architecture diagram of microservice delay queue
Example:
Kafka common entry topic: delay_entrance_topic
attribute | type | required | meaning |
---|---|---|---|
realTopicName | string | yes | Business type: the actual topic the message is finally delivered to |
delayTime | long | yes | Task delay time |
message | string | yes | Message content, a JSON string |
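As an illustration, a business side might push a delayed task to the entry topic like this (a sketch; the broker address and payload values are placeholders, and delayTime is assumed to be in milliseconds):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EntryTopicProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Close an unpaid order after 30 minutes; the payload mirrors the entry-topic table above
        String payload = "{\"realTopicName\":\"order_close_topic\","
                + "\"delayTime\":1800000,"
                + "\"message\":\"{\\\"orderId\\\":10001}\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("delay_entrance_topic", payload));
        }
    }
}
```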
Kafka common exit topic: delay_exit_topic
attribute | type | required | meaning |
---|---|---|---|
delayJobId | long | yes | The business does not need to care about this field; it is used when replying that processing succeeded |
realTopicName | string | yes | Business type: the actual topic the message is delivered to, used by each business for filtering |
message | string | yes | Message content, a JSON string |
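Correspondingly, a sketch of a business consumer on the exit topic that only handles records whose realTopicName matches its own business; JSON parsing is elided and the names are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExitTopicConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order_close_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("delay_exit_topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // Filter by realTopicName so each business only processes its own jobs
                    if (record.value().contains("\"realTopicName\":\"order_close_topic\"")) {
                        // ... run the business logic, then acknowledge so the job is not redelivered
                    }
                }
            }
        }
    }
}
```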
IV. Extension points
- Reduce the delay-time error: use a thread pool to speed up the expiration checks
- Cluster mode: high-availability design in case one of the servers goes down and cannot be used, with the timer maintaining the routes
- In cluster mode, the timer logic needs a distributed lock to prevent multiple servers from executing at the same time
- Message reliability: guarantee at-least-once consumption; if consumption fails or is not acknowledged, the message is redelivered.
V. Possible problems
Message persistence: this relies on Redis' own persistence. If Redis data is lost, the delayed messages are lost too, but this can be mitigated with master/standby setups and clustering. A later optimization could persist messages to MongoDB.
Other delay queue ideas
Netty time wheel
The flow chart of HashedWheelTimer
tickDuration: the duration of one tick, i.e. the time it takes to move from one slot to the next
ticksPerWheel: the size of the time-wheel array
HashedWheelBucket: an array element that records the head and tail of a linked list of timeouts
HashedWheelTimeout: the carrier of a delayed task, placed in a bucket's linked list, with attributes such as previous/next pointers and the number of remaining rounds
If the time wheel is viewed as a map, then ticksPerWheel is the size of the map. When the time wheel starts, a startTime is recorded, so each tick index can be calculated from a task's delay time; that index is the key of the map and the bucket is the value.
Core code: the worker thread loops forever, checks whether the bucket list for the current tick has reached its execution time, executes due tasks immediately, then moves on with tick+1; if it is not yet time, it sleeps for a while.
```java
// Excerpt from Worker#waitForNextTick() in Netty's HashedWheelTimer
long deadline = tickDuration * (tick + 1);

for (;;) {
    // Relative time since the timer started
    final long currentTime = System.nanoTime() - startTime;
    long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

    // <= 0 means the wheel can be advanced by one tick
    if (sleepTimeMs <= 0) {
        if (currentTime == Long.MIN_VALUE) {
            return -Long.MAX_VALUE;
        } else {
            return currentTime;
        }
    }

    // Rounding for Windows platform compatibility
    if (PlatformDependent.isWindows()) {
        sleepTimeMs = sleepTimeMs / 10 * 10;
    }

    try {
        Thread.sleep(sleepTimeMs);
    } catch (InterruptedException ignored) {
        if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
            return Long.MIN_VALUE;
        }
    }
}
```
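For comparison, a small usage sketch of Netty's HashedWheelTimer with 100 ms per tick and 512 slots (so one full rotation covers about 51.2 seconds):

```java
import java.util.concurrent.TimeUnit;
import io.netty.util.HashedWheelTimer;

public class WheelTimerDemo {
    public static void main(String[] args) throws InterruptedException {
        // 100 ms tickDuration, 512 ticksPerWheel
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
        long start = System.currentTimeMillis();

        timer.newTimeout(
                timeout -> System.out.println("fired after " + (System.currentTimeMillis() - start) + " ms"),
                5, TimeUnit.SECONDS);

        Thread.sleep(6000);   // wait for the task to fire, then release the worker thread
        timer.stop();
    }
}
```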
Kafka time wheel
Building on the ordinary time wheel, it trades space for time: every bucket is put into a DelayQueue, which internally holds a PriorityQueue ordered by each bucket's expiration time. If the head bucket has expired it can be taken out and processed directly; if not, the poll blocks. The loop keeps draining the priority queue in turn.
Condition.awaitNanos() -> parkNanos()
```scala
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

/*
 * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
 * waits up to timeoutMs before giving up.
 */
def advanceClock(timeoutMs: Long): Boolean = {
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    writeLock.lock()
    try {
      while (bucket != null) {
        // Drive the timing wheel forward to this bucket's expiration time
        timingWheel.advanceClock(bucket.getExpiration())
        // Flush the bucket: each task entry is re-inserted, which either demotes it to a
        // lower-level wheel or executes it if it is already due
        bucket.flush(reinsert)
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}
```
A small question
When it comes to timing, why is the API provided by the lower layer considered more efficient?
It is recommended to use the underlying API, yet the principle is the same, so why is it better than implementing the wait at a higher level? If you know, feel free to tell the author in the comments section, thanks!
XXL-JOB
There are two main threads: scheduleThread pre-reads from the DB the tasks due to execute within the next 5s and puts them into the time-ring container.
ringThread takes each due task out of the ring and routes it to the fast or slow trigger thread pool, which makes an RPC call to the executor to run it.
It is distributed task scheduling with multiple executors. Tasks are persisted: they are stored in the database first, and a traditional time wheel is also used for the delay.
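A highly simplified illustration of this pre-read + time-ring idea (not XXL-JOB's actual code); the 60-slot ring keyed by second-of-minute mirrors the description above, and thread-safety of the slot lists is simplified:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TimeRingSketch {
    private final Map<Integer, List<Runnable>> ring = new ConcurrentHashMap<>();

    /** Called by the "scheduleThread": place a pre-read job into the slot for its due second. */
    public void push(long dueEpochMillis, Runnable job) {
        int slot = (int) ((dueEpochMillis / 1000) % 60);
        ring.computeIfAbsent(slot, k -> new ArrayList<>()).add(job);
    }

    /** Called by the "ringThread" once per second: fire everything in the current slot. */
    public void tick() {
        int slot = (int) ((System.currentTimeMillis() / 1000) % 60);
        List<Runnable> due = ring.remove(slot);
        if (due != null) {
            due.forEach(Runnable::run);   // in XXL-JOB this hands off to the fast/slow trigger pools
        }
    }
}
```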
Conclusion
Two very central questions:
1. How to sort and prioritize all the delayed tasks
2. How to compare times, so that a task is taken out when its execution time arrives
Implementation | Sorting | Finding the due job |
---|---|---|
RocketMQ | Fixed delay levels, similar to bucket sort | for loop |
HashedWheelTimer | Array, bucket sort | for loop |
Kafka time wheel | Heap sort (PriorityQueue) | Condition.awaitNanos() -> parkNanos() |
Redisson delay queue | Zset (skip list) | Pub/sub subscription first; then the client uses a HashedWheelTimer to fetch due data from the Zset |
Delay queue based on the Youzan design | Zset (skip list) | for-loop polling, with multiple threads, one per bucket |
Therefore, if you want to design a delay queue yourself, the key is to decide how to solve these two core problems; the rest can be adjusted to fit your own business scenario.
Standing on the shoulders of giants
juejin.cn/post/684516…
juejin.cn/post/691006…
juejin.cn/post/697641…