A delay queue, as the name implies, is a message queue that delivers messages only after a specified delay. So under what circumstances is such a queue needed?


Background

Let us start with the following business scenarios:


  • When an order has not been paid, how do we close it in time and return the inventory?

  • How do we regularly check whether an order in refund status has actually been refunded?

  • If a newly created store has not uploaded any goods within N days, how can the system detect this and send an activation message? (And other similar cases.)


The simplest and most direct way to solve these problems is to scan the relevant table periodically, with each business maintaining its own table-scan logic. As the number of businesses grows, however, the table-scan logic turns out to be very similar across them. It therefore makes sense to pull this logic out of the concrete business code and turn it into a shared component.

Is there an open source solution out there? The answer is yes: Beanstalkd (http://kr.github.io/beanstalkd/) basically meets the requirements above. However, deleting messages with it is not particularly convenient and is relatively costly. It is also written in C, while the main languages of our team at the time were PHP and Java, so we could not do secondary development on it. We therefore borrowed its design ideas and re-implemented a delay queue in Java.



Design goals

  • Message transmission reliability: A message is consumed at least once after entering the delay queue.

  • Client support: due to business requirements, at least PHP and Python are supported.

  • High availability: at a minimum, support multi-instance deployment, so that when one instance fails, a backup instance continues to provide service.

  • Timeliness: a certain amount of timing error is allowed.

  • Message deletion: Service users can delete specified messages at any time.


The overall structure

The entire delay queue consists of four parts:


  • Job Pool: stores the meta information of all jobs.

  • Delay Bucket: a group of time-ordered queues that hold all jobs that are delayed or have been reserved (only job IDs are stored here).

  • Timer: scans each bucket in real time and moves jobs whose delay time has been reached (delay time less than or equal to the current time) to the corresponding Ready Queue.

  • Ready Queue: stores jobs in the Ready state (only job IDs are stored here) for consumer programs to consume.


Expressed as follows:


Design points



The basic concept

  • Job: a task that requires asynchronous processing; the basic unit of the delay queue. Each job is associated with a specific Topic.

  • Topic: a set (queue) of jobs of the same type, which consumers subscribe to.



Message structure

Each Job must contain the following attributes:


  • Topic: Job type. It can be understood as a specific business name.

  • Id: unique Id of the Job. Used to retrieve and delete the specified Job information.

  • Delay: how long the job should be delayed, in seconds. (The server converts this to an absolute time.)

  • TTR (time-to-run): the timeout for executing the job, in seconds.

  • Body: the content of the job, stored in JSON format, which consumers use for their specific business processing.


The specific structure is shown as follows:

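TTR is designed to ensure reliable message delivery: if a consumer reserves a job but does not report completion within the TTR, the job becomes available for consumption again.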
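The attributes above can be pictured as a small record type. The following is only a sketch in Python (the article's implementation is in Java); the class name `Job`, the lowercase field names, and the helper methods `execute_at` and `retry_at` are hypothetical names chosen for illustration.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class Job:
    """Hypothetical in-memory form of the Job attributes listed above."""
    topic: str   # job type, for example a business name
    id: str      # unique job ID, used to fetch or delete the job
    delay: int   # delay in seconds, as submitted by the client
    ttr: int     # time-to-run in seconds
    body: str    # JSON string handed to the consumer

    def execute_at(self, now: float) -> float:
        """Absolute time at which the job becomes ready (server-side conversion)."""
        return now + self.delay

    def retry_at(self, reserved_at: float) -> float:
        """Absolute time at which a reserved but unfinished job is offered again."""
        return reserved_at + self.ttr

    def to_json(self) -> str:
        return json.dumps(asdict(self))


job = Job(topic="orderClose", id="orderClose-1001", delay=1800, ttr=60,
          body=json.dumps({"orderNo": "1001"}))
ready_time = job.execute_at(now=1_000_000.0)
print(ready_time)                # 1001800.0
print(job.retry_at(ready_time))  # 1001860.0
```

Storing the absolute time rather than the relative delay means the timer only has to compare a stored timestamp with the current clock.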


Message state transition

Each Job can be in only one state:


  • Ready: executable, waiting to be consumed.

  • Delay: not yet executable, waiting for its execution time to arrive.

  • Reserved: has been read by a consumer, but the server has not yet received the consumer's response (delete or finish).

  • Deleted: Indicates that the consumption is complete or has been deleted.
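One way to make the "only one state at a time" rule concrete is a small transition table. The sketch below is an assumption: the set of allowed transitions is inferred from the state descriptions and from the TTR behaviour described in this article, and the names `State`, `ALLOWED` and `transition` are hypothetical.

```python
from enum import Enum


class State(Enum):
    DELAY = "delay"
    READY = "ready"
    RESERVED = "reserved"
    DELETED = "deleted"


# Assumed transitions, inferred from the state descriptions in the text.
ALLOWED = {
    State.DELAY: {State.READY, State.DELETED},     # execution time reached, or deleted
    State.READY: {State.RESERVED, State.DELETED},  # popped by a consumer, or deleted
    State.RESERVED: {State.READY, State.DELETED},  # TTR expired, or finish/delete received
    State.DELETED: set(),                          # terminal state
}


def transition(current: State, target: State) -> State:
    """Return the new state, or raise if the move is not in the table."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


state = State.DELAY
for nxt in (State.READY, State.RESERVED, State.DELETED):
    state = transition(state, nxt)
print(state.value)  # deleted
```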


The following is a schematic diagram of the four states:


Message storage

Before selecting storage media, determine the specific data structure:


  • Job Pool: job meta information, stored in K/V form. The key is the job ID and the value is the job struct.

  • The Delay Bucket is an ordered queue.

  • The Ready Queue is a normal list or Queue.


Redis meets all of these needs well. Each bucket is a Redis sorted set (zset), and jobs are spread across multiple buckets to speed up scanning and reduce message latency.
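To show how the three structures fit together without requiring a running Redis server, here is an in-memory stand-in. It is a sketch under assumptions: `BUCKET_COUNT`, `Bucket`, `store_job` and the variable names are hypothetical, and the comments only name the Redis sorted-set commands (ZADD, ZRANGEBYSCORE, ZREM) that each method is meant to resemble.

```python
import itertools

BUCKET_COUNT = 3  # assumed value; the article does not say how many buckets it uses


class Bucket:
    """In-memory stand-in for one Redis sorted set (member = job ID, score = execute time)."""

    def __init__(self):
        self._scores = {}

    def add(self, job_id, execute_at):
        # Comparable to ZADD: adding an existing member updates its score.
        self._scores[job_id] = execute_at

    def due(self, now):
        # Comparable to ZRANGEBYSCORE with max = now: due members, earliest first.
        hits = sorted((score, job_id) for job_id, score in self._scores.items()
                      if score <= now)
        return [job_id for _, job_id in hits]

    def remove(self, job_id):
        # Comparable to ZREM.
        self._scores.pop(job_id, None)


job_pool = {}                                   # K/V: job ID -> job struct (a dict here)
buckets = [Bucket() for _ in range(BUCKET_COUNT)]
ready_queues = {}                               # topic -> plain list of job IDs
_bucket_cycle = itertools.cycle(range(BUCKET_COUNT))


def store_job(job, execute_at):
    """Save the meta information, then place only the job ID in the next bucket."""
    job_pool[job["id"]] = job
    index = next(_bucket_cycle)
    buckets[index].add(job["id"], execute_at)
    return index


for n, when in enumerate([30, 10, 20, 5]):
    store_job({"id": f"job-{n}", "topic": "demo"}, execute_at=when)

print([b.due(now=15) for b in buckets])  # [['job-3'], ['job-1'], []]
```

Because each bucket is ordered by execution time, a scan only needs to look at the front of each set, and spreading IDs over several buckets lets several scans proceed in parallel.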



Communication protocol

To support clients in multiple languages, we chose HTTP as the transport and a text protocol (JSON) for interacting with clients. The following commands are currently supported:


  • Add: {"command": "add", "topic": "XXX", "id": "XXX", "delay": 30, "TTR": 60, "body": "XXX"}

  • Get: {"command": "pop", "topic": "XXX"}

  • Finish: {"command": "finish", "id": "XXX"}

  • Delete: {"command": "delete", "id": "XXX"}


The body is also a JSON string.


Response structure: {"success": true/false, "error": "error reason", "id": "XXX", "value": "job body"}

The job ID is chosen by the caller and must be globally unique. A combination of the topic and a business-unique ID is recommended.
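A client for this protocol mostly needs to build the JSON commands and decode the response. The helpers below are a sketch, not an official client: the function names, the `topic:business_id` ID format, and the exact key spelling `"TTR"` are assumptions, and no HTTP call is made because the article does not give the service URL.

```python
import json


def make_job_id(topic, business_id):
    """Combine topic and business ID, as the text recommends (separator is assumed)."""
    return f"{topic}:{business_id}"


def add_command(topic, job_id, delay, ttr, body):
    """Build the JSON text for an add request; the body is itself JSON-encoded."""
    return json.dumps({
        "command": "add", "topic": topic, "id": job_id,
        "delay": delay, "TTR": ttr, "body": json.dumps(body),
    })


def pop_command(topic):
    return json.dumps({"command": "pop", "topic": topic})


def finish_command(job_id):
    return json.dumps({"command": "finish", "id": job_id})


def delete_command(job_id):
    return json.dumps({"command": "delete", "id": job_id})


def parse_response(text):
    """Return (job ID, decoded body) on success; raise with the error reason otherwise."""
    reply = json.loads(text)
    if not reply.get("success"):
        raise RuntimeError(reply.get("error", "unknown error"))
    value = reply.get("value")
    return reply.get("id"), (json.loads(value) if value else None)


request = add_command("orderClose", make_job_id("orderClose", "1001"),
                      delay=1800, ttr=60, body={"orderNo": "1001"})
print(request)
```

The resulting string would be sent as the HTTP request body, and `parse_response` would be applied to the text that comes back.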


An example of the life cycle of a Job



  • A user places an order for a product. Once the system has created the order successfully, it also puts a job into the delay queue: {"topic": "orderClose", "id": "ordercloseorderNoXXX", "delay": 1800, "TTR": 60, "body": "XXXXXXX"}

  • On receiving the job, the delay queue first stores the job information in the job pool, then computes the absolute execution time from the delay and puts the job ID into one of the buckets in round-robin fashion.

  • The timer polls every bucket continuously. After 1800 seconds (30 minutes), it finds that the execution time of the job has arrived and takes the job ID, then fetches the meta information from the job pool. If the job is in the deleted state, the timer skips it and continues polling. If the job is not deleted, the timer checks again whether the delay time in the meta information has been reached (that is, whether it is less than or equal to the current time). If so, it adds the job ID to the ready queue of the job's topic and removes it from the bucket. If not, it recalculates the delay time, puts the job ID into a bucket again, and removes the previous entry from the bucket.

  • Consumers poll the ready queue of their topic (the validity of the job still has to be checked at this point) and run their own business logic once they obtain a job. At the same time, the server recalculates the execution time of the obtained job from its TTR and puts the job back into a bucket.

  • After finishing its processing, the consumer sends a finish message to the server, and the server deletes the meta information for that job ID.
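The life cycle above can be walked through with a single-process sketch. It is not the authors' Java code: `DelayQueue` and its methods are hypothetical, buckets are plain dicts rather than Redis sorted sets, time is passed in explicitly so the run is deterministic, and the "recalculate and re-bucket" branch is omitted for brevity.

```python
class DelayQueue:
    """Single-process sketch of the life cycle above (simplified, assumed names)."""

    def __init__(self, bucket_count=2):
        self.pool = {}                                        # job ID -> meta information
        self.buckets = [dict() for _ in range(bucket_count)]  # job ID -> execute time
        self.ready = {}                                       # topic -> list of job IDs
        self._next = 0

    def _to_bucket(self, job_id, execute_at):
        # Round-robin bucket selection.
        self.buckets[self._next % len(self.buckets)][job_id] = execute_at
        self._next += 1

    def add(self, topic, job_id, delay, ttr, body, now):
        self.pool[job_id] = {"topic": topic, "ttr": ttr, "body": body}
        self._to_bucket(job_id, now + delay)                  # absolute execution time

    def tick(self, now):
        """One timer pass over every bucket."""
        for bucket in self.buckets:
            for job_id, execute_at in list(bucket.items()):
                if execute_at > now:
                    continue                                  # not due yet
                del bucket[job_id]
                meta = self.pool.get(job_id)
                if meta is None:                              # finished or deleted: skip
                    continue
                self.ready.setdefault(meta["topic"], []).append(job_id)

    def pop(self, topic, now):
        queue = self.ready.get(topic, [])
        while queue:
            job_id = queue.pop(0)
            meta = self.pool.get(job_id)
            if meta is None:                                  # deleted while waiting
                continue
            self._to_bucket(job_id, now + meta["ttr"])        # offered again if no finish
            return job_id, meta["body"]
        return None

    def finish(self, job_id):
        self.pool.pop(job_id, None)

    delete = finish  # both simply drop the meta information in this sketch


q = DelayQueue()
q.add("orderClose", "orderClose-1001", delay=1800, ttr=60, body="{}", now=0)
q.tick(now=1799)
print(q.pop("orderClose", now=1799))   # None
q.tick(now=1800)
print(q.pop("orderClose", now=1800))   # ('orderClose-1001', '{}')
q.tick(now=1860)                       # TTR elapsed without a finish
print(q.pop("orderClose", now=1860))   # ('orderClose-1001', '{}')
q.finish("orderClose-1001")
q.tick(now=1920)
print(q.pop("orderClose", now=1920))   # None
```

The second delivery at 1860 seconds is what gives the at-least-once guarantee: a consumer that crashes after popping does not lose the job.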



Existing physical topology

Currently, storage is centralized. In a multi-instance deployment, the Timer programs may run concurrently, which would cause the same job to be placed in the Ready Queue more than once. To solve this, we use the Redis setnx command to implement a simple distributed lock, so that each bucket is scanned by only one timer thread at a time.
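The locking idea can be sketched without a Redis server by using a tiny stand-in that offers the same set-if-absent semantics as SETNX. Everything named here (`FakeRedis`, `scan_with_lock`, the `lock:` key prefix) is hypothetical. A production lock would normally also need an expiry so that a crashed instance cannot hold the lock forever; the article does not say how that case is handled.

```python
import threading


class FakeRedis:
    """Tiny in-memory stand-in offering SETNX-like and DEL-like operations."""

    def __init__(self):
        self._data = {}
        self._guard = threading.Lock()

    def setnx(self, key, value):
        # Set the key only if it does not exist; report whether it was set.
        with self._guard:
            if key in self._data:
                return False
            self._data[key] = value
            return True

    def delete(self, key):
        with self._guard:
            return self._data.pop(key, None) is not None


def scan_with_lock(store, bucket_name, instance_id, scan):
    """Run scan(bucket_name) only if this instance wins the per-bucket lock."""
    lock_key = f"lock:{bucket_name}"   # hypothetical key naming
    if not store.setnx(lock_key, instance_id):
        return False                   # another timer is scanning this bucket
    try:
        scan(bucket_name)
        return True
    finally:
        store.delete(lock_key)         # release so the next scan can proceed


store = FakeRedis()
attempts = []


def scan(name):
    # While instance-a holds the lock, a competing instance tries the same bucket.
    attempts.append(scan_with_lock(store, name, "instance-b", lambda n: None))


print(scan_with_lock(store, "bucket-0", "instance-a", scan))  # True
print(attempts)                                               # [False]
```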


Design deficiencies

  • The timer is implemented as an infinite loop in a dedicated thread, which wastes some CPU when no jobs are ready.

  • Consumers reserve jobs through HTTP short polling and can fetch only one job at a time, so network I/O overhead grows when many jobs are ready.

  • Data is stored in Redis, so message persistence is limited by what Redis offers.

  • Scale-out relies on a third party (Nginx).


Future Architecture Direction

  • Implement the Timer on top of wait/notify.

  • Provide a TCP long-connection API, and reserve messages through push or long polling.

  • Use our own storage scheme (an embedded database, or custom data structures written to files) to guarantee message persistence.

  • Implement our own name server.

  • Consider providing direct support for periodic tasks.
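As an illustration of the wait/notify direction, the sketch below replaces the busy loop with a condition variable: the timer thread sleeps until the earliest deadline or until a new job is scheduled. It uses Python's `threading.Condition` as a stand-in for Java's wait/notify, and the class `WaitNotifyTimer` and its methods are hypothetical, not part of the described system.

```python
import heapq
import threading
import time


class WaitNotifyTimer:
    """Timer thread that blocks instead of spinning when nothing is due."""

    def __init__(self, on_ready):
        self._heap = []                       # (execute_at, job_id), earliest first
        self._cond = threading.Condition()
        self._on_ready = on_ready
        self._stopped = False
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def schedule(self, job_id, delay):
        with self._cond:
            heapq.heappush(self._heap, (time.monotonic() + delay, job_id))
            self._cond.notify()               # the earliest deadline may have changed

    def stop(self):
        with self._cond:
            self._stopped = True
            self._cond.notify()
        self._thread.join()

    def _run(self):
        while True:
            with self._cond:
                while not self._stopped:
                    if not self._heap:
                        self._cond.wait()     # idle until a job is scheduled
                        continue
                    remaining = self._heap[0][0] - time.monotonic()
                    if remaining <= 0:
                        break                 # the earliest job is due
                    self._cond.wait(timeout=remaining)
                if self._stopped:
                    return
                _, job_id = heapq.heappop(self._heap)
            self._on_ready(job_id)            # run the callback outside the lock


fired = []
done = threading.Event()


def on_ready(job_id):
    fired.append(job_id)
    if len(fired) == 2:
        done.set()


timer = WaitNotifyTimer(on_ready)
timer.schedule("late", 0.10)
timer.schedule("early", 0.02)
done.wait(timeout=2.0)
timer.stop()
print(fired)  # ['early', 'late']
```

Scheduling "early" after "late" wakes the sleeping thread so that it recomputes its wait, which is the part a fixed-interval polling loop cannot do without spinning.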


Source: Youzan Technical team

Original: http://tech.youzan.com/queuing_delay/
