Author: Re
Background
When implementing business data access through ES, we run into the problem of synchronizing changing data in (near) real time. The unified solution within the company is to propagate binlog information to the application service via Canal + MQ and have the application dump the changed data to ES.
The flow is as follows:
- When MySQL data changes, a binlog is recorded.
- Canal, acting as a MySQL slave, listens for the change data and sends it to MQ as sequential messages.
- A dump service listens on the topic and pulls the messages (via long polling) for consumption.
- After processing, the latest document data is synchronized to ES, where it can be searched.
To ensure that the data written to ES is up to date, the processing step takes the data ID (primary key) delivered by Canal, queries the database for the latest data, and synchronizes it to the ES index. (Regarding index structure, a common strategy is to flatten data from multiple tables into a single index.)
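To make the dump step concrete, here is a minimal sketch, not the actual project code, of "query the latest data by primary key, then index the flattened document into ES". The repository interface, class names, and index name are assumptions; the ES calls use the high-level REST client.

```java
import java.io.IOException;
import java.util.Map;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

public class DocumentDumpService {

    /** Hypothetical DB access layer that joins the main table and its extension tables. */
    public interface DocumentRepository {
        Map<String, Object> loadFullDocument(String mainId);
    }

    private final DocumentRepository repository;
    private final RestHighLevelClient esClient;

    public DocumentDumpService(DocumentRepository repository, RestHighLevelClient esClient) {
        this.repository = repository;
        this.esClient = esClient;
    }

    /** Re-reads the latest data for the given main-table id and writes the flattened document. */
    public void dump(String mainId) throws IOException {
        Map<String, Object> document = repository.loadFullDocument(mainId);
        IndexRequest request = new IndexRequest("business_document_index") // index name is illustrative
                .id(mainId)
                .source(document);
        esClient.index(request, RequestOptions.DEFAULT);
    }
}
```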
The problem
- Canal may observe multiple changes to the same document (that is, the same ID in the document's main table) within a very short period.
- When a document is created, it usually produces rows in several tables (main-table data plus related extension-table data). If Canal is listening for changes on all of these tables, a single document creation triggers multiple dump commands, i.e., a repeated-dump scenario.
In both scenarios, the same document is dumped to ES several times within a short period, which quietly adds unnecessary load on database queries and ES writes. The flow looks like this:

[1] At time T1, field A of the row with ID 1 in main_table changes; Canal observes it and eventually notifies DumpService.
[2] At time T1, field B of the related ext_table row belonging to main_table ID 1 changes; Canal observes it and eventually notifies DumpService.
[3] DumpService receives two change notifications for ID 1 within the same instant (microsecond level), queries MySQL by ID 1 to obtain the complete document for each notification, and synchronizes it to ES each time.

Given the above, the program really only needs to perform the final document synchronization; the earlier ones are redundant. But the change notifications arrive as sequential messages, so how can we tell whether related change notifications **come from the same transaction**? If we could confirm that they came from the same transaction, we would know how many dumps are actually needed (because, in business terms, one transaction corresponds to the processing of one complete document). As shown above, transactions A and B both modify ID = 1; since DumpService cannot tell from the sequential messages whether they belong to the same transaction, should the data be synchronized once or several times? (In theory it only needs to be synchronized twice, but in practice it is synchronized three times.)
Solution
- Can we buffer the data within a certain time window (for example, 1 second) and filter out duplicates?
- q: Doesn't collecting data for 1 second and then synchronizing to ES mean a 1-second delay before the data can be searched?
- A: In practice ES itself refreshes on a per-second basis before data becomes searchable, so it is only **"quasi" real-time** anyway; the extra 1-second delay introduced here is generally acceptable for the business.
- A: If the 1-second gap really matters, you can allow both a first synchronization and a final synchronization: the first change notification is dumped immediately, while the second and subsequent notifications go into the deduplication window.
- q: How should the data within the 1-second window be stored? Do we need to worry about ordering?
- A: It could be stored in a JVM Set, but considering data recovery after a failure it is better stored in Redis (also as a Set). (PS: the case where Redis itself goes down and the data is lost is not considered here.)
- A: Ordering is not a concern. The Set stores the main-table IDs of all documents that need to be updated, and each synchronization pulls the latest data from the database for the dump (i.e., an eventual-consistency approach).
- q: How is the data within the 1-second window processed?
- A: A timer can run once per second. Each run takes all the entries currently in the Set and dumps them; the Set acts as the container that collects one second's worth of data and is emptied once per second.
- Supplement: If the amount of data entering the Set within one second might be too large, you can cap how many entries are taken per run (in practice, for the contract business volume, the per-second data volume is not particularly high, and the Set only stores deduplicated entries anyway).
- Follow-up: The entries left over beyond that cap will miss the 1-second window; what about their timeliness?
- A: This can be ignored, because it is an eventual-consistency policy; if timeliness is required, refer to the second answer to the first question and synchronize twice.
- q: Does the 1-second deduplication window actually solve the root cause of the problem above, namely that transactions A and B cannot be distinguished?
- A: It does not address the root cause, but it does solve the problem. In essence, we turn the problem of not being able to discern the transaction relationship of asynchronous messages into a problem of eventual data consistency: the program still cannot tell which transaction a message came from, but the goal of data consistency is achieved.
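The deduplication itself comes for free from the Set semantics. A tiny illustration (Jedis client and key name are just for the example), reusing the scenario where transactions A and B both touch ID = 1 within the same second:

```java
import java.util.Set;

import redis.clients.jedis.Jedis;

public class DedupWindowDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Two change notifications for the same document arrive within the window...
            jedis.sadd("canal:collect:contract", "1"); // main_table id=1, field A changed
            jedis.sadd("canal:collect:contract", "1"); // ext_table of id=1, field B changed
            // ...but the Set keeps a single entry, so only one dump is triggered
            // and it re-reads the latest state of the document.
            Set<String> pending = jedis.smembers("canal:collect:contract");
            System.out.println(pending); // [1]
        }
    }
}
```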
The project design
- The change data observed by Canal is put into the collection container (a Redis Set).
- The program starts a timer that runs once per second; each run pulls the data that needs to be executed from the collection container.
- The collected data (i.e., the data to be processed for that 1-second window) is put into the execution container (another Redis Set), and the entries that were taken are deleted from the collection container.
- An asynchronous thread processes the data in the execution container and then deletes it from the execution container.
- q: Why is an asynchronous thread used to process the data?
- A: Processing the data taken out on each run takes time. Once the data generated within the 1-second window has been collected, it can be handed off to a separate thread, and the scheduled-task thread can return immediately.
- q: Why are two containers needed for the data?
- A: Because the whole process runs concurrently. If the same document changes again in the following second and there were only one container, its change notification could be deleted when the previous second's processing finishes, and the data kept in ES would then not be the latest. With two containers, the collection container only cares about change data generated within the current 1-second window, while the execution container only cares about data that still needs to be dumped.
Pseudocode implementation
Canal listens for change data and pushes it to the Redis collection container:
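A minimal sketch of this step, assuming a Spring StringRedisTemplate as the Redis client; class, method, and key names are illustrative, not the project's actual code:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class CanalChangeListener {

    /** Assumed name for the collection container key. */
    private static final String COLLECT_KEY = "canal:dump:collect";

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** Called by the Canal message handler for every row change it receives. */
    public void onRowChange(String tableName, String mainId) {
        // SADD deduplicates: repeated changes to the same document within the
        // window leave a single entry in the collection container.
        redisTemplate.opsForSet().add(COLLECT_KEY, mainId);
    }
}
```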
A scheduled task (EJ can be used) runs once per second; it collects the entries to be processed within the time window from the collection container and puts them into the execution container for execution:
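A sketch of the per-second collection step. The article uses an EJ distributed job; a plain Spring @Scheduled task stands in here only to keep the example self-contained (it requires @EnableScheduling), and key, class, and method names are assumptions:

```java
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class CanalDumpCollectJob {

    private static final String COLLECT_KEY = "canal:dump:collect"; // collection container
    private static final String EXECUTE_KEY = "canal:dump:execute"; // execution container

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private CanalDumpExecutor dumpExecutor; // asynchronous processor, see the next sketch

    /** Runs once per second: move this window's ids into the execution container. */
    @Scheduled(fixedDelay = 1000)
    public void collect() {
        Set<String> ids = redisTemplate.opsForSet().members(COLLECT_KEY);
        if (ids == null || ids.isEmpty()) {
            return;
        }
        String[] batch = ids.toArray(new String[0]);
        redisTemplate.opsForSet().add(EXECUTE_KEY, batch);               // hand over to the execution container
        redisTemplate.opsForSet().remove(COLLECT_KEY, (Object[]) batch); // clear from the collection container
        dumpExecutor.dumpAsync(batch);                                   // returns immediately, work continues asynchronously
    }
}
```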
An asynchronous thread processes the data; after processing, the processed entries are cleared from the execution container:
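And a sketch of the asynchronous execution step (names are assumptions; @Async requires async support to be enabled in the Spring context, and ICanalPreventDuplicateExecuteDump is the dump interface described in the next section):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class CanalDumpExecutor {

    private static final String EXECUTE_KEY = "canal:dump:execute";

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private ICanalPreventDuplicateExecuteDump dumpService; // business dump implementation

    /** Processes one batch on a separate thread, then clears it from the execution container. */
    @Async
    public void dumpAsync(String[] dumpKeys) {
        try {
            // Re-query the latest data for each id and write it to ES.
            dumpService.canalDump(dumpKeys);
        } finally {
            redisTemplate.opsForSet().remove(EXECUTE_KEY, (Object[]) dumpKeys);
        }
    }
}
```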
Out of the box
CanalPreventDuplicateDumpJob.java contains the addDumpKey method, which pushes changed data IDs into the Redis collection container.
CanalPreventDuplicateDumpJobImpl.java implements the interface above; internally it uses an EJ distributed scheduled task, executed once per second, to collect the data and trigger execution.
ICanalPreventDuplicateExecuteDump.java is the data dump interface; implement it locally and provide the **void canalDump(String[] dumpKey);** method.
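Based on the description above, the interface is roughly the following (the Javadoc is an assumption; the method signature is as stated):

```java
public interface ICanalPreventDuplicateExecuteDump {

    /**
     * Re-queries the latest document data for each main-table id in dumpKey
     * and synchronizes it to ES.
     */
    void canalDump(String[] dumpKey);
}
```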
When using it, change mainKeyName to a unique Redis key for your project, change serviceName to point to the Spring bean name of the class that handles the data dump in your project, and make that class implement the ICanalPreventDuplicateExecuteDump interface.
Usage steps
- Copy the three Java files above into your project (ICanalPreventDuplicateExecuteDump.java, CanalPreventDuplicateDumpJob.java, CanalPreventDuplicateDumpJobImpl.java).
- In the Canal data-monitoring code, in the method that handles data changes, call CanalPreventDuplicateDumpJob#addDumpKey to push the data that needs to be dumped into the Redis collection container (see the first sketch after this list).
- Modify the mainKeyName and serviceName variables in CanalPreventDuplicateDumpJobImpl.
- In the class whose bean name matches the serviceName variable, implement the ICanalPreventDuplicateExecuteDump interface and its void canalDump(String[] dumpKey); method (see the second sketch after this list).
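A sketch of step 2. How your project receives Canal messages is project-specific; the surrounding class, the callback signature, and the signature of addDumpKey (assumed here to take one id) are assumptions:

```java
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ContractCanalListener {

    @Autowired
    private CanalPreventDuplicateDumpJob canalPreventDuplicateDumpJob;

    /** Called with the main-table ids extracted from a Canal change message. */
    public void onContractChange(List<String> changedMainIds) {
        for (String id : changedMainIds) {
            // Push each id that needs dumping into the Redis collection container.
            canalPreventDuplicateDumpJob.addDumpKey(id);
        }
    }
}
```

And a sketch of step 4; the bean name "contractDumpService" is illustrative and must match the serviceName configured in CanalPreventDuplicateDumpJobImpl:

```java
import org.springframework.stereotype.Service;

@Service("contractDumpService")
public class ContractDumpService implements ICanalPreventDuplicateExecuteDump {

    @Override
    public void canalDump(String[] dumpKey) {
        for (String mainId : dumpKey) {
            // Query the latest document data by mainId and synchronize it to ES
            // (business-specific dump logic goes here).
        }
    }
}
```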