A common scenario in project development looks like this: the application sends inserts, updates, and deletes directly to the master database, while queries go through a central cache such as Redis or memcache and fall back to the slave database on a cache miss. The following sections describe how to handle master-slave delay when refreshing the central cache. That is, system A writes data to the master database and system B reads from the slave database, but the master has not yet been synchronized to the slave; if the cache is refreshed at that moment, stale data is flushed into the cache.
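The read path described above (check the central cache first, fall back to the slave on a miss) can be sketched roughly as follows. This is a minimal illustration, not the article's code: a ConcurrentHashMap stands in for Redis/memcache, and a function stands in for the slave-database query.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

public class ReadThroughCache {

    // Stand-in for the central cache (Redis or memcache in the text).
    private final Map<Long, String> cache = new ConcurrentHashMap<>();
    // Stand-in for a query against the slave database.
    private final LongFunction<String> slaveQuery;

    public ReadThroughCache(LongFunction<String> slaveQuery) {
        this.slaveQuery = slaveQuery;
    }

    /** Cache hit returns immediately; a miss falls back to the slave and fills the cache. */
    public String get(long id) {
        String value = cache.get(id);
        if (value != null) {
            return value;
        }
        value = slaveQuery.apply(id);
        if (value != null) {
            cache.put(id, value);
        }
        return value;
    }
}
```

The master-slave delay problem discussed below arises exactly at the `cache.put` step: if the refresh reads from a slave that has not caught up, stale data lands in the cache.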
Remark:
When I worked on this issue, the central cache was refreshed in response to MQ messages. The solutions in this article therefore also assume an MQ-based setup.
Caching the data in a local queue
We can use the update_time column of a row to determine whether the master database has been synchronized to the slave. If a row was just updated and its update_time shows that the latest master data has not yet reached the slave, we store the row's primary key in a local cache, for example a LinkedBlockingQueue. A job then scans the queue and processes each entry it finds: if the row has still not been synchronized to the slave, its primary key is put back into the queue, and this repeats until the slave has caught up. To prevent an entry from cycling in and out of the queue forever because of database problems, data problems, code bugs, and so on, we cap the number of times it can be re-queued, for example at 5; beyond 5 attempts the entry is simply discarded.
Some pseudocode is listed below:
Queue implementation
public class DelayQueue {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueue.class);
    private static final int QUEUE_MAX_ELEMENT_COUNT = 20000;

    private LinkedBlockingQueue<MessageElement> queue =
            new LinkedBlockingQueue<MessageElement>(QUEUE_MAX_ELEMENT_COUNT);

    private static class SingletonHolder {
        private static final DelayQueue INSTANCE = new DelayQueue();
    }

    private DelayQueue() {}

    public static final DelayQueue getInstance() {
        return SingletonHolder.INSTANCE;
    }

    /** Enqueue an element; log a warning if the queue is full. */
    public void offer(MessageElement messageElement) {
        boolean result = queue.offer(messageElement);
        // The queue is full
        if (!result) {
            LOGGER.warn("dataBase masterSlaveDataDelayQueue full");
        }
    }

    /** Dequeue the head element, or return null if the queue is empty. */
    public MessageElement poll() {
        return queue.poll();
    }
}
Scanning for jobs in the local cache queue
Use Spring’s timed task annotations:
/**
 * Scan the local queue every 5 seconds.
 */
@Scheduled(cron = "0/5 * * * * ?")
public void handleQueueMessage() {
    while (true) {
        // Switch controlling message consumption; it is best to read this value
        // from a configuration file. When it is "false", stop consuming.
        String result = "true";
        if (Constants.FALSE.equals(result)) {
            return;
        }
        MessageElement messageElement = DelayQueue.getInstance().poll();
        if (messageElement == null) {
            break;
        }
        LOGGER.info("receive message from delay queue: " + messageElement.toString());
        salesService.handleMessage(messageElement);
    }
}
The message body
public class MessageElement {

    private Long id;
    // Number of times this element has entered the queue
    private AtomicInteger count = new AtomicInteger();

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public AtomicInteger getCount() {
        return count;
    }

    public void setCount(AtomicInteger count) {
        this.count = count;
    }
}
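The pseudocode above stops at salesService.handleMessage. As a hedged sketch of what that method could look like (the class below and isSyncedToSlave are assumptions, not the article's code; in practice the check would compare the row's update_time on the slave against the value written on the master), the re-queue-up-to-5-times rule might be implemented like this:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongPredicate;

public class CacheRefreshHandler {

    private static final int MAX_REQUEUE_COUNT = 5;

    private final LinkedBlockingQueue<MessageElement> queue;
    private final LongPredicate isSyncedToSlave; // hypothetical replication check

    public CacheRefreshHandler(LinkedBlockingQueue<MessageElement> queue,
                               LongPredicate isSyncedToSlave) {
        this.queue = queue;
        this.isSyncedToSlave = isSyncedToSlave;
    }

    /**
     * Refreshes the cache if the row has reached the slave; otherwise re-queues
     * the element, dropping it once it has been re-queued 5 times.
     * Returns true only when the cache was actually refreshed.
     */
    public boolean handleMessage(MessageElement element) {
        if (isSyncedToSlave.test(element.getId())) {
            refreshCache(element.getId());
            return true;
        }
        if (element.getCount().incrementAndGet() <= MAX_REQUEUE_COUNT) {
            queue.offer(element); // try again on the next scan
        }
        // else: give up after 5 attempts, as described in the text
        return false;
    }

    private void refreshCache(long id) {
        // Placeholder: read the row from the slave and write it to Redis/memcache.
    }

    public static class MessageElement {
        private final long id;
        private final AtomicInteger count = new AtomicInteger();

        public MessageElement(long id) { this.id = id; }
        public long getId() { return id; }
        public AtomicInteger getCount() { return count; }
    }
}
```

Keeping the retry counter inside the element itself, as the MessageElement class above does, means no separate bookkeeping structure is needed to enforce the cap.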
Remark:
- Set a maximum capacity for the queue. If the number of elements exceeds it, you can either drop the queue head or stop adding new data, depending on your business requirements.
- In this solution the local cache lives in memory, so a restart of the application clears the queue and loses the pending data.
- There must be a switch that controls whether messages are received. If the producer sends too many concurrent messages and that starts causing other problems, the switch can be used to stop receiving messages and degrade gracefully. After all, we are only refreshing a cache.
Using MQ
You can also try MQ directly, if your MQ product supports the following feature:
If the data has not yet been synchronized from the master to the slave, mark the message for later redelivery and let MQ resend it at increasing intervals, such as 2s, then 5s, then 1 minute, and so on, continuing to resend until an hour has passed.
With this approach the application does not need a local cache at all and relies on MQ directly, and messages are not lost when the application restarts.
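The escalating-interval redelivery could be sketched as follows. This is not any specific MQ product's API: a ScheduledExecutorService stands in for the broker's delayed-redelivery feature, and the delay ladder (2s, 5s, ..., up to an hour in the text) is injectable so the idea stays testable. isSyncedToSlave is again a hypothetical check against the slave's update_time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongPredicate;

public class DelayedRedelivery {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long[] delaysMillis;           // escalating redelivery intervals
    private final LongPredicate isSyncedToSlave; // hypothetical replication check

    public DelayedRedelivery(long[] delaysMillis, LongPredicate isSyncedToSlave) {
        this.delaysMillis = delaysMillis;
        this.isSyncedToSlave = isSyncedToSlave;
    }

    /**
     * Tries to refresh the cache for the given row. If the slave has not caught
     * up yet, schedules another attempt after the next interval in the ladder,
     * giving up once the ladder is exhausted. The future completes with true on
     * success, false when all redeliveries failed.
     */
    public CompletableFuture<Boolean> deliver(long id) {
        CompletableFuture<Boolean> done = new CompletableFuture<>();
        attempt(id, 0, done);
        return done;
    }

    private void attempt(long id, int attemptNo, CompletableFuture<Boolean> done) {
        if (isSyncedToSlave.test(id)) {
            refreshCache(id);
            done.complete(true);
        } else if (attemptNo < delaysMillis.length) {
            scheduler.schedule(() -> attempt(id, attemptNo + 1, done),
                    delaysMillis[attemptNo], TimeUnit.MILLISECONDS);
        } else {
            done.complete(false); // all intervals used up; stop resending
        }
    }

    private void refreshCache(long id) {
        // Placeholder: read the row from the slave and write it to the cache.
    }
}
```

With a real broker the recursion disappears: the consumer simply rejects the message with the next delay level and lets the broker handle the waiting, which is what makes this variant survive application restarts.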
The original link
Real-time cache refresh – Some design options for handling mysql master-slave latency