Author: Re

Review of the previous article

I previously wrote an article about filtering repeated change data for the same table when Canal listens to MySQL data changes. That scheme addresses the excessive CPU usage in the ES cluster caused by repeatedly consuming changes to the same ID within a very short interval.

For details, see “Canal Synchronous ES Instantaneous Duplicate Data Filtering scheme”: juejin.cn/post/700283…

Food for thought

Careful readers of the duplicate-data filtering solution will notice an atomicity problem when data changes arrive concurrently. Following the pseudocode through the sequence diagram shows the problem:

“Program Execution Sequence Diagram”

As shown in the figure, the program consists of a Dump service and a scheduled task service. The Dump service runs after every database data change, and the scheduled task runs once per second (only one thread runs it across the cluster).

1) Scheduled task: obtains the data to be processed for the current 1s from [collection container], including the data with “ID = 1”.

2) Scheduled task: puts the data to be processed into [execution container].

3) Dump service: at this moment it receives a database change notification, which happens to be for the data with “ID = 1”, and puts that data into [collection container].

4) Scheduled task: having obtained the data to be processed, it removes that data from [collection container]. Because of the concurrent ordering, the “ID = 1” data that T2 had just stored is also deleted by T1, and the data is lost.
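The four steps above can be replayed deterministically. The following is a minimal sketch, assuming a plain in-memory Python set stands in for the Redis-backed [collection container]; the function name `run_interleaving` and the sample IDs are hypothetical and are not taken from the article's real code.

```python
def run_interleaving():
    """Replay steps 1) to 4) in the problematic order on in-memory sets."""
    # [collection container] already holds the IDs changed in this 1s window.
    collection = {1, 2}

    # Step 1: the scheduled task takes a snapshot of the IDs to process.
    batch = set(collection)

    # Step 2: the scheduled task hands the snapshot to [execution container].
    execution = set(batch)

    # Step 3: the Dump service records a new change for ID = 1.
    # The ID is already in the set, so nothing visibly changes.
    collection.add(1)

    # Step 4: the scheduled task deletes everything it read in step 1,
    # which also wipes out the change recorded in step 3.
    collection -= batch

    return execution, collection
```

After the call, the returned collection container is empty even though a change for ID = 1 arrived after the snapshot was taken, so nothing remains to trigger a second dump for that ID.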

Occurrence probability analysis

From the description above, the probability of the problem occurring can be analyzed as follows:

1) The problem occurs only when data with the same ID is changed by two different transactions within the same 1s window (see the previous article for the definition of a transaction used here).

2) In addition, after the executing thread obtains the ID, the data processing is performed asynchronously. This delays the data dump slightly, which as a side effect lowers the probability that two near-simultaneous change transactions leave the final data inconsistent.

In practice, changes to a business document are usually wrapped in a transaction, so the same document is unlikely to be changed by two separate transactions within the same 1s window. The problem therefore arises only under extreme circumstances.

In fact, when we implemented duplicate data filtering, we did not deliberately solve this problem, because it can be avoided in our business scenarios. Moreover, the overall data synchronization scheme also uses fallback policies to guarantee eventual consistency of the data. Here we explore the problem in order to arrive at a general solution.

Problem analysis

Redefinition

1) The Dump service's data processing thread produces the data to be processed. We call this thread the “producer”, which produces “production data” in the “production process”;

2) The scheduled task service processes the “production data” every 1s. We call it the “consumer”, which processes “consumption data” in the “consumption process”;

3) “Production data” is put into the “collection container”. The “consumer” takes the data out of the “collection container” and puts it into the “execution container”. Finally, an asynchronous task runs to process the data in the execution container.

Analysis step by step

1) “Producer”: only one thread in the cluster processes the data, because Canal forwards the binlog data change notifications as sequential messages in this solution. (For the principle of MQ sequential messages, see: juejin.cn/post/699981…)

2) “Consumer”: to fetch the data once every 1s, this is also done by a single thread, while the subsequent data processing is asynchronous and multi-threaded. (Depending on how the scheduled task is configured, multiple threads could run it concurrently; here it is treated as a single thread, where the next run starts only after the previous run has completed.) The analysis shows that this is really two threads competing concurrently for a Redis resource, and the contention is not intense. As shown in the figure:

“Consumer and producer threads compete for Redis resources”

Hidden problem

From the analysis so far, the goal is to resolve the concurrent contention for [collection container]. In practice, however, the problem is not confined to that single resource; it spans the whole link. Let us look at the panorama and number the key steps, where T1, T2 and T3 are three different asynchronous tasks:

  • (1) T1: Canal data change notification; writes the data with “ID = 1” into [collection container] in real time.
  • (2) T2: scheduled task; processes data every 1s in four steps.
    • (3) Step 1: obtain the data from [collection container] (including “ID = 1”);
    • (4) Step 2: store the obtained data (including “ID = 1”) in [execution container];
    • (5) Step 3: delete the obtained data (including “ID = 1”) from [collection container];
    • (6) Step 4: asynchronously start the data processing of [execution container];
  • T3: the asynchronous thread that processes [execution container].
    • (7) Step 1: obtain the data from [execution container];
    • Step 2: run the Dump operation on the obtained data;
    • (8) Step 3: after confirming that the dump succeeded, delete the obtained data from [execution container];

According to the panorama, if contention for [collection container] causes the data with “ID = 1” to be deleted right after it is added, the problem appears when the steps run in the following order:

  • (3) The scheduled task runs first and puts the obtained data into [execution container].
  • (7) The asynchronous task is scheduled, directly obtains the data just put in, and starts processing it.
  • (1) A Canal data change notification arrives and writes the “ID = 1” data into [collection container].
  • (5) Having obtained the data to be processed within the 1s window, the scheduled task clears it from [collection container].

Data loss occurs in the order (3) – (7) – (1) – (5), and only on the second or a later run of the scheduled task. On the first run, the asynchronous processing necessarily starts only after [execution container] has been populated. If the order is (3) – (1) – (5) – (7), then even if the data with “ID = 1” is lost from [collection container] because of resource contention, it is still processed from the execution container, so that order is concurrency safe. To ensure concurrency safety in this scenario, there are therefore two ways to think about it:

  1. Either ensure that the order of execution is (5) – (1), which addresses the root cause of the problem mentioned earlier: the data in the collection container must not be lost.
  2. Or ensure that the order of execution is (5) – (7), so that even if [collection container] data is lost, [execution container] will eventually process the lost data. (In fact, [execution container] itself can suffer the same data loss problem.)
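The two orderings discussed above can be compared with a small simulation. This is only a sketch under stated assumptions: in-memory dicts and sets replace MySQL, ES and the Redis containers, steps (3) and (4) are merged into one function and steps (7) and (8) into another, and the name `simulate` and the row values "v1" and "v2" are hypothetical.

```python
def simulate(order):
    """Run the numbered steps in the given order and report whether a change is lost."""
    db = {1: "v1"}        # source row; stands in for MySQL
    es = {}               # dump target; stands in for ES
    collection = {1}      # [collection container] already holds the first change
    execution = set()     # [execution container]
    batch = set()         # IDs read by the current scheduled run

    def step3():          # (3) + (4): read the batch and store it for execution
        batch.update(collection)
        execution.update(batch)

    def step1():          # (1): a second change to ID = 1 arrives from Canal
        db[1] = "v2"
        collection.add(1)

    def step5():          # (5): delete the batch from [collection container]
        collection.difference_update(batch)

    def step7():          # (7) + (8): dump the current row state, then clean up
        for row_id in sorted(execution):
            es[row_id] = db[row_id]
        execution.clear()

    steps = {1: step1, 3: step3, 5: step5, 7: step7}
    for number in order:
        steps[number]()

    # A change is lost if ES is stale and no container still references the ID.
    pending = collection | execution
    lost = any(es.get(row_id) != db[row_id] and row_id not in pending for row_id in db)
    return es, lost
```

With `simulate([3, 7, 1, 5])` the dump runs before the second change and the later delete removes the only trace of it, so the function reports a loss. With `simulate([3, 1, 5, 7])` the dump runs last and reads the latest row state, so nothing is lost.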

The solution

Problem description

Based on the analysis above, we restate the problem. The conditions are as follows:

  • A single-threaded “producer” generates data and puts it into the collection container (a Redis resource).
  • A single-threaded “consumer”, after obtaining the consumption data, deletes that data from the collection container.
  • The consumption process runs once every 1s.
  • The “production process” produces in real time. (That is, production frequency > consumption frequency, which is not the same as production data volume > consumption data volume.)

Problem to be solved: how to guarantee the order (5) – (1) or (5) – (7)?

Implied conditions:

  • Steps (1) and (5) may run on different machines (different threads), i.e. [collection container] is contended for across machines.
  • Steps (5) and (7) always run on the same machine, in different threads.
  • Steps (4) and (8) compete for [execution container] on the same machine. This is the [execution container] data loss problem.

Reframing the problem

  • Should we handle contention across different machines or contention on the same machine? (Contention on the same machine is cheaper to handle.) Suppose we guarantee the (5) – (7) execution order and also keep (4) and (8) from competing for resources. That amounts to serializing the T2 and T3 threads, and since tasks need to be processed in parallel, a serial solution is not feasible.
  • Alternatively, introduce a version number to solve these problems: the T3 thread processes only the data of its own T2 version and never the data of other T2 versions, which also preserves T3's ability to dump data in parallel.

Pseudo code

When the scheduled task runs, it takes the current timestamp as the version number of the task, tags the data it obtained with that version number, and passes the version number to the asynchronous task it invokes.

“Pseudocode for scheduled task execution every 1 second”

When the asynchronous task runs, it fetches only the data of the version number passed in and processes that, which isolates competing data with the same ID across different versions.

“Pseudocode for asynchronously executing tasks”
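Since the original pseudocode figures are not reproduced here, the version-number idea can be sketched as follows. This is an illustration only, not the author's implementation: the class `VersionedPipeline`, its method names, and the `dump` callback are hypothetical, the containers are in-memory structures instead of Redis, and `time.time_ns()` is just one possible way to obtain a timestamp-based version.

```python
import time
from collections import defaultdict


class VersionedPipeline:
    """In-memory sketch of the collection and execution containers with versions."""

    def __init__(self, dump):
        self.collection = set()              # [collection container]
        self.execution = defaultdict(set)    # [execution container], keyed by version
        self.dump = dump                     # hypothetical callback that dumps one ID

    def on_change(self, row_id):
        # (1) Producer: record the changed ID.
        self.collection.add(row_id)

    def scheduled_task(self, version=None):
        # T2: use the current timestamp as the version unless one is supplied.
        if version is None:
            version = time.time_ns()
        batch = set(self.collection)         # (3) read the batch
        self.execution[version] |= batch     # (4) store it under this version
        self.collection -= batch             # (5) delete it from the collection
        return version                       # (6) the caller passes this to T3

    def async_task(self, version):
        # T3: process and delete only the data tagged with the given version.
        batch = self.execution.get(version, set())   # (7)
        for row_id in sorted(batch):
            self.dump(row_id)
        self.execution.pop(version, None)            # (8)
```

In this sketch an asynchronous task for one version cannot read or delete data stored under another version, so a slow task from an earlier run does not consume the batch of a later run. A real deployment would also need the Redis operations behind steps (3) to (5) to behave consistently, which this sketch does not address.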

Finally, some questions to think about

  • Does the above solution still hold in a scenario with multi-threaded consumers?
  • What should be done when the instantaneous data volume is too large for [execution container] or [collection container] to process in a short time?
  • If the solution were changed so that version numbers are applied when consuming data in [collection container], would that solve the lost-data problem?
  • If processing of [execution container] fails, how should the data of that task version be compensated?

For better formatting, read this article on Yuque: www.yuque.com/docs/share/… “Solving the Problem of Concurrent Data Loss in the Context of Canal Data Filtering”