New feature: Like
Almost all media content now supports a “like” button, whether it is a product review, a topic discussion, or a friends feed. Likes have become a standard feature of Internet products, so we are adding one to the evaluation system as well, so that every evaluation can be liked.
Likes on Douban
Details of the likes requirements to be implemented:
Start by giving up
Building a complete like system is very difficult. It has to support a very large number of users, archive the data to persistent storage, absorb peak concurrent writes on the order of a million per second, synchronize in real time across the multiple clients a user is logged in to, maintain each user’s like relationships, and show the list of users who liked an item. Such a full set of requirements produces design contradictions, much like the trade-offs in CAP.
A typical example is concurrency versus synchronization. The essence of high concurrency is speed: network transmission speed and program execution speed determine the capacity of the system. The faster each request is processed, the more requests can be handled per unit of time. Blindly increasing the number of connections while ignoring response time does not fundamentally solve the concurrency problem. In my opinion, there are three bottlenecks within an application: network requests, object creation, and redundant computation, in that order of priority. Network requests have a decisive influence on response speed. Yet synchronization also requires network requests, such as writing data to MySQL or Redis. Concurrency and synchronization are therefore in irreconcilable conflict.
There is also a contradiction between storage capacity and access speed. To show a user’s list of likes, we have to maintain the user’s like relationships. As these accumulate over time they no longer fit in a single storage system, and a distributed storage system is needed, which adds complexity and scheduling latency. It also has to be designed carefully, choosing the partitioning dimension so as to reduce data coupling between partitions. Even so, once a query spans multiple storage nodes it produces cascading calls with significant network latency.
To achieve, first abandon. When I see a new requirement, I tend to think backwards and see what features are not involved in the requirement and what features can be dropped. From this perspective, it is easy to find a clever and simple design solution that meets the current requirement.
When it comes to design decisions, drawing up a second list, of the requirements you do not need to implement, can make things a lot easier.
Product managers will only give you Table 1; they will rarely tell you what does not need to be done. You still need to check with them before deciding to drop anything, because these requirements are often soft: the fact that something is absent from the requirements document does not necessarily mean it is not needed. It may simply not have been considered.
How to record users’ likes
The like relationship is a typical key-value or set structure, which Redis is well suited to. So which Redis data type should be used?
The following table lists the data types you can think of and their pros and cons.
The key features are batch query and memory footprint. Batch query enables you to query all the “like” relationships in a single request. Memory footprint allows you to solve storage problems with as few Redis nodes as possible, or even one Redis.
I chose the string type, because with the hash type it is hard to evict old like data unless you record the like time and periodically scan everything, or keep two hash keys and rotate old and new, which is too expensive to be worthwhile. With the string type, key expiration itself keeps the memory footprint in check, so it does not take up an unusually large amount of memory.
Atomicity of the like operation
The “like” operation needs to rewrite two values, one is the user’s “like” relationship on the content, the other is the total number of “like” on the content. Can these two values be represented in a key? Obviously not. Therefore, you need to set the user’s “like” relationship first and then increase the total number of “likes”. If the “like” relationship already exists, the total number of “likes” cannot be increased.
The SETNX command sets the like relationship only if the key does not already exist, and returns a flag saying whether it was set. That flag determines whether to increase the total number of likes. For example:
if setnx(key1, 1) == 1
then
    incr(key2)
end
Each operation looks atomic, but if this logic runs on the client side, the sequence as a whole is still not atomic: the client can fail between the two operations, leaving a successful like without a count increase. This is not a big problem for a like system, since a rare occurrence is acceptable, but we can do better.
Redis transactions or scripts address this issue. Scripts are more flexible and also reduce network requests, so we choose scripts:
-- Like operation: write and increment; if the write fails, do not increment [atomicity, idempotency]
if redis.call('SETNX', KEYS[1], 1) == 1
then
    redis.call('EXPIRE', KEYS[1], 864000)
    redis.call('INCR', KEYS[2])
end
return redis.call('GET', KEYS[2])
-- Cancel-like operation: delete and decrement; if the delete fails, do not decrement
if redis.call('DEL', KEYS[1]) == 1
then
    redis.call('DECR', KEYS[2])
end
return redis.call('GET', KEYS[2])
One of the basic requirements of stability is that data cannot grow without bound; otherwise problems will occur sooner or later. Any storage scheme must be designed together with a matching cleanup scheme, so that the system can run stably over the long term. That is why it is important to set an expiration time on KEYS[1]. KEYS[2] may need to be kept until it is removed by another mechanism, such as when an old evaluation is destroyed or collapsed.
The script returns the total number of likes, which is useful for subsequent data archiving.
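To make the idempotency of the two scripts easier to see without a Redis server, here is a minimal in-memory sketch in Java. It is not the article's implementation: `InMemoryLikeStore` and its methods are hypothetical names, a `synchronized` method stands in for the atomicity Redis gives a script, and key expiration is left out. One difference from the real scripts is that this model returns 0 for content that has never been liked, whereas `GET` on a missing key returns nil.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory stand-in for the two Lua scripts; not a Redis client. */
class InMemoryLikeStore {
    private final Set<String> relations = new HashSet<>();   // plays the role of KEYS[1]
    private final Map<Long, Long> totals = new HashMap<>();  // plays the role of KEYS[2]

    /** Mirrors SETNX + INCR: the count only grows when the relation is new. */
    synchronized long like(long voterId, long contentId) {
        if (relations.add(voterId + ":" + contentId)) {
            totals.merge(contentId, 1L, Long::sum);
        }
        return totals.getOrDefault(contentId, 0L);
    }

    /** Mirrors DEL + DECR: the count only shrinks when a relation was actually removed. */
    synchronized long unlike(long voterId, long contentId) {
        if (relations.remove(voterId + ":" + contentId)) {
            totals.merge(contentId, -1L, Long::sum);
        }
        return totals.getOrDefault(contentId, 0L);
    }
}
```

Calling `like` twice for the same user and content leaves the total unchanged the second time, and the same holds for a repeated `unlike`, which is the property the scripts rely on when a request is retried.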
Encapsulating script operations
Now that we have decided on Redis storage, let’s implement it first and build the like feature steadily, step by step.
First, configure the Lua scripts in Spring, which loads them automatically, so there is no need to preload them on the Redis server with SCRIPT LOAD.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;

/** Lua script configuration. */
@Configuration
public class LuaConfiguration {
    /** [like] script: lua_set_and_incr */
    @Bean
    public DefaultRedisScript<Integer> voteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_set_and_incr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }

    /** [unlike] script: lua_del_and_decr */
    @Bean
    public DefaultRedisScript<Integer> noVoteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_del_and_decr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }
}
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Repository;

/** Like box. */
@Repository
public class VoteBox {
    private final RedisTemplate<String, Object> redisTemplate;
    private final DefaultRedisScript<Integer> voteScript;
    private final DefaultRedisScript<Integer> noVoteScript;

    public VoteBox(RedisTemplate<String, Object> redisTemplate, DefaultRedisScript<Integer> voteScript, DefaultRedisScript<Integer> noVoteScript) {
        this.redisTemplate = redisTemplate;
        this.voteScript = voteScript;
        this.noVoteScript = noVoteScript;
    }

    /**
     * Like an evaluation: adds the user's like record and increments the like count. Atomic and idempotent.
     * @param voterId the voter
     * @param contentId id of the content being voted on
     * @return the current number of likes
     */
    public Integer vote(long voterId, long contentId) {
        // Run the Lua script with the relation key and the counter key
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(voteScript, list);
    }

    /**
     * Cancel a like: deletes the user's like record and decrements the like count. Atomic and idempotent.
     * @param voterId the voter
     * @param contentId id of the content being voted on
     * @return the current number of likes
     */
    public Integer noVote(long voterId, long contentId) {
        // Run the Lua script with the relation key and the counter key
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(noVoteScript, list);
    }
}
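The article does not show `RedisKeyConstants`, so the following is only a hypothetical sketch of what the two patterns could look like; the key names and the `VoteKeys` class are assumptions. One detail is worth checking in the real patterns: `MessageFormat` applies locale number formatting to numeric arguments, so a plain `{0}` renders the id 12345 as `12,345` in an English locale. The `{0,number,#}` form avoids the grouping separator.

```java
import java.text.MessageFormat;
import java.util.Locale;

/** Hypothetical key patterns; the article does not show the real RedisKeyConstants. */
class VoteKeys {
    // {n,number,#} formats the id without grouping separators.
    static final String VOTE_USER_PATTERN = "vote:user:{0,number,#}:{1,number,#}";
    static final String VOTE_SUM_PATTERN = "vote:sum:{0,number,#}";

    /** Key for one user's like relationship on one piece of content. */
    static String userKey(long voterId, long contentId) {
        return new MessageFormat(VOTE_USER_PATTERN, Locale.ROOT)
                .format(new Object[] {voterId, contentId});
    }

    /** Key for the total like count of one piece of content. */
    static String sumKey(long contentId) {
        return new MessageFormat(VOTE_SUM_PATTERN, Locale.ROOT)
                .format(new Object[] {contentId});
    }
}
```

With these patterns, `VoteKeys.userKey(12345L, 67890L)` yields `vote:user:12345:67890` regardless of the server's default locale.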
The process of liking
The process of liking can be represented by the following sequence diagram:
- The server receives the user’s “like” request
- Execute the Redis script, which returns the total number of likes; Redis holds the temporary data of the like feature
- Send a normal message to the message queue
- If the preceding two steps succeed, the response is complete; otherwise, the request is added to the retry queue
- The retry queue asynchronously retries the Redis or message queue request until it succeeds or the retries are exhausted
- The message queue consumer receives the message and writes it to MySQL
Why introduce a message queue? Because a message queue lets synchronous and asynchronous work be separated cleanly. The Redis command has to run within the current request: the user wants to see the result of the request, and wants the like status to show up on other clients immediately (this example may not be the most apt). Writing to the database and similar operations do not need to complete within the life cycle of the current request.
If synchronous work can be called “online services”, asynchronous work inside the same process can be called “semi-online, semi-offline services”. Although it runs outside the request life cycle, it still runs on the online servers, consuming CPU, memory, and network bandwidth, which inevitably affects online services. And when the asynchronous logic changes, it has to be released together with the online service, which creates logical coupling. A message queue makes true “offline services” possible: consumers can be developed and deployed independently of the online servers, fully decoupled both physically and logically. Of course, the serialization format of the message must stay consistent on both sides, so I prefer to use strings as message content rather than serialized objects.
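As a small illustration of using a plain string as the message content, the sketch below builds the body as JSON text. The field names `voterId`, `contentId`, `voting`, and `votes` are the ones the article's consumer reads; `VoteMessages.toJson` is a hypothetical helper, and a real project would more likely use its JSON library to build the string.

```java
import java.util.Locale;

/** Hypothetical helper that renders a like event as a JSON string. */
class VoteMessages {
    /** All fields are numeric, so no string escaping is needed here. */
    static String toJson(long voterId, long contentId, int voting, long votes) {
        return String.format(Locale.ROOT,
                "{\"voterId\":%d,\"contentId\":%d,\"voting\":%d,\"votes\":%d}",
                voterId, contentId, voting, votes);
    }
}
```

Because the body is ordinary text, the consumer project only needs to agree on the field names, not share Java classes with the producer.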
Persisting likes to MySQL
With the Redis storage scheme designed, the next step is the MySQL storage scheme.
The first is the table structure:
# Like/vote archive table
CREATE TABLE IF NOT EXISTS vote_document
(
    id INT primary key auto_increment COMMENT 'ID',
    gmt_create datetime not null default CURRENT_TIMESTAMP COMMENT 'Creation time',
    voter_id INT not null COMMENT 'Voter ID',
    content_id INT not null COMMENT 'Voting content ID',
    voting TINYINT not null COMMENT 'Vote status (0: vote cancelled, 1: voted)',
    votes INT not null COMMENT 'Total number of votes at the time of this vote or cancellation',
    create_date INT not null COMMENT 'Creation date, for example 20210414; used for partitioning and table sharding'
);
insert into vote_document(voter_id, content_id, voting, votes, create_date)
values(1, 1, 1, 1, '20210414');
Clearly, this is a log table that uses INSERT in place of UPDATE: every like, unlike, or re-like adds a new record rather than modifying an existing one. There are two reasons for this. First, an INSERT does not have to find and lock an existing row, so it is generally more efficient than an UPDATE. Second, the table contains more information and shows the complete behavior of users, which is helpful for big data analysis.
Once UPDATE is replaced with INSERT, a major difficulty is data aggregation. The solution is to store the aggregate state redundantly in each inserted row, as the votes field does. During analysis, only the last record for the relevant evaluation is needed to know the total number of likes, with no full table scan.
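The idea of reading the total from the newest row can be sketched in memory as follows. This is only an illustration with hypothetical names (`VoteLogSketch`, `latestVotes`); in MySQL the equivalent lookup would presumably take the row with the largest id for a given content_id, which would need an index on content_id to avoid a scan.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the append-only vote log. */
class VoteLogSketch {
    static final class Row {
        final long contentId;
        final int voting;   // 1 = voted, 0 = vote cancelled
        final long votes;   // running total stored redundantly in every row

        Row(long contentId, int voting, long votes) {
            this.contentId = contentId;
            this.voting = voting;
            this.votes = votes;
        }
    }

    private final List<Row> rows = new ArrayList<>();

    /** Every like or unlike appends a row; existing rows are never modified. */
    void append(long contentId, int voting, long votes) {
        rows.add(new Row(contentId, voting, votes));
    }

    /** The total is the votes value of the newest row for the content, or 0 if there is none. */
    long latestVotes(long contentId) {
        for (int i = rows.size() - 1; i >= 0; i--) {
            if (rows.get(i).contentId == contentId) {
                return rows.get(i).votes;
            }
        }
        return 0L;
    }
}
```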
Storage code:
@Repository
public class VoteRepository {
    @Autowired
    private JdbcTemplate db;

    /**
     * Add a like record.
     * @param vote the like object
     * @return true if the insert succeeded, false otherwise
     */
    public boolean addVote(/*valid*/ Vote vote) {
        String sql = "insert into vote_document(voter_id, content_id, voting, votes, create_date) values(?, ?, ?, ?, ?)";
        return db.update(sql, vote.getVoterId(), vote.getContentId(), vote.getVoting(), vote.getVotes(), Sunday.getDate()) > 0;
    }
}
RocketMQ
Apache RocketMQ is a distributed messaging middleware with low latency, high concurrency, high availability, and high reliability. RocketMQ not only provides asynchronous decoupling and peak shaving for distributed applications, but also offers massive message accumulation, high throughput, and reliable retries for Internet applications.
Message queue core concepts:
- Topic: Message Topic, a level 1 message type to which a producer sends a message.
- Broker: A node in a message queue cluster that stores and sends and receives messages.
- Producer: Also known as message publisher, responsible for producing and sending messages to a Topic.
- Consumer: Also known as message subscriber, responsible for receiving and consuming messages from a Topic.
- Tag: Message Tag. A secondary message type that represents a specific message classification under a Topic.
- Messages: A combination of data and (optional) properties that a producer sends to a Topic and eventually to a consumer.
- Message attributes: Attributes that producers can define for messages, including Message Key and Tag.
- Group: A class of producers or consumers that typically produce or consume the same type of message and that publish or subscribe logically.
A producer sends a message to a message queue, and finally to a consumer, as shown below:
Message types can be divided into:
- Normal messages. Also called concurrent messages: they are unordered, produced and consumed in parallel, and have high throughput.
- Transactional messages. Provide a mechanism for keeping the sending of a message consistent with the execution of a local transaction.
- Partitionally ordered messages. A Topic is divided into multiple partitions, and messages within a partition follow first-in, first-out order.
- Globally ordered messages. The Topic has a single partition, so all messages follow first-in, first-out order.
- Scheduled messages. The message is sent to the MQ server and delivered at a specified point in time after it was sent.
- Delayed messages. The message is sent to the MQ server and delivered after a specified delay from the time it was sent.
Consumption mode can be divided into:
- Cluster consumption. Each message only needs to be processed by any one consumer in the cluster.
- Broadcast consumption. Each message is pushed to all registered consumers in the cluster, ensuring that it is consumed by every consumer at least once.
Consumer message acquisition patterns can be divided into:
- Push. The client opens a separate thread that polls the broker for messages and calls back the consumer’s receiving method, as if the broker were pushing messages to the consumer.
- Pull. The consumer actively pulls messages from the message queue.
Using RocketMQ
Using the RocketMQ message queue offered as a cloud product, we first created the groups and topics in the cloud console according to the official documentation, then added the Maven dependency and created the MqConfig connection configuration object. Finally:
Configure the producer (in project A):
@Configuration
public class ProducerConfig {
    @Autowired
    private MqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer buildProducer() {
        return ONSFactory.createProducer(mqConfig.getMqPropertie());
    }
}
Configure the consumer (in project B):
@Configuration
public class ConsumerClient {
    @Autowired
    private MqConfig mqConfig;
    @Autowired
    private VoteMessageReceiver receiver;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.GROUP_CONSUMER_ID);
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "10");
        consumerBean.setProperties(properties);
        // Subscribe the receiver to the topic and tag
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.TOPIC_ISSUE);
        subscription.setExpression(mqConfig.TAG_ISSUE);
        subscriptionTable.put(subscription, receiver);
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}
Create a message receiver (listener):
/** Vote message receiver. */
@Component
public class VoteMessageReceiver implements MessageListener {
    private final VoteRepository voteRepository;

    public VoteMessageReceiver(VoteRepository voteRepository) {
        this.voteRepository = voteRepository;
    }

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            // Decode with the same charset the producer used to encode the body
            JSONObject object = JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
            Vote vote = new Vote();
            vote.setVoterId(object.getLongValue("voterId"));
            vote.setContentId(object.getLongValue("contentId"));
            vote.setVoting(object.getIntValue("voting"));
            vote.setVotes(object.getLongValue("votes"));
            try {
                vote.validate();
                voteRepository.addVote(vote);
            } catch (IllegalArgumentException ignored) {
                // An invalid message is dropped: redelivering it would fail the same way
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            e.printStackTrace();
            // Any other failure asks the broker to redeliver the message later
            return Action.ReconsumeLater;
        }
    }
}
The producer that sends messages, lightly encapsulated:
/** Message producer: the repository for message delivery. */
@Repository
public class MessagePoster {
    private final Producer producer;

    public MessagePoster(Producer producer) {
        this.producer = producer;
    }

    public void sendMessage(String topic, String tag, String content) {
        Message message = new Message();
        message.setTopic(topic);
        message.setTag(tag);
        message.setBody(content.getBytes(StandardCharsets.UTF_8));
        producer.send(message);
    }

    public void sendMessage(String topic, String content) {
        sendMessage(topic, "", content);
    }
}
Deploy the consumer and test from the cloud console (verify step by step that the whole flow works):
Can we achieve consistency?
Can the Redis command and the message send be kept consistent, that is, succeed together or fail together? If the operations belong to the same system, you can use that system’s own features to implement a transaction: Redis transactions or scripts for Redis operations, as we did earlier; database transactions for operations on the same database; other storage systems should have similar support.
But Redis and MQ are heterogeneous systems, which can only be coordinated by implementing the transaction logic on the client side or through a third party. A common client-side implementation is rollback:
try {
    redis.call();
    mq.call();
} catch (MqException e) { // Rollback is required only if MQ fails
    // Roll back using the reverse operation
    redis.rollback();
}
But what if the rollback fails? What if the message reaches MQ but the acknowledgement is never received? What if the dependent service does not support rollback? Strict consistency cannot be achieved under such harsh conditions.
Here too we should think in reverse and selectively give up some unimportant parts in order to meet the requirement. In this case there is no need to introduce third-party transaction coordination to keep Redis and MQ in sync, but the obvious transaction problems should not be ignored either.
My summary of the roadmap of distributed transaction solutions:
We chose to use retry queues to solve this problem.
Designing a Retry queue
Rather than solving only the current distributed transaction problem, we design a more general retry queue.
Start with the basic concept in the retry queue: the task. A task consists of computation units and execution units. A computation unit represents a method object that has a return value; an execution unit represents a method object that has no return value but receives the return value of the previous computation unit as an input parameter. The task maintains a singly linked list of units. Only when a unit executes successfully does the task move on to the next unit; when a unit fails, that unit is retried until it succeeds. This keeps the units running in a stable order and makes every link in the chain fault-tolerant.
The base interface lets users implement failure logging for a task, for example persisting it to disk or sending it to a remote server so that the task is not lost. This is one of the fallback measures for preserving transactional consistency. The methods are declared as default methods so that users can implement them selectively; handling failures is not mandatory.
/** Failure recorder. */
interface IFailRecorder {
    /**
     * Records each failed retry.
     * @param attemptTimes number of retries; the first attempt is 0
     * @param e the exception that caused the failure
     */
    default void recordFail(int attemptTimes, Exception e) {}

    /** Records each failed retry that produced no exception; attemptTimes starts at 0. */
    default void recordFail(int attemptTimes) {}

    /** Records the final failure once retries are exhausted; e is null if there was no exception. */
    default void recordEnd(Exception e) {}
}
Next, define the basic unit of execution, which represents one Redis operation or MQ operation to be performed. The interface methods may be executed repeatedly by the scheduler, so implementers are required to ensure idempotency themselves.
/** Repeatable task. */
public interface Repeatable<V> extends IFailRecorder {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @param repeatTimes repeat times, first repeatTimes is 0
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V compute(int repeatTimes) throws Exception;

    /**
     * Executes with no result, and throws an exception if unable to do so.
     *
     * @param repeatTimes repeat times, first repeatTimes is 0
     * @param receiveValue last step computed result
     * @throws Exception if unable to execute
     */
    default void execute(int repeatTimes, V receiveValue) throws Exception {}

    /**
     * Executes with no result, and throws an exception if unable to do so.
     *
     * @param repeatTimes repeat times, first repeatTimes is 0
     * @throws Exception if unable to execute
     */
    default void execute(int repeatTimes) throws Exception {}
}
The corresponding abstract classes mainly guide users in implementing the interface.
/**
 * Computable task.
 * @param <V> type of the computed result
 */
public abstract class Computable<V> implements Repeatable<V> {
    @Override
    public void execute(int repeatTimes) throws Exception {
        throw new IllegalAccessException("Unsupported methods");
    }

    @Override
    public void execute(int repeatTimes, V receiveValue) throws Exception {
        throw new IllegalAccessException("Unsupported methods");
    }
}

/** Executable task. */
public abstract class Executable<V> implements Repeatable<V> {
    @Override
    public V compute(int repeatTimes) throws Exception {
        throw new IllegalAccessException("Unsupported methods");
    }
}
Meaning of Retry
A good retry mechanism can cut the peak and fill the valley, while a bad retry mechanism can add fuel to the fire.
This is not alarmism. Think carefully about the circumstances under which a program fails; they can be roughly summed up in three cases:
- Logical exceptions caused by parameter errors
- Timeouts or circuit breaking caused by heavy load
- Network instability and human error
There is no point in retrying in case 1: parameter errors should be solved by changing the parameters, logical exceptions by fixing the bug, and mindless retries only repeat the error and waste CPU. Be careful with retries in case 2: a request that failed at a traffic peak is likely to fail again a short time later, and the retry adds even more traffic pressure, which snowballs and adds fuel to the fire.
Retrying in case 3 is valuable, especially for third-party services with SLAs. A third-party service may be temporarily unavailable for various reasons (such as a service interruption or an update) without violating its SLA. Adding the failed call to the retry queue ensures that the task eventually succeeds as long as the third-party service responds at some point within a long enough window. If the third-party service never responds and the task fails, the SLA has usually been broken and compensation can be claimed.
Therefore, when designing a retry policy, you need to determine the conditions under which a retry is warranted. You can specify that certain exceptions, such as parameter errors, are not retried and simply fail. You can specify that a call counts as successful only if the returned value is not null. You can also set fixed retry intervals so that enough time passes between retries.
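The first two conditions can be written down as a small decision helper. This is a sketch under assumptions: `RetryDecision` is a hypothetical name, and the list of exception types treated as permanent is only an example that a real project would adapt to its own error model.

```java
/** Hypothetical helper; the rule set is an assumption for illustration. */
class RetryDecision {
    /** Parameter and logic errors are treated as permanent; other failures may be transient. */
    static boolean isRetryable(Exception e) {
        return !(e instanceof IllegalArgumentException
                || e instanceof NullPointerException
                || e instanceof UnsupportedOperationException);
    }

    /** A call counts as successful only when it returned a non-null value. */
    static boolean isSuccess(Object result) {
        return result != null;
    }
}
```

A scheduler could consult `isRetryable` before putting a failed call back on the queue, so that case 1 failures are recorded once and dropped instead of being repeated.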
A smarter approach is the circuit breaker pattern: track the results of recent requests to the target server, and if they do not meet expectations (for example, a high exception rate), temporarily hold back the tasks waiting in the retry queue and try again after a while.
The difference between a retry queue and ordinary rate limiting, degradation, or circuit breaking:
Retry policy
A retry policy determines when a task is retried. The retry policy interface:
/** A retry strategy that determines when a task can be retried. */
public interface IRetryStrategy {
    /**
     * Whether to retry now.
     * @param attemptTimes number of retries so far
     * @param lastTimestamp timestamp of the last retry
     * @param itemId id of the item currently executing
     * @return true if a retry is allowed now, false otherwise
     */
    boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId);

    /** Notifies the strategy that the item with the given id failed. */
    void noticeFail(int itemId);

    /** Notifies the strategy that the item with the given id succeeded. */
    void noticeSuccess(int itemId);
}
Basic implementation classes:
/** Retry strategy that waits the specified interval (in seconds) before each retry. */
public class DefinedRetryStrategy implements IRetryStrategy {
    private final int[] intervals;

    public DefinedRetryStrategy(int... intervals) {
        if (intervals.length == 0) {
            this.intervals = new int[] {0};
        } else {
            this.intervals = intervals;
        }
    }

    private DefinedRetryStrategy() {
        this.intervals = new int[] {0};
    }

    /** Returns true once the interval for this attempt has elapsed since the last retry. */
    @Override
    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
        return System.currentTimeMillis() > lastTimestamp + getWaitSecond(attemptTimes) * 1000L;
    }

    @Override
    public void noticeFail(int itemId) {}

    @Override
    public void noticeSuccess(int itemId) {}

    /** Looks up the interval for the next retry; attempts past the end reuse the last interval. */
    private int getWaitSecond(int attemptTimes) {
        if (attemptTimes < 0) {
            attemptTimes = 0;
        }
        if (attemptTimes >= intervals.length) {
            attemptTimes = intervals.length - 1;
        }
        return intervals[attemptTimes];
    }
}
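To see how the interval table is read, the lookup can be restated as a standalone function. `IntervalLookup` is a hypothetical name and the intervals 1, 5, 30 are illustrative values, not the article's configuration; the array is assumed to be non-empty, which the constructor above guarantees.

```java
/** Standalone restatement of the interval lookup, for illustration only. */
class IntervalLookup {
    /** Clamps the attempt count into the array bounds and returns the wait in seconds. */
    static int waitSeconds(int[] intervals, int attemptTimes) {
        int index = Math.max(0, Math.min(attemptTimes, intervals.length - 1));
        return intervals[index];
    }
}
```

With intervals `{1, 5, 30}`, attempt 0 waits 1 second, attempt 1 waits 5 seconds, and attempt 2 and every later attempt wait 30 seconds, so the last entry acts as the steady-state retry interval.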
A retry strategy implemented with circuit breakers (the breaker internals are omitted):
/** Smart retry strategy implemented with the circuit breaker pattern. */
public class SmartRetryStrategy extends DefinedRetryStrategy {
    // One circuit breaker per item id
    private final Map<Integer, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();

    private static CircuitBreaker newCircuitBreaker() {
        return new ExceptionCircuitBreaker();
    }

    public SmartRetryStrategy(int[] intervals) {
        super(intervals);
    }

    private CircuitBreaker getCircuitBreaker(Integer itemId) {
        // computeIfAbsent creates the breaker atomically on first use
        return circuitBreakers.computeIfAbsent(itemId, id -> newCircuitBreaker());
    }

    /** Returns true if the retry interval has elapsed and the breaker lets the request pass. */
    @Override
    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
        // If the basic conditions are not met, a retry cannot be performed
        if (!super.shouldTryAtNow(attemptTimes, lastTimestamp, itemId)) {
            return false;
        }
        // Whether the breaker allows requests to pass
        return canPass(itemId);
    }

    /** Notifies the breaker of the given item of a failure. */
    @Override
    public void noticeFail(int itemId) {
        getCircuitBreaker(itemId).onFail();
    }

    /** Notifies the breaker of the given item of a success. */
    @Override
    public void noticeSuccess(int itemId) {
        getCircuitBreaker(itemId).onSuccess();
    }

    /** Whether the request is allowed to pass. */
    public boolean canPass(int itemId) {
        return getCircuitBreaker(itemId).canPass();
    }
}
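Since the breaker internals are omitted, here is one possible shape for a class offering the same three methods, `onFail`, `onSuccess`, and `canPass`. It is an assumption-laden sketch, not the article's `ExceptionCircuitBreaker`: it opens after a number of consecutive failures rather than on an exception rate, and `SimpleCircuitBreaker`, its thresholds, and the injectable clock are all hypothetical.

```java
import java.util.function.LongSupplier;

/** Hypothetical breaker; the article omits the real CircuitBreaker internals. */
class SimpleCircuitBreaker {
    private final int failureThreshold;   // consecutive failures before opening
    private final long openMillis;        // how long to stay open before allowing a probe
    private final LongSupplier clock;     // injectable time source, e.g. System::currentTimeMillis
    private int consecutiveFailures = 0;
    private long openedAt = -1;           // -1 means the breaker is closed

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Counts a failure; opens (or re-opens after a failed probe) once the threshold is reached. */
    synchronized void onFail() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = clock.getAsLong();
        }
    }

    /** Any success closes the breaker and resets the failure count. */
    synchronized void onSuccess() {
        consecutiveFailures = 0;
        openedAt = -1;
    }

    /** Closed: always pass. Open: pass only after the cool-down, to let a probe through. */
    synchronized boolean canPass() {
        if (openedAt < 0) {
            return true;
        }
        return clock.getAsLong() - openedAt >= openMillis;
    }
}
```

Passing the clock in as a `LongSupplier` keeps the cool-down logic testable without sleeping; in production code it would typically be `System::currentTimeMillis`.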
Retried task
Define the retried task interface according to the structure diagram above:
/** Retry task. */
public interface IRetryTask<V> {
    /** Performs one retry; returns true on success, false otherwise. */
    boolean tryOnce();

    /** Whether the task should be closed; true once the maximum number of retries is reached. */
    boolean shouldClose();

    /** Whether to retry now; true if the wait has exceeded the retry interval, false otherwise. */
    boolean shouldTryAtNow();

    /** Obtains the result. */
    V getResult();
}
Then design the abstract class:
/** Retry task. Not thread safe. */
public abstract class AbstractRetryTask<V> implements IRetryTask<V> {
    // Retry strategy (decides the wait between retries)
    protected final IRetryStrategy retryStrategy;
    // The current number of retries
    protected int curAttemptTimes = -1;
    // Maximum number of retries
    private final int maxAttemptTimes;
    // Timestamp of the last retry
    protected long lastTimestamp = 0;

    public AbstractRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes) {
        this.retryStrategy = retryStrategy;
        this.maxAttemptTimes = maxAttemptTimes;
    }

    /**
     * Performs one retry.
     * @return true on success, false otherwise
     */
    @Override
    public boolean tryOnce() {
        if (isFinished()) {
            return true;
        }
        setNextCycle();
        // Perform a retry
        doTry();
        // If the attempt threw an exception or returned null, the task is not finished yet
        return isFinished();
    }

    /** Whether the task has finished. */
    protected abstract boolean isFinished();

    /** Performs the callback. */
    protected abstract void doTry();

    /**
     * Whether the task should be closed.
     * @return true if the maximum number of retries is reached, meaning the task can be closed
     */
    @Override
    public boolean shouldClose() {
        return curAttemptTimes >= maxAttemptTimes;
    }

    // Advance to the next attempt and record when it started
    private void setNextCycle() {
        curAttemptTimes++;
        lastTimestamp = System.currentTimeMillis();
    }
}
And implementation classes:
/** * Multiple retry tasks. When a task link fails, the next retry continues from the point of failure. * /
@Slf4j
public class SegmentRetryTask<V> extends AbstractRetryTask<V> {
// segment execution method
private final List<Repeatable<V>> segments;
// Current execution fragment, last execution interrupted fragment
private int currentSegment = 0;
// Last execution result value
private V result;
public SegmentRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes, List<Repeatable<V>> segments) {
super(retryStrategy == null ? new DefinedRetryStrategy(0) : retryStrategy, maxAttemptTimes);
this.segments = segments;
}
/** Perform the callback */
@Override
protected void doTry() {
try {
for (; currentSegment < segments.size(); currentSegment++) {
// If the circuit breaker is currently open, do not attempt
if (retryStrategy instanceof SmartRetryStrategy) {
if (!((SmartRetryStrategy) retryStrategy).canPass(currentSegment)) {
segments.get(currentSegment).recordFail(curAttemptTimes, new CircuitBreakingException());
return;
}
}
// If an exception is thrown, the segment counter does not advance, so the next attempt resumes from this segment
Repeatable<V> repeatable = segments.get(currentSegment);
if (!execute(repeatable)) return;
}
} catch (Exception e) {
retryStrategy.noticeFail(currentSegment);
if (currentSegment < segments.size()) {
if (shouldClose()) {
segments.get(currentSegment).recordEnd(e);
} else {
segments.get(currentSegment).recordFail(curAttemptTimes, e);
}
}
}
}
private boolean execute(Repeatable<V> repeatable) throws Exception {
if (repeatable instanceof Computable) {
result = repeatable.compute(curAttemptTimes);
if (result == null) {
repeatable.recordFail(curAttemptTimes);
retryStrategy.noticeFail(currentSegment);
return false;
}
retryStrategy.noticeSuccess(currentSegment);
}
if (repeatable instanceof Executable) {
if (result == null) {
repeatable.execute(curAttemptTimes);
} else {
repeatable.execute(curAttemptTimes, result);
}
retryStrategy.noticeSuccess(currentSegment);
}
return true;
}
@Override
protected boolean isFinished() {
return currentSegment >= segments.size();
}
/** Whether a retry should be performed now. @return true if the time waited exceeds the retry interval, false otherwise */
@Override
public boolean shouldTryAtNow() {
return retryStrategy.shouldTryAtNow(curAttemptTimes, lastTimestamp, currentSegment);
}
/** Obtain the result */
@Override
public V getResult() {
return result;
}
}
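The strategy classes referenced above (`DefinedRetryStrategy`, `SmartRetryStrategy`) are not shown in this article. As a rough sketch of what an interval-based `shouldTryAtNow` might look like, assume the int array holds wait intervals in milliseconds indexed by attempt number. The class name `IntervalStrategySketch`, the `waitAfter` helper, and passing `now` as a parameter (instead of the segment index used above) are my assumptions for illustration, not the original implementation:

```java
// Hypothetical sketch: not the article's DefinedRetryStrategy or SmartRetryStrategy.
final class IntervalStrategySketch {
    // Assumed meaning: intervals[i] is the wait in milliseconds after attempt i.
    private final int[] intervals;

    IntervalStrategySketch(int... intervals) {
        if (intervals.length == 0) {
            throw new IllegalArgumentException("at least one interval is required");
        }
        this.intervals = intervals.clone();
    }

    /** Wait required after the given attempt; attempts past the end reuse the last interval. */
    long waitAfter(int curAttemptTimes) {
        if (curAttemptTimes < 0) {
            return 0; // nothing has run yet, so there is nothing to wait for
        }
        return intervals[Math.min(curAttemptTimes, intervals.length - 1)];
    }

    /** True once enough time has passed since the last attempt. */
    boolean shouldTryAtNow(int curAttemptTimes, long lastTimestamp, long now) {
        return now - lastTimestamp >= waitAfter(curAttemptTimes);
    }

    public static void main(String[] args) {
        IntervalStrategySketch s = new IntervalStrategySketch(10, 100, 100, 1000, 10000);
        System.out.println(s.shouldTryAtNow(0, 1_000L, 1_005L)); // false: 5 ms elapsed, 10 ms required
        System.out.println(s.shouldTryAtNow(0, 1_000L, 1_010L)); // true
    }
}
```

Growing intervals like these spread retries out over time, which is one way to keep a struggling downstream service from being hit again immediately.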
There are many unit tests and not all of them can be posted, so only a representative one is shown here:
class SegmentRetryTaskTest {
private final List<String> messages = new ArrayList<>();
@Test
void doTry() {
List<Repeatable<String>> list = new ArrayList<>();
list.add(new Computable<>(){
@Override
public String compute(int repeatTimes) throws Exception {
if (repeatTimes < 2)
throw new Exception();
if (repeatTimes < 4)
return null;
messages.add("result:good");
return "good";
}
@Override
public void recordFail(int attemptTimes, Exception e) {
messages.add("fail:" + attemptTimes);
}
@Override
public void recordFail(int attemptTimes) {
messages.add("fail:" + attemptTimes);
}
@Override
public void recordEnd(Exception e) {
messages.add("end");
}
});
list.add(new Executable<>() {
@Override
public void execute(int repeatTimes, String receiveValue) throws Exception {
messages.add("receive:" + receiveValue);
throw new Exception("exc");
}
@Override
public void recordEnd(Exception e) {
messages.add("end:" + e.getMessage());
}
});
IRetryTask retryTask = new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 5, list);
// Retry not started
assertFalse(retryTask.shouldClose());
// Retry until successful
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.shouldClose());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertFalse(retryTask.tryOnce());
assertTrue(retryTask.shouldClose());
assertTrue(messages.contains("result:good"));
assertTrue(messages.contains("fail:1"));
assertTrue(messages.contains("fail:2"));
assertTrue(messages.contains("fail:3"));
assertFalse(messages.contains("end"));
assertTrue(messages.contains("receive:good"));
assertTrue(messages.contains("end:exc"));
}
}
Retry queue operation
A thread-safe retry queue. (Neither Spring-Retry nor Guava-Retrying is a perfect fit for this scenario, so I decided to develop a simple retry mechanism of my own.) The retry queue does its best to execute a task multiple times until it succeeds. Keep the following points in mind when using it:
1. The retry queue is stored in memory and is not yet persisted to disk, so users must tolerate the risk of loss.
2. A retry is not guaranteed to succeed. It ends after a certain number of attempts, and if it still fails, the failure is recorded.
3. To prevent excessive system load from frequent retries, set a suitable retry interval to smooth out peaks and fill valleys.
4. When the number of tasks allowed by the retry queue is exceeded, an exception is thrown.
5. Retry tasks are executed in a separate thread and do not block the current thread.
6. A retry task that throws an exception or returns null is regarded as a failed execution. Intercepting custom exceptions is not currently supported.
7. Because of network problems, a remote procedure that executes successfully does not necessarily return successfully, so retry tasks need to be idempotent.
8. "Queue" only means that tasks are scanned in first-in, first-out order; when a task is removed from the queue depends on when it completes or terminates.
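The idempotency requirement in point 7 is the one that most affects how a like is stored. A minimal sketch, assuming likes are kept as a set of voter IDs so that a retried request cannot be counted twice (`IdempotentLikeSketch` is a hypothetical in-memory stand-in, not the article's Redis-backed `VoteBox`):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for a like store; illustration only.
final class IdempotentLikeSketch {
    private final Set<Long> voters = new HashSet<>();

    /** Idempotent: liking twice with the same voterId leaves the total unchanged. */
    synchronized int like(long voterId) {
        voters.add(voterId);
        return voters.size();
    }

    /** Idempotent: removing a like that is not there changes nothing. */
    synchronized int unlike(long voterId) {
        voters.remove(voterId);
        return voters.size();
    }

    public static void main(String[] args) {
        IdempotentLikeSketch box = new IdempotentLikeSketch();
        System.out.println(box.like(42L));   // 1
        System.out.println(box.like(42L));   // 1 again: the retried request is not double-counted
        System.out.println(box.unlike(42L)); // 0
    }
}
```

A plain counter increment would not have this property: if the first call succeeded but its response was lost, the retry would add a second like.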
Implementing retry queues
/**
 * Thread-safe retry queue.
 * @author sunday
 * @version 0.0.1
 */
public final class RetryQueue {
// Retry task queue (globally unique)
private final static Deque<IRetryTask> retryTaskList = new ConcurrentLinkedDeque<>();
// Retry the task factory
private final IRetryTaskFactory retryTaskFactory;
public RetryQueue(IRetryTaskFactory retryTaskFactory) {
this.retryTaskFactory = retryTaskFactory;
}
static {
Thread daemon = new Thread(RetryQueue::scan);
daemon.setDaemon(true);
daemon.setName(RetryConstants.RETRY_THREAD_NAME);
daemon.start();
}
// Scan the retry queue, retry and remove the task (if successful), periodically
private static void scan() {
while (true) {
// Execute first and then delete
retryTaskList.removeIf(task -> retry(task) || task.shouldClose());
// wait some times
try {
TimeUnit.MILLISECONDS.sleep(RetryConstants.SCAN_INTERVAL);
} catch (Throwable ignored) {
}
}
}
// Perform a retry
private static boolean retry(/*not null*/IRetryTask task) {
if (task.shouldTryAtNow()) {
return task.tryOnce();
}
return false;
}
/**
 * Submit a task. It executes immediately on the current thread; if that fails, a wrapper object is created with the configured retry task factory and written to the retry queue for asynchronous retries.
 * @param segments the task to perform, split into segments
 * @param <V> result type
 * @return if the first attempt on the current thread succeeds, the result value is returned synchronously; otherwise the task is queued for retry and the result value is delivered asynchronously
 * @throws RetryRefuseException thrown when the number of tasks allowed by the retry queue is exceeded
 */
public final <V> V submit(List<Repeatable<V>> segments) throws RetryRefuseException {
if (segments == null || segments.size() == 0) {
return null;
}
IRetryTask<V> task = retryTaskFactory.createRetryTask(segments);
// Execute in the current thread
if (!task.tryOnce()) {
// Join the queue after failure
ensureCapacity();
retryTaskList.push(task);
}
// Return any result already produced, even if the task was added to the retry queue
return task.getResult();
}
/**
 * Submit a task. It executes immediately on the current thread; if that fails, a wrapper object is created with the configured retry task factory and written to the retry queue for asynchronous retries.
 * @param repeatable the task to perform
 * @param <V> result type
 * @return if the first attempt on the current thread succeeds, the result value is returned synchronously; otherwise the task is queued for retry and the result value is delivered asynchronously
 * @throws RetryRefuseException thrown when the number of tasks allowed by the retry queue is exceeded
 */
public final <V> V submit(Repeatable<V> repeatable) throws RetryRefuseException {
return submit(List.of(repeatable));
}
// Ensure capacity
private void ensureCapacity() throws RetryRefuseException {
// Non-thread-safe, high concurrency may briefly exceed the maximum capacity, but this is not a problem
if (retryTaskList.size() >= RetryConstants.MAX_QUEUE_SIZE) {
throw RetryRefuseException.getInstance();
}
}
/** Whether the queue is empty. @return true if no task is currently being executed */
public boolean isEmpty() {
return retryTaskList.isEmpty();
}
}
Unit tests:
class RetryQueueTest {
private final static int NUM = 100000;
private List<String> messages1 = Collections.synchronizedList(new ArrayList<>());
private List<String> messages2 = Collections.synchronizedList(new ArrayList<>());
IRetryTaskFactory taskFactory = new IRetryTaskFactory() {
@Override
public <V> IRetryTask createRetryTask(List<Repeatable<V>> segments) {
return new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 10, segments);
}
};
RetryQueue retryQueue = new RetryQueue(taskFactory);
@Test
void submit() {
List<Repeatable<String>> list = new ArrayList<>();
list.add(new Executable<>() {
@Override
public void execute(int repeatTimes) throws Exception {
if (repeatTimes < 4)
throw new Exception();
messages1.add("good");
}
});
// Simulate highly concurrent submissions
ExecutorService executorService = Executors.newFixedThreadPool(100);
Semaphore semaphore = new Semaphore(0);
for (int i = 0; i < NUM; i++) {
executorService.submit(() -> {
try {
retryQueue.submit(list);
} catch (RetryRefuseException e) {
fail();
}
semaphore.release();
});
}
executorService.shutdown();
// Wait for execution to complete
try {
semaphore.acquire(NUM);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Wait for execution to complete
while (!retryQueue.isEmpty()) Thread.yield();
assertEquals(NUM, messages1.size());
for (String s : messages1) {
assertEquals("good", s);
}
}
}
Like implementation code
OK, now that the wheel is built, we can start writing the code for the like service:
/** * Voting service */
@Service
@Slf4j
public class VoteService {
private final VoteBox voteBox;
private final MessagePoster mq;
private final RetryQueue retryQueue = new RetryQueue(new SegmentRetryTaskFactory());
public VoteService(VoteBox voteBox, MessagePoster mq) {
this.voteBox = voteBox;
this.mq = mq;
}
/**
 * Vote on an evaluation.
 * @param voterId the voter
 * @param contentId ID of the content being voted on
 * @param voting whether this is a like (true: like, false: cancel the like)
 * @return the total number of likes for the content; if the like fails, an exception is thrown
 * @throws VoteException voting exception
 */
public int vote(long voterId, long contentId, boolean voting) throws VoteException {
/*
 * Case 0: the user's request never reaches the server. The user can retry at a suitable time.
 * Case 1: step 1 fails, so the like ultimately fails. Log it and add it to the retry queue. The user can also retry at a suitable time.
 * Case 2: step 1 succeeds but the response hits a network exception, so the like ultimately fails. Log it and add it to the retry queue; the user may also retry, and this method is idempotent.
 * Case 3: step 1 succeeds but does not increase the total number of likes because it is a repeated submission. The method remains idempotent after the logic runs.
 * Case 4: step 1 succeeds but step 2 fails. Log it, add the MQ send to the retry queue, and return success.
 * Case 5: the method succeeds but the network fails while returning, so the user receives no response. The user can query the like result later, or retry at a suitable time.
 */
List<Repeatable<Integer>> list = new ArrayList<>();
//1. Vote in redis first
list.add(new Computable<>() {
@Override
public Integer compute(int repeatTimes) {
return voting ? voteBox.vote(voterId, contentId) : voteBox.noVote(voterId, contentId);
}
@Override
public void recordFail(int attemptTimes, Exception e) {
// Record only the first error
if (attemptTimes == 0)
log.warn("function VoteService.vote.redis make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}
@Override
public void recordEnd(Exception e) {
// Of course, the failure is logged, or recorded to a central store through some other mechanism, and eventually recovered.
log.warn("function VoteService.vote.redis quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}
});
//2. Then notify MQ
list.add(new Executable<>() {
@Override
public void execute(int repeatTimes, Integer receiveValue) {
JSONObject object = new JSONObject();
object.put("voterId", voterId);
object.put("contentId", contentId);
object.put("voting", voting ? 1 : 0);
object.put("votes", receiveValue);
mq.sendMessage(SystemConstants.VOTE_TOPIC, object.toString());
}
@Override
public void recordFail(int attemptTimes, Exception e) {
if (attemptTimes == 0)
log.warn("function VoteService.vote.mq make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}
@Override
public void recordEnd(Exception e) {
log.trace("function VoteService.vote.mq quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}
});
Integer value = null;
try {
// The system may fail due to overloading of MQ or Redis itself. We want to cherish a like from the user, so we choose to retry for him.
value = retryQueue.submit(list);
} catch (RetryRefuseException e) {
log.error("function VoteService.vote.refuse make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
}
if (value == null) {
// We will retry later, but still tell the user so that they can make a more sensible choice.
throw new VoteException("Vote failed. Please try again later.");
}
return value;
}
private static class SegmentRetryTaskFactory implements IRetryTaskFactory {
private final static IRetryStrategy waitStrategy = new SmartRetryStrategy(new int[] {10, 100, 100, 1000, 10000});
@Override
public <V> IRetryTask<V> createRetryTask(List<Repeatable<V>> segments) {
return new SegmentRetryTask<>(waitStrategy, 5, segments);
}
}
}
Supplementary notes:
- The purpose of encapsulating factory objects is to simplify constructor parameters and reuse immutable objects, such as retry policies.
- As long as the retry queue execution returns a result, even if it is only partially successful, the interface response is regarded as successful, and the rest is added to the retry queue.
- If all retry queue executions fail and no result is returned, an exception is thrown, because it did fail at this point and the user has a right to know.
- A task is executed only while the circuit breaker is closed; otherwise it waits indefinitely. You can round out this mechanism by setting an appropriate abort strategy.
- The retry queue wheel is also useful in many other scenarios; I think of it as belonging roughly to the “repository layer.”
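The circuit breaker behind `SmartRetryStrategy.canPass` is not shown in this article. As a hedged sketch of one possible abort strategy, the breaker below opens after a number of consecutive failures and lets a trial attempt through again after a cool-down. The class `BreakerSketch`, its thresholds, and the explicit `now` parameter are assumptions for illustration; only the method names `canPass`, `noticeFail` and `noticeSuccess` echo the calls made in the code above:

```java
// Hypothetical sketch of a per-segment circuit breaker; not the article's SmartRetryStrategy.
final class BreakerSketch {
    private final int failureThreshold;
    private final long coolDownMillis;
    private int consecutiveFailures = 0;
    private long openedAt = -1; // -1 means the breaker is closed

    BreakerSketch(int failureThreshold, long coolDownMillis) {
        this.failureThreshold = failureThreshold;
        this.coolDownMillis = coolDownMillis;
    }

    /** A closed breaker always passes; an open one passes again only after the cool-down. */
    synchronized boolean canPass(long now) {
        return openedAt < 0 || now - openedAt >= coolDownMillis;
    }

    /** Counts a failure and opens (or re-opens) the breaker once the threshold is reached. */
    synchronized void noticeFail(long now) {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = now;
        }
    }

    /** Any success resets the counter and closes the breaker. */
    synchronized void noticeSuccess() {
        consecutiveFailures = 0;
        openedAt = -1;
    }

    public static void main(String[] args) {
        BreakerSketch breaker = new BreakerSketch(2, 1000L);
        breaker.noticeFail(0L);
        breaker.noticeFail(1L);
        System.out.println(breaker.canPass(2L));    // false: opened at t=1, cool-down not over
        System.out.println(breaker.canPass(1001L)); // true: a trial attempt is allowed again
    }
}
```

With a cool-down like this, a task does not wait forever behind an open breaker; it either gets a trial attempt later or runs out of attempts and is closed by `shouldClose`.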
For the like implementation, however, retry is not strictly necessary. MQ is deployed across multiple nodes for high availability and generally does not cause problems, and it comes with its own retry function. MQ's retry mechanism, in which a failed request is immediately resent to another broker, is a design for load balancing and high availability. MQ can be considered reliable in situations where strongly consistent (rigid) transactions are not required.
Add a “like” to a comment
Evaluation list data is relatively static and contains no per-user information, so it is easy to cache for everyone. It becomes hard to cache once each user's like state for each evaluation, or like counts that change in real time, are added. We keep the original cache strategy and separate static data from dynamic data: the dynamic data is fetched exclusively from the Redis service and then appended to the static data.
The service layer and the controller layer aggregate data from the data layer and delegate tasks to it.
As for data aggregation, there are three modes:
We chose the third mode: the “like” function is designed here simply as part of the evaluation system.
Add the following code to the RemarkService:
/**
 * Add like information to the evaluation list, modifying the existing list data.
 * @param remarks evaluation list
 * @param consumerId user ID
 * @return the modified evaluation list
 */
public JSONArray appendVoteInfo(JSONArray remarks, Integer consumerId){
if (remarks == null || remarks.size() == 0) {
return remarks;
}
// Get the list of evaluation ids
List<Object> idList = new ArrayList<>();
for (int i = 0; i < remarks.size(); i++) {
idList.add(remarks.getJSONObject(i).getString("id"));
}
// Get and add the total number of likes
List<String> voteKeys = new ArrayList<>();
for (Object s : idList) {
voteKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, s));
}
List<Object> voteValues = redisRepository.readAll(voteKeys);
for (int i = 0; i < remarks.size(); i++) {
remarks.getJSONObject(i).put("votes", voteValues.get(i) == null ? 0 : voteValues.get(i));
}
// The user id is not uploaded
if (consumerId == null) {
return remarks;
}
// Get and add personal likes status
List<String> votesKeys = new ArrayList<>();
for (Object s : idList) {
votesKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, consumerId, s));
}
List<Object> votingValues = redisRepository.readAll(votesKeys);
for (int i = 0; i < remarks.size(); i++) {
remarks.getJSONObject(i).put("voting", votingValues.get(i) == null ? 0 : 1);
}
return remarks;
}
// Update the evaluation cache for the item
private void updateRemarkCache(String itemId){
// Swallow the exception so that the updated evaluation method does not affect the execution result of the original operation
try {
redisRepository.refreshKeys(RedisKeyConstants.REMARK_PREFIX + itemId);
} catch (Exception e) {
log.warn("function RemarkService.updateRemarkCache make exception:{} by:{}", e.getMessage(), itemId);
}
}
Modify the query evaluation list interface and aggregate the content:
/**
 * Query the evaluations associated with an item, a fixed number of entries per query.
 * @param itemId ID of the item
 * @param curIndex current query position
 */
@GetMapping("/remark")
public APIBody listRemarks(String itemId, int curIndex, Integer consumerId) {
Assert.isTrue(!StringUtils.isEmpty(itemId), "Item ID cannot be empty");
Assert.isTrue(curIndex > 0, "Query coordinate exception");
JSONArray list = remarkService.listRemarks(itemId, curIndex, SystemConstants.REMARK_MAX_LIST_LENGTH);
// The original list is static data read from Redis or DB, while the likes change from hour to hour.
return APIBody.buildSuccess(remarkService.appendVoteInfo(list, consumerId));
}
Optimization point: the total number of likes for an evaluation is the same for every user, so it can be combined with the evaluation content and cached in memory, whereas a user's own like state has to be queried from Redis every time.
Recommend quality evaluation
A complete review system should be able to output a recommended list of quality reviews as the default display for users to view product reviews.
What is “premium content”? My understanding is that the evaluation content is topical, highly popular and rich in content, among which “total number of likes” is one of the important indicators to measure the high popularity. Right now, we calculate quality content and provide a query interface based solely on the number of likes. It is likely that this design will continue when other indicators are introduced in the future.
The remark table has a votes field, which can be sorted on to produce the top n rows:
select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?
Note that the votes field is not updated each time a user likes an evaluation, because frequent updates are inefficient. Instead it can be updated by a periodic summary: the likes table holds the latest total number of likes for each evaluation, so the votes can be refreshed every day or every hour by selecting the most recent like record for each piece of content.
Regardless of which database or table the underlying data lives in, I call this step “back to source”: the action taken when the cache misses.
When loading recommended evaluations, the back-to-source code is:
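The periodic summary of the votes field can be sketched in plain Java. This assumes each row of the likes table carries the running total at the time of the like plus a creation timestamp; the `LikeRow` shape, its field names and the `latestTotals` helper are hypothetical, since the article does not show the likes table:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the periodic summary step; the row shape is an assumption.
final class VoteSummarySketch {
    /** One assumed row of the likes table: the running total recorded when the like happened. */
    static final class LikeRow {
        final long remarkId;
        final int totalAfter;
        final long createdAt;

        LikeRow(long remarkId, int totalAfter, long createdAt) {
            this.remarkId = remarkId;
            this.totalAfter = totalAfter;
            this.createdAt = createdAt;
        }
    }

    /** For every remark liked at or after `since`, keeps the total from its most recent row. */
    static Map<Long, Integer> latestTotals(List<LikeRow> rows, long since) {
        Map<Long, LikeRow> latest = new LinkedHashMap<>();
        for (LikeRow row : rows) {
            if (row.createdAt < since) {
                continue; // outside the summary window
            }
            LikeRow seen = latest.get(row.remarkId);
            if (seen == null || row.createdAt >= seen.createdAt) {
                latest.put(row.remarkId, row);
            }
        }
        Map<Long, Integer> totals = new LinkedHashMap<>();
        for (LikeRow row : latest.values()) {
            totals.put(row.remarkId, row.totalAfter);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<LikeRow> rows = List.of(
                new LikeRow(1L, 5, 100L),
                new LikeRow(2L, 1, 110L),
                new LikeRow(1L, 6, 120L),
                new LikeRow(3L, 9, 50L)); // older than the window, ignored
        // Each entry would then become one "update remark set votes = ? where id = ?" statement.
        System.out.println(latestTotals(rows, 100L)); // {1=6, 2=1}
    }
}
```

Only remarks that received likes inside the window are touched, so the number of updates per run is bounded by recent activity rather than by the size of the remark table.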
public List<Remark> listRecommendRemarks(/*not null*/ String itemId, int expectCount){
if (expectCount <= 0)
return new ArrayList<>();
Assert.isTrue(expectCount <= MAX_LIST_SIZE, "Do not allow multiple queries at one time");
String sql = "select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?";
return db.query(sql, (resultSet, i) -> {
Remark remark = new Remark();
remark.setId(resultSet.getLong(1));
remark.setConsumerId(resultSet.getLong(2));
remark.setOrderId(resultSet.getString(3));
remark.setItemId(itemId);
remark.setScore(resultSet.getShort(4));
remark.setHeader(resultSet.getString(5));
remark.setContent(resultSet.getString(6));
remark.setImages(resultSet.getString(7));
remark.setUsername(resultSet.getString(8));
remark.setUserface(resultSet.getString(9));
remark.setCreateTime(resultSet.getString(10));
return remark;
}, itemId, expectCount);
}
All that remains is to save this result to the cache and then return it.
Replace lists atomically
The recommended evaluations form a list, so I use the LIST data type of Redis, which makes range queries easy. See the evaluation list in the previous article.
However, Redis does not provide a direct operation for replacing a list. It can only be done with a combination of commands such as DEL, RPUSH and RENAME, and combining them on the client side is not atomic. Needless to say, we use a script:
-- Delete and recreate a list
-- params 1 2
-- KEYS: list key name, proxy key
-- ARGV: list items
redis.call('DEL', KEYS[1])
for i = 1, #ARGV do
redis.call('RPUSH', KEYS[1], ARGV[i])
end
-- Extend the expiration time of the proxy lock
redis.call('SET', KEYS[2], 1)
redis.call('EXPIRE', KEYS[2], 3600)
The main code of query recommendation evaluation is as follows:
@Cacheable(value = "recommend")
public JSONArray listRecommendRemarks(/*not null*/ String itemId, int start, int stop) {
try {
if (remarkRedis.shouldUpdateRecommend(itemId)) {
// The lock is successfully locked. The evaluation content in the database needs to be loaded into Redis
remarkQueue.push(itemId, () -> reloadRecommendRemarks(itemId));
}
return appendVoteInfo(remarkRedis.readRecommendRange(itemId, start, stop));
} catch (Exception e) {
log.error("function RemarkService.listRecommendRemarks make exception:{} by:{},{},{}", e.getMessage(), itemId, start, stop);
return SystemConstants.EMPTY_ARRAY;
}
}
The proxy key pattern is used here again, so that the list holding the main business data in Redis never expires, which avoids cache breakdown and frequent blocking on distributed locks.
Some of the important Redis operation code:
// Save the recommendations and reset the expiration time
public void saveRecommendData(String itemId, /*not null*/ List<Remark> list) {
String[] argv = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
argv[i] = JSONObject.fromObject(list.get(i)).toString();
}
redisTemplate.execute(resetListScript,
List.of(RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId,
RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId), argv);
}
// Read the recommended content
public JSONArray readRecommendRange(String itemId, int start, int stop) {
String key = RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId;
return range(start, stop, key);
}
// Whether the recommendation should be updated
public boolean shouldUpdateRecommend(String itemId) {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId);
return flag == null || !flag;
}
Cold start with empty data
A cold start is when the service goes live for the first time, or when Redis restarts with an empty cache: the cache either has never been loaded, or was loaded before and has since been lost in an accident. At this point the proxy lock is absent (expired or never set), so SETNX succeeds, and the thread that acquired the lock synchronizes the database data to Redis so that the business data KEY is no longer empty. If the synchronization fails, the lock expires after 2 seconds and a new thread takes over. If the business data is loaded successfully, the proxy lock is extended to one hour, so that the next synchronization is triggered an hour later. The whole process is asynchronous, and the thread serving the user's request only reads the business data KEY. In other words, the interface returns empty results for only a few seconds during a cold start, which is acceptable because cold starts occur only at very specific moments, such as when new business goes live or when Redis memory cannot be restored.
Empty data means the database has no content to begin with. Following the design above, if the database is empty then the business data KEY is also empty, i.e. nil, and no placeholder needs to be stored because the proxy KEY already acts as one. A simple proxy KEY therefore prevents cache breakdown, avoids blocking synchronization, serves as a placeholder, and so on.
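The proxy key life cycle described above can be simulated without Redis. The sketch below is an in-memory stand-in, assuming SETNX-with-TTL semantics for acquiring the proxy key and a later extension after a successful reload; `ProxyKeySketch`, `tryLock` and `extend` are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory simulation of the proxy key; the real one lives in Redis.
final class ProxyKeySketch {
    private final Map<String, Long> expiresAt = new HashMap<>(); // proxy key -> expiry time in ms

    /** SETNX with a TTL: returns true only for the caller that creates the key. */
    synchronized boolean tryLock(String key, long now, long ttlMillis) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry > now) {
            return false; // key still alive: a reload is in progress or the data is fresh
        }
        expiresAt.put(key, now + ttlMillis);
        return true;
    }

    /** After a successful reload, push the expiry out so the next reload happens later. */
    synchronized void extend(String key, long now, long ttlMillis) {
        expiresAt.put(key, now + ttlMillis);
    }

    public static void main(String[] args) {
        ProxyKeySketch proxy = new ProxyKeySketch();
        System.out.println(proxy.tryLock("recommend:item1", 0L, 2_000L));     // true: cold start, this caller reloads
        System.out.println(proxy.tryLock("recommend:item1", 1_000L, 2_000L)); // false: others only read the list
        proxy.extend("recommend:item1", 1_500L, 3_600_000L);                  // reload succeeded, wait an hour
        System.out.println(proxy.tryLock("recommend:item1", 5_000L, 2_000L)); // false: data is still fresh
    }
}
```

If the reload had failed, `extend` would never be called, the 2-second entry would lapse, and the next caller of `tryLock` would take over, which matches the takeover behaviour described for cold starts.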
What's next
Future updates may cover the implementation of lottery and flash sale (seckill) events.