Small knowledge, big challenge! This article is participating in the creation activity of “Essential Tips for Programmers”.
Demand background
-
After the user places the order successfully, every 20 minutes to send the user door-to-door service notification SMS
-
One hour after the order is completed, users will be notified to evaluate the on-site service
-
If the service fails, try again 10 minutes later
The most simple way to deal with similar scenarios is to use scheduled tasks. If there is a large amount of data, some data may be seriously delayed, and more and more scheduled services lead to cumbersome task scheduling and difficult management.
The queue design
Rabbitmq can be considered for the time being but will not be used because too many businesses currently use other MQ middleware.
Questions to consider before development?
-
Timeliness The consumer can receive it on time
-
Consumption weight of messages at the same time
-
Reliability messages cannot go unconsumed
-
Recoverable At least the data can be recovered if other circumstances cause the messaging system to become unavailable
-
Retractable support for messages that have not reached execution time because they are delayed can be unconsumed
-
The HA/ master/slave mode does not have multiple instances working together
-
How does the consumer end consume
Of course, the primary reason for selecting Redis as the data cache is that Redis itself supports zSET data structure (score delay time is milliseconds), which reduces the trouble of sorting and has high performance. Just as our requirement is to determine the order of execution according to the time dimension and also supports map List data structure.
Simply define a message data structure
private String topic; / *topic/private String id; / *Automatically generates globally unique Snowflake/private String bizKey; private long delay; / *Delay milliseconds/private int priority;//优先级private long ttl;/TTL consumed by the consumer/private String body; / *The message body/private long createTime=System.currentTimeMillis(); private int status= Status.WaitPut.ordinal(); |
---|
Operating principle:
- with
Map
To store metadata. Id as key, the entire message structure is serialized (JSON /…) It is then put into the meta-message pool as a value. - will
id
Put one of them inzset
In the ordered list, createTime+ Delay +priority is used as score. Change the status to Being Delayed - Use timer for real-time monitoring
zset
The top 10 data in an ordered list. If the datascore<
= the current time is fetched in milliseconds, according totopic
Put back into a new consumable list (list
), delete the retrieved data in zset and change the state to unconsumed - The client only needs to fetch data from the consumable queue. And the state must be to consume the running time needs <= the current time if not meet reinsert
zset
List, change the status to being delayed. If yes, the modified state is consumed. Or simply delete the metadata.
The client
Because of the different programming languages involved, HTTP access is currently supported by default.
- Add delay message add success return consume unique ID POST /push {… . The message body}
- Is the message ID GET /delete required to delete a delayed message? id=
- ReStore delayed messages GET /reStore? Expire = true | false expire if recovery is not overdue execution.
- To reStore a single delayed message, the message ID GET /reStore/ ID needs to be passed
- Getting messages requires a long connection to GET/GET /topic
Exposing the service with Nginx, configure polling to evenly distribute traffic when adding delayed messages.
Instead of using HTTP long connections to consume messages, the current system uses MQ to consume data so that clients don’t have to worry about delaying message queues. Just intercept it when sending MQ and if it is a delayed message it will be processed by the delayed message system.
Message recoverable
In normal cases, logs are generally recorded, such as the mysql binlog.
Here we directly use the mysql database for logging.
The following two tables are currently intended to be created:
- The message table field contains the entire message body
- Message flow table fields include message ID, change status, change time,
zset
Scanning thread Name and host/ IP
The zset scanning thread Name is defined to make it clear to which Zset messages are being distributed. If the key of the Zset has something to do with the name of the thread that’s monitoring the Zset or zset key in this case.
Take a chestnut
If the Redis server goes down, the data is gone after the server restarts. Therefore, this recovery is necessary by redistributing all data from table 1 (message table) whose message state is not equal to consumed data to the delay queue and synchronizing the state.
You can do the same for individual tasks.
About High availability
Choose ZooKeeper for distributed coordination.
If there are multiple instances, only one instance can work at most at the same time, thus avoiding the disadvantages brought by distributed competitive locks. Of course, if the business needs multiple instances to work at the same time, it is also supported, that is, a message can only be processed by one instance at most. Zookeeper or Redis can be used to achieve distributed locks.
Finally, I did a test to run multiple instances at the same time, but the performance may be reduced because of the problem related to locks, but the single performance is very good. Therefore, docker-based active/standby deployment is recommended.
extension
Supports configurable number of ZSET queues to avoid high latency caused by big data.
At present, there may be inconsistency between logs and redis metadata. For example, when mysql is down, writing logs fails.
Design: