Background

The phenomenon is as follows:

  • Elasticsearch-based services provide the full-text search capability for products. Heterogeneous DB change data must be synchronized to ES in a timely manner so that services can query it, i.e., Database To ES (DTS).
  • Because there are many data change entries, the native Canal client is used to monitor binlog changes, and the data is delivered to MQ in order. The basic search service consumer consumes and filters the data, assembles it, and reports it to ES: DB -> Canal -> MQ -> Consumer -> ES.
  • At present, the overall DB -> ES synchronization is slow, with a processing speed of about 60 QPS. Large volumes of data changes, such as data cleansing and resource imports, often cause heavy accumulation of Canal MQ (sequential) messages and delay service data synchronization.
  • Some scenarios are sensitive to data visibility (for example, a personal exercise must be visible immediately after it is created). Because multi-condition queries against the DB are slow, the current implementation queries ES instead, so a large synchronization delay severely damages these services, which is unacceptable.
  • At present, the impact can only be reduced by avoiding data cleansing at peak times, which is inconvenient for daily cleansing work and degrades the user experience of the online services.

Live data change volume vs. synchronization delay

Data change volume (rows) | DTS synchronization delay
60 < x < 600              | 1s ~ 10s
3600 < x < 10800          | 1min ~ 3min
x > 10800                 | > 3min

Problem analysis

Current implementation scheme

Time-consuming analysis

1. DB -> ES reporting is decoupled through MQ for peak clipping, so MQ messages accumulate easily when DB data changes in bulk. There are no more than three cases:

  • The producer produces messages too fast;
  • The consumer consumes messages too slowly;
  • Both at the same time.

2. From DB to ES to retrievable, there are several main synchronization time points (refer to the figure above; red text marks the critical time points):

  • Step 1: The delay of shipping binlog to Canal (comparable to MySQL master-slave replication delay) is negligible under normal conditions.
  • Step 2: Canal sends ordered change messages to MQ:
    • Messages are currently configured as globally ordered (Canal also supports table-level configuration); during one bulk change of more than 1 million rows, the delay reached about 20 minutes;
    • All change data is sent to a single broker queue, which is unfavorable for sequential consumption on the consumer side;
    • Canal does not support ordered sending keyed by primary-key ID;
    • The reporting QPS here is acceptable: there is no business logic, and data is forwarded directly to MQ.
  • Step 3: The DTS service pulls messages in batches and consumes them in order (a minimal consumer sketch follows this list):
    • Because Step 2 produces globally ordered messages, sequential consumption on the consumer side must lock and pull from a single queue and process it with a single thread; consumption speed cannot be improved by adding instances;
    • After a message is received, the corresponding ES action is performed according to the table and the operation type (Step 5);
    • After the report succeeds, the MQ message is ACKed and CONSUME_SUCCESS is returned, confirming the Canal consumption offset;
    • If the report fails, the MQ message is rolled back by returning RECONSUME_LATER, and the message is retried.
  • Steps 4 and 5:
    • The reporting logic is synchronous and sequential, and is also mixed with business data queries and filtering;
    • ES bulk operations are used, one HTTP request per batch;
    • ES first writes data into its in-memory buffer; the data becomes visible only after a refresh (up to 1s). This is an ES characteristic and cannot be worked around for now.
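A minimal sketch of the Step 3 consume-and-report loop, assuming a RocketMQ push consumer; the consumer group, name server address, and topic are hypothetical. It uses the CONSUME_SUCCESS / RECONSUME_LATER statuses named above (a strictly ordered variant would register a MessageListenerOrderly instead):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class DtsConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dts-es-consumer"); // hypothetical group
        consumer.setNamesrvAddr("127.0.0.1:9876");                                     // hypothetical address
        consumer.subscribe("canal-binlog-topic", "*");                                 // hypothetical topic

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
            try {
                for (MessageExt msg : msgs) {
                    // Parse the binlog change, filter and assemble the business data,
                    // then report it to ES (e.g. one bulk HTTP request per batch).
                    reportToEs(msg.getBody());
                }
                // Report succeeded: ACK, confirming the consumption offset.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                // Report failed: roll back so MQ redelivers the messages later.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
    }

    // Hypothetical ES write; the real service also performs filtering and assembly.
    private static void reportToEs(byte[] body) { /* omitted */ }
}
```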

At present, Canal MQ message accumulation is mainly caused by Step 3, so this optimization starts there.

MQ sequential consumption optimization

  • Currently Canal sends messages in global order, and the accumulation bottleneck is still on the consumer side:

  • Even with a single-consumer, multi-worker-thread model, only one reporting thread works at a time, which is very inefficient for IO-intensive tasks;
  • Symptom: all data reporting and consumption crowd into a single queue, which is bad for both storage and multi-instance consumption.

Disadvantages

  • To keep heterogeneous data strongly consistent from DB to ES, binlog events are processed in the order they were generated and replayed to ES, with synchronous message confirmation and rollback. The single-threaded processing this requires performs extremely poorly.
  • In actual services, the probability of concurrent operations on the same record is extremely low, so the granularity of sequential synchronization can be refined further.

Scheme design

Goal

  • Ensure messages are reported in order at the minimum required granularity (table + service UID) while improving the DB -> ES reporting QPS.

Queue scheme

Ordering is guaranteed by the FIFO property of a single queue.

1. Directly connect to the Canal server

  1. The search service connects directly to the Canal server, pulls change messages, and reports them to MQ routed by table + service ID (as sketched below);
  2. In other words, the data production side is optimized, and the consumption speed on the consumer side improves naturally.
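A minimal sketch of Scheme 1, assuming the Canal Java client (CanalConnector) and a RocketMQ producer; the destination name, addresses, topic, and the extractServiceUid helper are hypothetical. Hashing the table + UID route key to a queue index keeps all changes for the same record in one FIFO queue:

```java
import java.net.InetSocketAddress;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class CanalToMqRouter {
    public static void main(String[] args) throws Exception {
        // Connect directly to the Canal server (address and destination are hypothetical).
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        DefaultMQProducer producer = new DefaultMQProducer("dts-router");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        connector.connect();
        connector.subscribe(".*\\..*");

        while (true) {
            com.alibaba.otter.canal.protocol.Message batch = connector.getWithoutAck(100);
            long batchId = batch.getId();
            try {
                for (CanalEntry.Entry entry : batch.getEntries()) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) continue;
                    String table = entry.getHeader().getTableName();
                    String uid = extractServiceUid(entry); // hypothetical: parse the business UID
                    String routeKey = table + ":" + uid;

                    org.apache.rocketmq.common.message.Message msg =
                            new org.apache.rocketmq.common.message.Message(
                                    "dts-topic", entry.toByteArray()); // hypothetical topic
                    // The same table + UID always hashes to the same queue, so per-queue
                    // FIFO preserves order at the finest required granularity.
                    producer.send(msg, (mqs, m, arg) -> {
                        int idx = (arg.hashCode() & 0x7fffffff) % mqs.size();
                        return mqs.get(idx);
                    }, routeKey);
                }
                connector.ack(batchId);      // confirm the Canal consumption offset
            } catch (Exception e) {
                connector.rollback(batchId); // redeliver the whole batch on failure
            }
        }
    }

    private static String extractServiceUid(CanalEntry.Entry entry) { return "0"; /* omitted */ }
}
```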

Service architecture

Advantages

  • The MQ routing order can be customized (by table + ID), which the native Canal client currently does not support;
  • Data reporting supports multi-instance consumption, so consumption speed can be increased by scaling out service instances.

Disadvantages

  • The official Canal MQ-sending client must be replaced, which requires a certain amount of development work.

2. The consumer re-routes messages to MQ by table + service ID

Advantages

  • The same advantages as above;
  • The implementation is simple: only three steps need to be added, with little intrusion into the original code.

Disadvantages

  • Messages pass through MQ twice, which wastes network bandwidth and storage.

Distributed locking scheme

A Redis lock is used to ensure that operations on the same service UID execute in order.

  • In essence, sequential consumption only needs to guarantee ordering for the same business record, which distributed-lock synchronization provides (see the sketch below).
  • That is, the confirmation point for sequential consumption shifts from "report succeeded" to "Redis lock acquired". Concurrent or high-frequency (within 1s) operations on the same record are rare at the business layer, so the probability of actually waiting for the lock is very low.
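A minimal sketch of the locking idea, assuming Redisson for the distributed lock; the Redis address, key format, and reportToEs helper are hypothetical:

```java
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class UidOrderedReporter {
    private final RedissonClient redisson;

    public UidOrderedReporter() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379"); // hypothetical address
        this.redisson = Redisson.create(config);
    }

    /** Serializes ES reports for the same table + UID; different UIDs proceed in parallel. */
    public void report(String table, String uid, byte[] change) {
        RLock lock = redisson.getLock("dts:order:" + table + ":" + uid); // hypothetical key format
        lock.lock(); // contention is rare: concurrent ops on the same record are unlikely
        try {
            reportToEs(change); // hypothetical ES write, omitted
        } finally {
            lock.unlock();
        }
    }

    private void reportToEs(byte[] change) { /* omitted */ }
}
```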

Service architecture

Advantages

  • It achieves the expected per-ID sequential synchronization;
  • It optimizes by increasing the consumption speed of a single instance.

Disadvantages

  • The overall change is large: the service must switch to pull mode (ACK mode) and roll back to the minimum batchId on failure;
  • Introducing the Redis component lengthens the overall synchronization link, reduces reliability, and increases complexity;
  • Since queues have already been introduced to guarantee sequential processing, locking is not recommended.

Decision matrix analysis

Scheme   | Development cost (out of 4)                           | Maintenance cost (out of 4)        | Synchronization rate (out of 8)                                              | Data consistency assurance (out of 8)                 | Total score
Scheme 1 | 3 (replace the native Canal MQ-sending client)        | 4 (no subsequent changes)          | 7 (custom routing; hashed evenly across queues in order)                     | 8 (guaranteed by the original consumption-offset ACK) | 22
Scheme 2 | 4 (only an additional MQ route for secondary routing) | 2 (one extra MQ store-and-forward) | 5 (secondary MQ send)                                                        | 8 (same as the original)                              | 19
Scheme 3 | 2 (large change scope)                                | 2 (introduces the Redis component) | 6 (single-queue sequential consumption; only per-instance speed is improved) | 6 (exception handling is complicated)                 | 16

Further optimization

The following optimization builds on Scheme 1.

Problem analysis

  • When a large amount of data is updated, much of it is in fact public resource data, for which timely updates are not business-critical;
  • Private (personal/area) data, on the other hand, must be perceived in time because it is driven by individual operations (create/query/delete/update, etc.); having such changes blocked behind public data cleansing is unreasonable;
  • The MQ queues are therefore divided into busy and idle queues, and higher-priority change data uses the idle queues to reduce the delay caused by queue blocking.

Implementation

  • AppCode is used to distinguish higher-priority synchronization data (parameters or policies can also be specified per service scenario);
  • Data is posted to different queues accordingly.

Code implementation
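A minimal sketch of the AppCode-based priority routing, assuming RocketMQ; the topic names and the high-priority AppCode set are hypothetical:

```java
import java.util.Set;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class PriorityRouter {
    // Hypothetical: appCodes whose data must stay visible with low latency.
    private static final Set<String> HIGH_PRIORITY_APPS = Set.of("personal-exercise");

    private final DefaultMQProducer producer;

    public PriorityRouter(DefaultMQProducer producer) {
        this.producer = producer;
    }

    /** Routes high-priority changes to the idle topic so bulk cleansing cannot block them. */
    public void post(String appCode, byte[] change) throws Exception {
        String topic = HIGH_PRIORITY_APPS.contains(appCode)
                ? "dts-topic-idle"   // hypothetical idle (high-priority) topic
                : "dts-topic-busy";  // hypothetical busy (bulk/default) topic
        producer.send(new Message(topic, change));
    }
}
```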

Effect (synchronization delay across different queue priorities):

Synchronization bottlenecks

  • Because the Canal server allows only one client connection at a time, scaling happens on the consumer side, which is sufficient for actual production use.
  • A further optimization direction: reduce the Canal-client-to-MQ IO cost by ACKing as soon as a change is queued in an in-service memory queue, and register a shutdown hook for the sending thread so that queued messages are delivered to MQ on a best-effort basis when an exception or shutdown occurs (a sketch follows).
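A minimal sketch of that direction, assuming a RocketMQ producer; the queue capacity and class names are hypothetical. Messages are acknowledged once buffered, a background thread performs the MQ IO, and a JVM shutdown hook drains the buffer on a best-effort basis:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class BufferedMqSender {
    private final BlockingQueue<Message> buffer = new LinkedBlockingQueue<>(10_000);
    private final DefaultMQProducer producer;

    public BufferedMqSender(DefaultMQProducer producer) {
        this.producer = producer;
        Thread sender = new Thread(this::drainLoop, "mq-sender");
        sender.setDaemon(true);
        sender.start();
        // Best effort on shutdown: flush whatever is still queued to MQ.
        Runtime.getRuntime().addShutdownHook(new Thread(this::flush));
    }

    /** Enqueue and ACK immediately; the actual MQ IO happens asynchronously. */
    public boolean submit(Message msg) {
        return buffer.offer(msg);
    }

    private void drainLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                producer.send(buffer.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Send failed; delivery here is best effort, so the message is dropped.
            }
        }
    }

    private void flush() {
        Message msg;
        while ((msg = buffer.poll()) != null) {
            try { producer.send(msg); } catch (Exception ignored) { /* best effort */ }
        }
    }
}
```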

Conclusion

  • Schemes 1 and 2 mainly optimize the message production side (and, with it, the consumer's consumption mode): messages are routed to queues at a finer granularity (global order -> per-service-UID order), so the consumer side can increase consumption speed by scaling out and can prioritize synchronization by business scenario.
  • Scheme 3 guarantees sequential consumption of the same UID by locking, i.e., ordering is ensured per UID; it is complicated to implement and only improves consumption speed within a single instance. Since queue ordering is already guaranteed, introducing it is not recommended.

In summary, Scheme 1 is recommended.