About the author: Xiansheng Jing joined Qunar in 2019 and is now responsible for the core domestic ticket business of ticket issuing, refunds, and changes. He has extensive experience in domain-driven design (DDD) and high-concurrency systems.
1. Foreword
Qunar’s domestic ticket after-sales service provides customers with ticket refunds, reservation changes, flight changes, itinerary services, and epidemic policy information. These business scenarios involve complex queries across dimensions such as order, ticket, PNR, itinerary, and airline, while the underlying data is stored across different MySQL tables, which makes it difficult to serve complex query scenarios directly. To solve this, we designed a data synchronization system that aggregates data from one data source into another, supporting homogeneous or heterogeneous targets with low latency and eventual consistency.
The Qunar data synchronization platform synchronizes data from MySQL to ES and provides a unified query gateway service. As the business grew, several problems emerged: the high availability of the active and standby ES clusters was weak, recovery and complement were difficult after a single node failed, the synchronization link was unreasonably organized so that indexes affected one another, and the company required an upgrade of the ES cluster version. This article discusses in detail the platform’s support for migrating across ES versions, and the design and practice of its high availability and data consistency, hoping it is helpful or inspiring to you.
2. Introduction of data synchronization platform
Early on, Qunar launched the Databus-based data synchronization system 1.0, which solved the problem of aggregating multiple tables and writing them to heterogeneous data stores (such as ES and Hive). To address the Databus producer single point of failure, the operational difficulty of Canal and data configuration, and the chaotic management of query clients, the data synchronization platform underwent an overall architecture upgrade; for details, see the earlier Qunar Technology Salon blog post “Data Synchronization Platform Reconfiguration Practice”. The Qunar data synchronization platform consists mainly of the data synchronization module, the data center (Crab), and the management module. The overall architecture is shown in the figure below.
2.1. Data synchronization module
The data synchronization module can be implemented with Databus, Canal, Inception Gate, and other solutions. Considering ease of synchronization configuration, transactionality, and high availability, we chose Alibaba Otter to subscribe to the MySQL binlog. Kafka is used as the message queue, and for data mapping and filtering rules we independently designed the DTS system.
- Otter: adapted from Alibaba’s open-source distributed database synchronization system, it mainly solves page-based synchronization configuration and the Databus producer single point problem. We also extended it with a Kafka data-media type, sending messages to Kafka in the Otter Load phase.
- DTS: its design follows the Otter model and mainly includes Node, complement task, and database backcheck components. A Node consists of four SFTL stages: S consumes data from Kafka, F filters data, T transforms data (master/child table association, array processing), and L writes data to the ES gateway via crab-client.
2.2. Data center
The data center (Crab) provides synchronous data writing services internally and a unified query service externally. Hystrix performs circuit breaking, degradation, and isolation keyed on AppCode + ES index, and query traffic scheduling across ES clusters is configurable. We also provide crab-client as a Java API that hides the complexity of DSL details, reducing the access cost for business lines.
- Crab: the data center. It provides ES read and write services along with unified authentication, circuit breaking, rate limiting, and traffic allocation.
- ES group: an index exists in multiple ES clusters that are logically grouped, providing highly available read/write service in active/standby mode.
- Service access: business access parties query ES through crab-client, which simplifies query complexity and eases onboarding.
2.3. Management Platform
The management platform is auxiliary to the data synchronization platform: it is required only when a task first starts. Once a task is running, its normal operation is unaffected by the management platform’s availability. The management platform maintains data synchronization configuration, DTS node online/offline status, Crab authentication, rate limiting, and ES cluster read/write traffic allocation.
- MySQL / MMM / PXC: the three internal MySQL architectures (single-node MySQL, MMM, and PXC). Otter’s built-in Canal subscribes to their binlogs, providing the source data for synchronization.
- Manager: manages the configuration of the working nodes (Otter, DTS, and Crab), including read/write authentication, ES cluster grouping, ES index maintenance, ES query traffic allocation across active and standby clusters, DTS data aggregation node management, and other functions.
- Watcher: the company’s internal monitoring and alarm platform, providing whole-link monitoring and alarm for the system.
3. Background of technological evolution
The data synchronization platform currently serves 10+ business lines and 14+ ES indexes. In 2021 Q2 the data group required a unified upgrade from ES 5.x to ES 7.x. Meanwhile, the platform’s high availability and stability problems had become prominent, mainly in the following four aspects:
- When either ES cluster fails, the entire synchronization link becomes unavailable and the fault is hard to rectify.
- After a database IP address migration, Otter cannot switch databases automatically.
- Full-link monitoring and degradation/circuit breaking are unclear, making faults hard to discover and rectify.
- Synchronization links are improperly isolated and degraded: a synchronization failure of one index affects the synchronization of other indexes.
Based on the above pain points of the system, the following two goals are formulated:
- The data synchronization platform provides flexible scale-down and scale-up: ES clusters can be smoothly scaled down, scaled up, and migrated, and ES 5.x can be upgraded to ES 7.x. This not only meets the data group’s requirement but also improves the platform’s high availability: when a single ES cluster fails, clusters can be flexibly taken offline and brought back online to recover.
- Optimize the high availability of the whole synchronization link: sort out full-link traffic governance and circuit-breaking/degradation mechanisms, establish a high availability solution for each synchronization phase, and isolate the synchronization phases of different indexes from one another.
4. Practice of technological evolution
4.1. ES cluster 7.x upgrade and hot swap
Crab is the data center providing read and write services for the ES cluster; the core architecture is shown in the figure below. The Manager configures ES cluster grouping and query traffic control, and Crab periodically pulls this configuration and serves external reads and writes. The ES 7.x upgrade and hot-swap goals can be broken down into: the Crab gateway supporting ES 5.x and ES 7.x in parallel, ES cluster hot swap, and flexible ES cluster complement.
4.1.1. ES 5.x and ES 7.x in parallel
ES 5.x and ES 7.x differ in several areas, including ES REST client compatibility, REST API compatibility, and ES query return values.
- Elastic REST client
The Elastic Java Low Level REST Client is compatible with all versions of ES. After a comprehensive comparison, we chose the Low Level REST Client.
- REST APIs
The REST APIs supported by the Crab gateway include the Search, Document, and Script APIs.
The Search APIs support Search and Scroll query operations on ES; the main differences are shown in the table below. To support ES 7.x, the ES version must be configured in the Manager; Crab identifies the version when building a request and constructs a different endpoint to adapt.
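One concrete, well-known endpoint difference is that ES 7.x removed mapping types from the URL, while ES 5.x expects the type in the path. A minimal sketch of the kind of version-aware endpoint construction described above (class and method names are illustrative, not Crab's actual code):

```java
// Illustrative sketch of version-aware search endpoint construction.
// ES 5.x search endpoints carry the mapping type; ES 7.x removed types.
public class SearchEndpointBuilder {

    public static String searchEndpoint(String index, String type, int esMajorVersion) {
        if (esMajorVersion >= 7) {
            // ES 7.x: mapping types are gone, so the endpoint is /{index}/_search
            return "/" + index + "/_search";
        }
        // ES 5.x: /{index}/{type}/_search
        return "/" + index + "/" + type + "/_search";
    }
}
```

Because the Low Level REST Client just takes a method and an endpoint string, this is the only place the version difference has to surface for search requests.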
The Document APIs support Index, Update, and Delete. The Crab gateway implements both insert and update operations as upserts.
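The upsert-based write can be sketched as follows, using the standard ES Update API body `{"doc": ..., "doc_as_upsert": true}`; the class and method names are illustrative, not the Crab implementation:

```java
// Illustrative sketch: implement both insert and update as an ES upsert.
// The Update API endpoint also differs by version:
//   ES 5.x: /{index}/{type}/{id}/_update      ES 7.x: /{index}/_update/{id}
public class UpsertRequest {

    public static String updateEndpoint(String index, String type, String id, int esMajorVersion) {
        return esMajorVersion >= 7
                ? "/" + index + "/_update/" + id
                : "/" + index + "/" + type + "/" + id + "/_update";
    }

    public static String upsertBody(String docJson) {
        // doc_as_upsert=true tells ES to index docJson as a new document
        // when the id does not exist, and to merge it into the existing one otherwise.
        return "{\"doc\":" + docJson + ",\"doc_as_upsert\":true}";
    }
}
```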
- Query DSL and Scripting
Elasticsearch provides a full JSON-based Query DSL (domain-specific language) to define queries. ES 5.x and ES 7.x support different DSLs. We summarized the DSLs supported by Crab and found that match has the following differences:
ES nested types store parent and child (1:n) tables, which must be updated using scripts. ES 7.x no longer supports specifying scripts as files, so this must be handled separately when building requests.
- ES 5.x and ES 7.x return value differences
The main difference between the ES 5.x and ES 7.x responses is hits.total: ES 5.x returns the hit count directly as a number, while ES 7.x returns it as a structure, as shown in the table below. Our solution is to add the parameter track_total_hits=true to ES 7.x query endpoints by default, so that the relation in the return value is eq; after receiving the response, we extract the hit count and convert it back to the ES 5.x structure.
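The conversion back to the ES 5.x shape can be sketched as a small normalizer that accepts either form of hits.total (the class name is illustrative; real code would work on the parsed JSON response):

```java
import java.util.Map;

// Illustrative sketch: normalize hits.total to the ES 5.x shape.
// ES 5.x returns a bare number; ES 7.x returns {"value": N, "relation": "eq"}
// (with track_total_hits=true the relation is always "eq").
public class HitsTotalNormalizer {

    /** Accepts either a Number (ES 5.x) or a parsed Map (ES 7.x). */
    public static long normalize(Object hitsTotal) {
        if (hitsTotal instanceof Number) {
            return ((Number) hitsTotal).longValue();   // ES 5.x: already a number
        }
        Map<?, ?> struct = (Map<?, ?>) hitsTotal;      // ES 7.x: {"value":..,"relation":..}
        return ((Number) struct.get("value")).longValue();
    }
}
```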
4.1.2. Es Cluster Hot Swap Scheme
ES cluster hot swap is the solution for smoothly bringing the ES 7.x upgrade online and for group switchover when a single ES cluster fails. The table below shows the details of the switchover steps.
In terms of timing, the ES 7.x launch has five core stages, as shown in the figure below:
- Reindex: _reindex is executed to write full data from the old cluster to the new cluster. At this point the new cluster contains the full data of time period 1.
- Crab: perform step 5. Newly generated data can now be written to the new cluster; the data of period 2 is still missing from it.
- Post-diff complement: the full data is queried from the backcheck component, and the data missing in time periods 2 and 3 is written through the Crab gateway component.
- Verify data: verify the per-period data of the new and old clusters and sample-diff the two clusters.
- Query phase: if the cluster diff is consistent, queries can be served from the new cluster.
4.1.3. ES cluster flexible complement
Since complements may be either full or partial, we developed three complement schemes: reindex, moving the Canal site, and diff-based scheduled complement tasks.
- Reindex: ES provides the _reindex API for index rebuilding, which can be used for full data reconstruction. The specific code is as follows:
curl -H "Content-Type: application/json" -XPOST http://ip:port/_reindex -d'{
  "source": {
    "remote": {
      "host": "http://ip:port"
    },
    "index": "order_info_beta_tts8"
  },
  "dest": {
    "index": "order_info_beta_tts8"
  }
}'
The reindex complement scheme applies to the complement of time period 1 shown in the preceding figure: all the data in the old cluster’s index can be imported into a specified new cluster.
- Canal site move: by modifying Canal’s site (consumption position) information, binlogs are pulled and synchronized again as a complement. Because of the binlog retention period, moving the site can only resynchronize recent data.
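When Canal persists its site in ZooKeeper, moving the site amounts to rewriting the cursor znode with an earlier binlog file/offset and restarting the instance. The sketch below shows the path and payload construction following Canal's conventions, but the exact znode layout and JSON fields are assumptions that should be verified against your Canal deployment before use:

```java
// Hedged sketch of Canal site manipulation (verify against your deployment):
// the consumption position lives under a per-destination cursor znode, and
// "moving the site" means writing an earlier binlog file/offset there.
public class CanalCursor {

    /** Conventional ZooKeeper path of the cursor for a destination/client pair. */
    public static String cursorPath(String destination, String clientId) {
        return "/otter/canal/destinations/" + destination + "/" + clientId + "/cursor";
    }

    /** Builds a simplified cursor payload pointing at an earlier binlog position. */
    public static String cursorJson(String journalName, long position, long timestampMs) {
        return "{\"journalName\":\"" + journalName + "\","
             + "\"position\":" + position + ","
             + "\"timestamp\":" + timestampMs + "}";
    }
}
```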
The Canal site scheme is applicable to the scenario where the cluster read/write QPS is not high and the time range is near. It can be used in the complement scheme of time range 2.
- Diff complement scheduled task: rewrites ES data by comparing the results of the backcheck component against the full database data.
The diff complement scheme is a precise one. The business side implements the backcheck component; the data synchronization platform calls it, fetches the corresponding data from the index through the ES gateway, and then writes any inconsistent data to the corresponding index cluster through the Crab write component. The diff scheme achieves precise complement, but its drawback is that the business access party must do some development work. It currently supports core businesses online, such as domestic and international air tickets.
4.2. Data consistency and high availability
To introduce the platform’s data consistency and high availability, first take domestic air tickets as an example. The core synchronized information includes order information (1), passenger information (N), flight segment information (N), refund records (N), etc., which are synchronized into the ES index structure shown in the figure below: order information forms a single document, and the arrays of passengers, flight segments, and refund records are stored as internal documents of the nested data type.
The data synchronization process: Otter’s built-in Canal subscribes to the binlog, and after Otter’s four SETL stages the data is written to Kafka. DTS consumes the Kafka data, and after its four SFTL stages writes it to Crab, which finally writes it to the ES cluster, as shown in the figure below.
- Guarantee of data ordering and eventual consistency:
Eventual data consistency is guaranteed at three levels: (1) data of the same dimension stays ordered along the synchronization link; (2) when a write fails anywhere on the link, the data enters a retry queue; (3) diff complement tasks are created for important clusters.
1. Ordered synchronization of single-dimension data (otter → kafka → DTS → crab → es)
When writing data to a Kafka partition in the Otter Load phase, data for the same business entity must be routed to the same partition. For example, the partition key for the domestic flight service is the database name + order_id, which ensures all binlogs under one order are sent to the same partition.
DTS consumes Kafka on a single thread and processes data sequentially; when writing to the Crab gateway through the Dubbo interface, the data is likewise written to ES in order.
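The partition-key routing described above can be sketched as follows. The real Kafka client hashes keys with murmur2; a plain non-negative hash is used here to show the idea, and the class name is illustrative:

```java
// Illustrative sketch: route all binlogs of one order to the same Kafka
// partition by keying on database name + order_id. Identical keys always
// hash to the same partition, which preserves per-order ordering.
public class BinlogPartitioner {

    public static String partitionKey(String database, String orderId) {
        return database + "_" + orderId;
    }

    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the modulo result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```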
2. When data fails to be written to the link, it enters the failure retry queue
The system catches the exception and writes the data to a retry Kafka queue. When DTS consumes the retry data, it re-queries the database through the backcheck component, fetches the latest data, and writes it to the Crab gateway to ensure data consistency.
3. Set up diff complement tasks for important clusters
For important indexes such as domestic and international air tickets, a diff complement job over the most recent minute of data is established. The scheduled task queries a full minute of data through the backcheck component, diffs it against the ES data, and writes inconsistent records to the Crab gateway to ensure eventual consistency.
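A minimal sketch of the diff step, with records simplified to id → serialized-content strings (the class and method names are illustrative, not the platform's actual code):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the diff complement job: compare the authoritative
// records returned by the backcheck component with what ES currently holds,
// and collect the ids whose documents are missing or stale so they can be
// re-written through the Crab gateway.
public class DiffComplementTask {

    public static Set<String> idsToRewrite(Map<String, String> dbRecords,
                                           Map<String, String> esRecords) {
        Set<String> stale = new HashSet<>();
        for (Map.Entry<String, String> e : dbRecords.entrySet()) {
            String esDoc = esRecords.get(e.getKey());
            if (esDoc == null || !esDoc.equals(e.getValue())) {
                stale.add(e.getKey());   // missing in ES or content differs
            }
        }
        return stale;
    }
}
```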
- High availability of data synchronization links:
The high availability of data synchronization links (otter → kafka → DTS → crab → ES) is as follows:
Otter: each index runs as a separate pipeline across Otter nodes, isolating indexes from one another. Meanwhile, the built-in Canal runs in active/standby mode, and the S and L phases run in active/standby mode across multiple nodes, giving a single index high availability.
Kafka: high availability is achieved with one topic per index, multiple partitions per topic, and multiple replicas per partition.
DTS: multiple nodes consume per Kafka topic, with thread-level isolation between indexes. Multiple consumers give a single index high availability: after one consumer goes down, the others continue consuming.
Crab: a Hystrix thread pool is created per caller AppCode + index, which both isolates indexes from each other and bounds the capacity consumed by a single index.
ES: an index is stored in multiple ES clusters, and query distribution is configured through the management system, achieving index-level high availability.
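The per-AppCode+index isolation can be sketched as a simplified bulkhead: each key gets its own bounded thread pool, so a slow or failing index cannot exhaust the threads serving other indexes. This is a toy stand-in for Hystrix, not the actual Crab code:

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified Hystrix-style bulkhead sketch (illustrative only): one bounded
// thread pool per AppCode + index combination.
public class IndexBulkhead {

    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();
    private final int threadsPerKey;

    public IndexBulkhead(int threadsPerKey) {
        this.threadsPerKey = threadsPerKey;
    }

    public <T> Future<T> submit(String appCode, String index, Callable<T> task) {
        String key = appCode + "|" + index;
        // Lazily create a dedicated, bounded pool for this caller+index pair.
        ExecutorService pool = pools.computeIfAbsent(
                key, k -> Executors.newFixedThreadPool(threadsPerKey));
        return pool.submit(task);
    }

    public int poolCount() {
        return pools.size();
    }

    public void shutdown() {
        pools.values().forEach(ExecutorService::shutdown);
    }
}
```

Real Hystrix adds timeouts, circuit breaking, and fallbacks on top of this thread-pool isolation.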
The data synchronization platform has weathered several abnormal cases, such as Kafka disk space exhaustion and ZK faults. In such cases the cluster can be redeployed and recovered using the flexible complement methods described in section 4.1.3.
- Optimization of data synchronization:
1. Take international air tickets as an example: one operation generates multiple binlogs across the master and child tables, all written to the same partition. According to this business characteristic, DTS can use the business number as a key when consuming and, within one batch, keep only the last binlog for backcheck and write. That is, for a batch of binlogs with the same business number, no matter how many there are, only one final backcheck and write is performed.
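The keep-only-the-last-binlog-per-key batch reduction can be sketched as follows (types simplified to strings; names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: within one consumed batch, keep only the last binlog
// per business key (e.g. order number), since the subsequent backcheck
// fetches the latest full row anyway. A LinkedHashMap preserves the order
// in which keys first appeared while later entries overwrite earlier ones.
public class BinlogBatchDeduper {

    public static class Binlog {
        public final String businessKey;
        public final String payload;
        public Binlog(String businessKey, String payload) {
            this.businessKey = businessKey;
            this.payload = payload;
        }
    }

    public static List<Binlog> keepLastPerKey(List<Binlog> batch) {
        Map<String, Binlog> last = new LinkedHashMap<>();
        for (Binlog b : batch) {
            last.put(b.businessKey, b);   // a later binlog replaces an earlier one
        }
        return new ArrayList<>(last.values());
    }
}
```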
2. Otter’s database active/standby switchover is not open sourced. Based on the company’s mainstream MySQL PXC architecture, we implemented a customized database active/standby switchover.
3. In the Otter S phase, 10,000 rows are pulled by default. During a DDL operation on a large table this can saturate the network card and cause failures, so we dynamically adjust the pull size according to changes in the number of rows pulled.
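A possible shape for the dynamic pull-size adjustment (the back-off/recovery factors and bounds are assumptions for illustration, not Otter's actual values):

```java
// Illustrative sketch: shrink the pull batch size sharply when a pull fills
// the current limit (as a large-table DDL burst would), and grow it back
// gradually while traffic is normal. Parameters are assumed values.
public class PullBatchSizer {

    private int batchSize;
    private final int minSize;
    private final int maxSize;

    public PullBatchSizer(int initial, int minSize, int maxSize) {
        this.batchSize = initial;
        this.minSize = minSize;
        this.maxSize = maxSize;
    }

    /** Adjusts the next batch size based on how many rows the last pull returned. */
    public int adjust(int lastPulledRows) {
        if (lastPulledRows >= batchSize) {
            batchSize = Math.max(minSize, batchSize / 2);              // saturated: back off
        } else {
            batchSize = Math.min(maxSize, batchSize + batchSize / 10); // recover slowly
        }
        return batchSize;
    }
}
```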
5. Summary and future planning
With the above schemes, the expected goals were achieved: full-link monitoring and full-link high availability of the data synchronization platform. In addition, the read/write performance of the ES cluster improved by an order of magnitude: after migrating domestic tickets from ES 5.x to 7.x, query time dropped from 68ms to 21ms (Figure 1) and write time from 34ms to 6ms (Figure 2):
After the system went online, several production problems occurred during daily maintenance, testing the system’s high availability:
1. Physical machine failure in the work order’s old ES cluster
On June 4, the platform alarmed that write exceptions for the work order index were increasing. The exception log showed ES IP connection failures; after consulting the NOC, it turned out the machine was out of warranty and had broken down.
2. The kafka disk is full, causing synchronization failure
The system alarmed that synchronization for a single index was abnormal. Investigation found that the disk of one Kafka machine was full, causing the synchronization failure. After the disk problem was solved, the data was complemented and recovered by moving the Canal site.
The data synchronization platform implements a high availability solution along the synchronization link, but in some scenarios exceptions still have to be detected manually. As more business lines onboard, higher requirements are placed on our availability, usability, scalability, and low-cost access. Future planning focuses on DTS data aggregation configuration and automatic fault migration.