Written at the Front
In practical work, many developers implement scheduled tasks by periodically scanning database tables. This approach has significant drawbacks: with a large amount of data, frequent table scans put heavy strain on the database; it is difficult to support large-scale distributed scheduled tasks; it is difficult to support precisely timed tasks; it wastes a lot of CPU resources; and most of the scanned rows are tasks that do not need to be executed yet. So, given all these drawbacks of periodic table scanning, is there a way to solve these problems? Today, Glacier brings you his open source project mykit-delay!! Source code: github.com/sunshinelyz…
If you run into any problems using the framework, you can add Glacier on WeChat [SUN_shine_LYz] to discuss.
This article is also available at github.com/sunshinelyz…
Project description
mykit-delay is a simple, stable, and extensible delay message queue framework that provides precisely timed tasks and delay queue processing.
Project Module Description
- mykit-delay-common: the common utilities module of the mykit-delay framework, providing general-purpose tool classes
- mykit-delay-config: the general configuration module of the mykit-delay framework, providing global configuration
- mykit-delay-queue: the core implementation module of the mykit-delay framework; currently, all major functionality is implemented in this module
- mykit-delay-controller: the RESTful interface module of the mykit-delay framework, providing RESTful endpoints that can be called from any language
- mykit-delay-core: the entry point of the mykit-delay framework; the framework's startup program is implemented in this module
- mykit-delay-test: the test module of the mykit-delay framework, mainly providing JUnit unit test cases
Requirements background
- If a user does not pay within 30 minutes of placing an order, the order expires
- Notify users of system activities at a certain point in time
- If a service call fails, retry it 10 minutes later

The simplest way to handle such scenarios is with scheduled tasks. But with a large amount of data, some tasks may be seriously delayed, and as the number of scheduled services grows, task scheduling becomes cumbersome and hard to manage.
The queue design
The overall architecture design is shown in the figure below.
Issues to consider before development
- Timeliness: consumers must be able to receive messages on time
- Priority: the consumption weight of messages due at the same time
- Reliability: messages must not go unconsumed
- Recoverability: if anything makes the messaging system unavailable, the data can at least be recovered
- Retractability: messages that have not yet reached their execution time can be withdrawn before they are consumed
- HA: master/slave mode, so that multiple instances do not work at the same time
- Consumption: how the consumer side consumes messages
Of course, the primary reason for choosing Redis as the data cache is that Redis natively supports the ZSET data structure (with the delay time in milliseconds as the score), which removes the hassle of sorting and performs well. This matches our requirement exactly, since we need to determine execution order along the time dimension; Redis also supports the Hash (map) and List data structures we need.
Let's define a simple message data structure:
```java
private String topic;    // message topic
private String id;       // globally unique ID, generated automatically via snowflake
private String bizKey;   // business key
private long delay;      // delay in milliseconds
private int priority;    // priority
private long ttl;        // time to live
private String body;     // message body
private long createTime = System.currentTimeMillis();
private int status = Status.WaitPut.ordinal();
```
Operating principle:
- Use a Hash (map) to store the metadata: the message ID is the key, and the serialized message structure (JSON or similar) is the value, placed into the meta-message pool.
- Put the IDs into one of N ZSET ordered lists, with createTime + delay + priority as the score, and change the status to BeingDelayed.
- Use a timer to monitor the top 10 entries of each ZSET in real time. Any entry whose score <= the current time in milliseconds is moved into a new consumable List keyed by topic, deleted from the ZSET, and has its status changed to consumable (a minimal sketch of these steps appears after this list).
- The client only needs to fetch data from the consumable queue, checking that the status is unconsumed and the run time <= the current time. If not, the message is put back into the ZSET and its status changed back to delayed; if so, the status is changed to consumed, or the metadata is simply deleted.
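To make the put-and-scan steps above concrete, here is a minimal sketch using the Jedis client. The key names (delay:bucket:0, delay:meta, ready:<topic>) and the simplified score (priority omitted) are illustrative assumptions, not the framework's actual implementation.

```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

// Illustrative sketch of the put/scan steps; key names are hypothetical.
public class DelayQueueSketch {
    private final Jedis jedis = new Jedis("127.0.0.1", 6379);

    // Steps 1-2: store the metadata and index the id in a ZSET by due time.
    public void put(String id, String messageJson, long createTime, long delay) {
        jedis.hset("delay:meta", id, messageJson);            // meta-message pool
        jedis.zadd("delay:bucket:0", createTime + delay, id); // score = due time
    }

    // Step 3: move due messages into a per-topic consumable list.
    public void scanOnce() {
        long now = System.currentTimeMillis();
        // Fetch up to 10 entries whose score (due time) has already passed.
        for (Tuple t : jedis.zrangeByScoreWithScores("delay:bucket:0", 0, now, 0, 10)) {
            String id = t.getElement();
            String topic = "ces";              // in reality, parsed from the stored JSON
            jedis.rpush("ready:" + topic, id); // publish to the consumable list
            jedis.zrem("delay:bucket:0", id);  // remove from the delay bucket
        }
    }
}
```

Note that the scan-then-move step is not atomic as written; the real framework must guard it (for example with a lock or a Lua script) so a message is not dispatched twice.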
The client
Because callers may be written in different programming languages, HTTP access is supported by default.
- Add a delayed message; on success, the message's unique consumption ID is returned: POST /push {… message body}
- Delete a delayed message; the message ID is required: GET /delete?id=
- Restore delayed messages: GET /reStore?expire=true|false, where expire indicates whether messages that have expired without being executed should also be restored
- Restore a single delayed message; the message ID is required: GET /reStore/id
- Get messages; this requires a long connection: GET /get/topic
The service is exposed through Nginx, with round-robin load balancing configured so that traffic is distributed evenly when adding delayed messages.
Later, instead of consuming messages over HTTP long connections, the system switched to delivering data through MQ, so clients do not need to care about the delay message queue at all: the MQ send path is simply intercepted, and any delayed message is handed to the delay message system, as sketched below.
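Here is a rough illustration of that interception; InterceptingSender, pushToDelaySystem, and sendToMq are hypothetical names, not the framework's API.

```java
// Hypothetical sketch of intercepting the MQ send path: delayed messages are
// diverted to the delay message system instead of being sent to MQ directly.
public class InterceptingSender {

    public void send(String topic, String body, long delayMillis) {
        if (delayMillis > 0) {
            pushToDelaySystem(topic, body, delayMillis); // e.g. via the /push endpoint
        } else {
            sendToMq(topic, body); // normal, immediate MQ delivery
        }
    }

    private void pushToDelaySystem(String topic, String body, long delayMillis) {
        // placeholder: call POST /push as described above
    }

    private void sendToMq(String topic, String body) {
        // placeholder: send via a RocketMQ/ActiveMQ producer
    }
}
```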
Making messages recoverable
In normal cases, you would record a log for this, much like MySQL's binlog.
Here, we use a MySQL database directly for the logging.
Now create the following two tables:
- Message table: its fields contain the entire message body
- Message flow table: its fields include the message ID, the status change, the change time, the ZSET scanning thread name, and the host/IP

The ZSET scanning thread name is recorded to make clear which ZSET each message is being distributed from, since in this design the ZSET key is related to the name of the thread monitoring that ZSET.
Supporting message recovery
If the Redis server goes down, its data is gone after a restart. Recovery is therefore necessary: redistribute every row in table 1 (the message table) whose status is not "consumed" back into the delay queue, and synchronize the status.
The same can be done for an individual task.
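As a rough illustration of that recovery pass, the sketch below re-reads unconsumed rows over JDBC and re-pushes them. The JDBC URL, the STATUS_CONSUMED constant, and the rePush helper are illustrative assumptions; the table and column names follow the DDL below.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sketch of the recovery pass: re-push every message whose status is not
// "consumed". STATUS_CONSUMED and rePush(...) are illustrative assumptions.
public class RecoverySketch {
    private static final int STATUS_CONSUMED = 2; // hypothetical status code

    public void recoverAll() throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/mykit_delay", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                 "SELECT id, topic, body, delay, create_time "
                 + "FROM mykit_delay_queue_job WHERE status <> ?")) {
            ps.setInt(1, STATUS_CONSUMED);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // Re-add the task to the delay queue and sync its status.
                    rePush(rs.getString("id"), rs.getString("topic"),
                           rs.getString("body"), rs.getLong("delay"),
                           rs.getLong("create_time"));
                }
            }
        }
    }

    private void rePush(String id, String topic, String body,
                        long delay, long createTime) {
        // placeholder: would reuse the framework's push logic (ZADD + HSET as above)
    }
}
```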
For the table design, here are the SQL statements to create the tables directly.
```sql
DROP TABLE IF EXISTS `mykit_delay_queue_job`;
CREATE TABLE `mykit_delay_queue_job` (
  `id` varchar(128) NOT NULL,
  `bizkey` varchar(128) DEFAULT NULL,
  `topic` varchar(128) DEFAULT NULL,
  `subtopic` varchar(250) DEFAULT NULL,
  `delay` bigint(20) DEFAULT NULL,
  `create_time` bigint(20) DEFAULT NULL,
  `body` text,
  `status` int(11) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `mykit_delay_queue_job_ID_STATUS` (`id`,`status`),
  KEY `mykit_delay_queue_job_STATUS` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for mykit_delay_queue_job_log
-- ----------------------------
DROP TABLE IF EXISTS `mykit_delay_queue_job_log`;
CREATE TABLE `mykit_delay_queue_job_log` (
  `id` varchar(128) NOT NULL,
  `status` int(11) DEFAULT NULL,
  `thread` varchar(60) DEFAULT NULL,
  `update_time` datetime(3) DEFAULT NULL,
  `host` varchar(128) DEFAULT NULL,
  KEY `mykit_delay_queue_job_LOG_ID_STATUS` (`id`,`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
About High availability
Zookeeper is still used for distributed coordination.
If there are multiple instances, at most one instance works at any given time, which avoids the drawbacks of contending for distributed locks. Of course, if the business requires multiple instances to work simultaneously, that is also supported: each message is then processed by at most one instance, using a distributed lock implemented with Zookeeper or Redis.
Finally, I tested running multiple instances at the same time; performance may drop because of the locking involved, but single-instance performance is very good. A Docker-based active/standby deployment is therefore recommended.
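For the "at most one working instance" behavior, a common Zookeeper pattern is leader election. The sketch below uses Apache Curator's LeaderLatch; this is an assumed illustration, not necessarily how mykit-delay implements it, and the ZK path is made up.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Assumed leader-election sketch with Apache Curator; the ZK path is hypothetical.
public class HaElectionSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/mykit-delay/leader");
        latch.start();
        latch.await(); // blocks until this instance becomes the leader

        // Only the leader runs the ZSET scanning timers; standbys block above.
        System.out.println("This instance is now the working master");
    }
}
```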
Operation modes
- Master/Slave (HA): requires configuring mykit.delay.registry.serverList, the ZK cluster address list
- Cluster: supported, but it involves distributed lock contention, and the benefit of the distributed lock (a Redis setNx implementation) is not obvious
- StandAlone: a single instance

After testing, the Master/Slave mode is currently recommended; the Cluster mode will be optimized later.
How to access
To provide a unified framework for precisely timed tasks and delay queues, mykit-delay offers an HTTP REST interface that other business systems can call. The interface is easy to use: simply call the endpoints with the corresponding parameters.
The message body
Parameters are in JSON format and are currently only accepted over HTTP.
- body: the business message body
- delay: the number of milliseconds after createTime at which the task should run
- id: the task ID, generated automatically by the system and returned when the task is created successfully
- status: the task status; leave it empty by default
- topic: the message topic
- subtopic: reserved field
- ttl: reserved field
- createTime: the time the task was created; required
Add tasks
```
POST /push
Content-Type: application/json

{"body":"{hello world}","delay":10000,"id":"20","status":0,"topic":"ces","subtopic":"","ttl":12}
```
Delete the task
Deleting a task requires the task's jobId.

```
GET /delete?jobId=xxx
```
Restoring a single task
This restores a single task based on the log records.

```
GET /reStoreJob?jobId=xxx
```
Restore all outstanding tasks
Restore tasks based on logs
```
GET /reStore?expire=true
```
The expire parameter indicates whether tasks that have expired without being executed should also be restored.
Clear queue data
Clears all data in the queue based on the unfinished data in the log. This deletes all tasks from the cache.

```
GET /clearAll
```
How the client obtains queue data
Currently, push mode via RocketMQ and ActiveMQ is implemented by default, relying on MQ to decouple the delay framework from specific business systems.
The message body fields map to RocketMQ and ActiveMQ as follows:

| mykit-delay | RocketMQ | ActiveMQ | Note |
| --- | --- | --- | --- |
| topic | topic | topic | point-to-point send queue name or topic name |
| subtopic | subtopic | subtopic | point-to-point send queue sub-name or topic sub-name |
| body | message content | message content | message content |
About System Configuration
The delay framework interacts with specific business systems through the framework's configuration. The configuration file is located in the mykit-delay-config module at resources/properties/starter.properties.
Testing
You need to configure the database address and the Redis address; if you are not running in single-machine (StandAlone) mode, you also need to configure the Zookeeper address.
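A starter.properties for a Master/Slave setup might look like the hedged sketch below; apart from mykit.delay.registry.serverList, the property names are illustrative guesses, so check the file shipped in mykit-delay-config for the real keys.

```properties
# Hypothetical example; only mykit.delay.registry.serverList is named in this article.
mykit.delay.registry.serverList=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
# Illustrative placeholders for the database and Redis settings mentioned above
# (the actual key names may differ):
# mykit.delay.jdbc.url=jdbc:mysql://127.0.0.1:3306/mykit_delay
# mykit.delay.redis.host=127.0.0.1
# mykit.delay.redis.port=6379
```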
Run the test class io.mykit.delay.test.PushTest under the mykit-delay-test module to add tasks to the queue.
Then start io.mykit.delay.TestDelayQueue under the mykit-delay-test module to consume the data added earlier. To make the effect easy to observe, the default consumption mode is consoleCQ, which prints to the console.
Extension
The number of ZSET queues is configurable, to avoid the high latency that a large data volume would cause in a single queue (a bucket-selection sketch follows).
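Distributing message IDs across those queues could be as simple as the following hash-based sketch; this is illustrative, not the framework's actual partitioning code.

```java
// Illustrative bucket selection: spread ids across N ZSET keys to keep each small.
public class BucketSelector {
    private final int bucketCount;

    public BucketSelector(int bucketCount) {
        this.bucketCount = bucketCount;
    }

    public String bucketKeyFor(String messageId) {
        int bucket = Math.abs(messageId.hashCode() % bucketCount);
        return "delay:bucket:" + bucket; // same key scheme as the earlier sketch
    }
}
```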
Recent planning
- Dynamic settings for the number of partitions (buckets)
- Data consistency between Redis and the database (important)
- Implementing our own push/pull mechanism
- Support for switchable implementations; currently only a Redis implementation exists, to be optimized later
- A web console for managing queues
- A TTL mechanism for message consumption
If this open source framework helps you, please visit the GitHub link: github.com/sunshinelyz… and give it a Star so that more people can see it and be spared the tedious work of developing table-scanning scheduled tasks. I also hope more and more of you will join this open source project, and we will grow it together!!
OK, it's getting late, so let's call it a day. I'm Glacier, and we'll see you next time!!
A big bonus
Search WeChat for the "Glacier Technology" official account and follow this programmer who digs deep and publishes hardcore technical articles every day. Reply [PDF] in the account to get the first-tier company interview materials and the original, super-hardcore PDF technical documents I have prepared, as well as the resume templates I prepared for you (continuously updated). I hope everyone can find the right job. Learning is a road that is sometimes frustrating and sometimes joyful; keep at it. If you have already worked your way into the company of your choice, don't slack off, because career growth, like learning new technology, never stops. With luck, we'll meet again in the rivers and lakes!
Also, I will keep updating and maintaining every PDF I have open sourced. Thank you all for your long-term support of Glacier!!
Written at the End
If you think Glacier's writing is good, please search for and follow the "Glacier Technology" WeChat official account and learn about high concurrency, distributed systems, microservices, big data, the Internet, and cloud-native technology with Glacier. The account covers a large number of technical topics, and every article is packed with substance! Many readers have read its articles and successfully moved to big tech companies; many others have made a technical leap and become their company's technical backbone! If you also want to improve your abilities like they did, get into a big company, and earn a promotion and a raise, then follow the "Glacier Technology" official account, which publishes super-hardcore technical content every day, so you will no longer be confused about how to improve your technical skills!