1. Background
As Mafengwo's business has grown, so has the volume of our business data, and MySQL alone can no longer meet our query needs, such as multi-dimensional retrieval of products, orders, and other data.
Storing business data in Elasticsearch is a good way to meet these search requirements. However, once the data is stored heterogeneously, the problem of data synchronization follows.
2. Existing methods and problems
For data synchronization, our current solution is to build intermediate tables: the business data to be retrieved is written into a MySQL table, where the table corresponds to the Elasticsearch index required by the business and each column corresponds to a field in the index Mapping. A crontab script then reads all rows in the MySQL intermediate table whose UTime is greater than the time of the last read and writes them to Elasticsearch.
As a result, whenever the business logic changes any of the corresponding fields, it must also take care of updating the MySQL intermediate table; and if the data in Elasticsearch needs to be real-time, it must write Elasticsearch at the same time.
As the amount of business data grows, the MySQL intermediate tables become larger and larger, and whenever a Mapping field needs to be added to the Elasticsearch index, a corresponding column has to be added to the MySQL intermediate table.
In addition, as the Mapping fields of the Elasticsearch index grow with the business, the business side has to add corresponding code for writing the MySQL intermediate tables, which also brings development cost.
3. Solution design
1. Overall approach
Existing open-source data synchronization tools, such as Alibaba's DataX, obtain the data source mainly through queries, which raises problems such as how to determine the increment (for example, by using a utime field) and how often to poll, while some of our business scenarios have high requirements on the timeliness of data synchronization. To solve these problems, we propose a new way of synchronizing MySQL data to Elasticsearch based on the MySQL Binlog. The Binlog is what MySQL uses for master/slave Replication, so it contains all the data we need to write to Elasticsearch and satisfies the timeliness requirement.
With Binlog-based synchronization to Elasticsearch, the business side only needs to focus on writing its business logic to the MySQL database, without worrying about synchronizing the data to Elasticsearch afterwards. This removes unnecessary synchronization code and avoids the time-consuming problem of extending the intermediate table columns.
After investigation, we adopted the open-source project go-mysql-elasticsearch for data synchronization and did some customized development for Mafengwo's technology stack and our actual business environment.
2. Guaranteeing correct data synchronization
The Binlog data of all the company's tables is confidential and cannot be accessed directly. To serve every business line, the Binlog data is exposed through Kafka, and users must apply for the corresponding permission before using it. After being granted permission, a user reads the data as a Kafka Consumer Group.
This approach ensures the security of the Binlog data, but makes it more challenging to guarantee the correctness of data synchronization. We therefore designed some mechanisms to ensure that the data source is obtained in order and in full.
1) Ordering
When consuming Binlog data through Kafka, we need to ensure that the data is obtained in order. Strictly speaking, Kafka only guarantees local order, not global order, so there is no guarantee that all Binlog data arrives at the Consumer in order.
However, the data within each Partition is ordered. To receive the Binlog of each MySQL row in order, each Binlog entry is hashed to a Partition according to its primary key, which ensures that all Binlog data of the same MySQL row is sent to the same Partition.
When there are multiple Consumers, a Partition is assigned to only one Consumer, which likewise ensures that the data within a Partition can be updated to Elasticsearch in order.
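A minimal sketch of this idea, assuming the producer side chooses the partition from the row's primary key (the function, key format, and partition count below are illustrative, not the platform's actual code):

package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a MySQL row's primary key to a Kafka partition so that all
// Binlog entries of the same row land in the same partition and stay ordered.
func partitionFor(primaryKey string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(primaryKey))
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Two Binlog events for the same order row always map to the same partition.
	fmt.Println(partitionFor("order:1001", 12))
	fmt.Println(partitionFor("order:1001", 12))
}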
2) Integrity
Considering all the normal and abnormal exits the synchronization program may go through, as well as Rebalances when the number of Consumers changes, we need to make sure that no Binlog data is lost under any circumstances.
Kafka's Offset mechanism makes this possible: a Message's Offset is committed only after the Message has been confirmed to be successfully written to Elasticsearch. In this data synchronization scenario, occasional repeated consumption does not affect the ordering or integrity of the data.
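A minimal sketch of this commit-after-write pattern with the sarama Kafka client; the broker address, topic, group name, and writeToES are placeholders, not the project's actual code:

package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler; offsets are only marked after
// the message has been written to Elasticsearch, so nothing is lost on a crash.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		if err := writeToES(msg.Value); err != nil {
			return err // not marked: the message will be consumed again
		}
		sess.MarkMessage(msg, "") // mark the offset only after a successful write
	}
	return nil
}

// writeToES stands in for the real Elasticsearch write.
func writeToES(payload []byte) error { return nil }

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"kafka:9092"}, "binlog-sync", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		if err := group.Consume(context.Background(), []string{"binlog-topic"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}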
4. Technical implementation
1. Functional modules
Configuration parsing module
This module parses the configuration file (TOML or JSON format) or the JSON string configured in the configuration center (Skipper). Configurable items include the Kafka cluster, the Elasticsearch address, the logging mode, and the mapping between MySQL databases/tables and the Elasticsearch index and Mapping fields.
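As an illustration only, the parsed configuration might map onto a structure like the following; the field names are invented for this sketch and are not the project's actual schema:

package config

// Config is an illustrative shape for the synchronization configuration,
// whether it comes from a TOML/JSON file or a JSON string in the
// configuration center (Skipper).
type Config struct {
	KafkaBrokers  []string `toml:"kafka_brokers" json:"kafka_brokers"`
	KafkaTopic    string   `toml:"kafka_topic" json:"kafka_topic"`
	ConsumerGroup string   `toml:"consumer_group" json:"consumer_group"`
	ESAddr        string   `toml:"es_addr" json:"es_addr"`
	LogMode       string   `toml:"log_mode" json:"log_mode"` // e.g. "file" or "console"
}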
Rule module
The rule module determines which Elasticsearch index a Binlog entry should be written to, which MySQL field corresponds to the document _id, the correspondence between each MySQL field in the Binlog and the index Mapping, the write type, and so on.
During localization, based on our business scenarios, we added where-condition checks on the fields of each MySQL table to filter out Binlog data that is not needed.
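A minimal sketch of what such a rule with a field filter could look like in Go; the struct and field names are illustrative, not the project's actual code:

package rule

// Rule describes, for one MySQL table, which Elasticsearch index to write to,
// which column becomes the document _id, how columns map to Mapping fields,
// and which rows to keep (the added where-condition filter).
type Rule struct {
	Database string
	Table    string
	Index    string              // target Elasticsearch index
	IDColumn string              // MySQL column used as the document _id
	FieldMap map[string]string   // MySQL column -> index Mapping field
	Filter   map[string][]string // column -> allowed values, e.g. "type": {"1", "2"}
}

// Match reports whether a Binlog row (column -> value) passes the rule's filter.
func (r *Rule) Match(row map[string]string) bool {
	for column, allowed := range r.Filter {
		ok := false
		for _, v := range allowed {
			if row[column] == v {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}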
Kafka module
This module is responsible for connecting to the Kafka cluster and obtaining the Binlog data.
During localization, most of this module's functionality was packaged into a generic Golang Kafka Consumer Client, including the SASL authentication required by the DBA Binlog subscription platform and the ability to consume data from the Offset at a specified point in time.
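A sketch of the SASL-related part of such a client's setup with the sarama library; the mechanism, user, and password are placeholders, and the actual values come from the DBA Binlog subscription platform:

package kafkaclient

import "github.com/Shopify/sarama"

// newSASLConfig builds a sarama config with SASL authentication enabled, as
// required by the Binlog subscription platform (assumed PLAIN mechanism here).
func newSASLConfig(user, password string) *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_0
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	cfg.Net.SASL.User = user
	cfg.Net.SASL.Password = password
	return cfg
}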
Binlog data parsing module
The Binlog parsing in the original project works on raw Binlog data and therefore includes an implementation of the Replication protocol. In our scenario, the Binlog data is already a JSON string parsed by Canal, so this module's work is greatly simplified.
Binlog JSON string example
The database and table of a simplified Binlog JSON string like the one below match a configured rule, and its Data is converted into a key-value map for the corresponding Elasticsearch index, with some data type conversions applied.
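As a sketch, assuming Canal's common flat-message fields (database, table, type, ts, data), such a message and its conversion into a key-value map might look like the following; the exact field set depends on the Canal configuration, and the sample values are hypothetical:

package main

import (
	"encoding/json"
	"fmt"
)

// CanalMessage mirrors a simplified Canal flat-message; the real field set
// depends on the Canal configuration.
type CanalMessage struct {
	Database string              `json:"database"`
	Table    string              `json:"table"`
	Type     string              `json:"type"` // INSERT / UPDATE / DELETE
	Ts       int64               `json:"ts"`
	Data     []map[string]string `json:"data"`
}

func main() {
	raw := `{"database":"order_db","table":"order","type":"UPDATE","ts":1557733200000,
	  "data":[{"order_id":"1001","type":"2","price":"128.00"}]}`

	var msg CanalMessage
	if err := json.Unmarshal([]byte(raw), &msg); err != nil {
		panic(err)
	}

	// Build the key-value map for the matching Elasticsearch index; the real
	// code applies the rule module's field mapping and type conversions here.
	doc := make(map[string]interface{}, len(msg.Data[0]))
	for column, value := range msg.Data[0] {
		doc[column] = value
	}
	fmt.Println(msg.Database, msg.Table, msg.Type, doc)
}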
Elasticsearch-related module
The key-value maps generated by the Binlog parsing module are used as the update payloads of requests to the _bulk interface and written to Elasticsearch. The key-value maps are first buffered in a slice; every 200 ms, or when the slice reaches a certain length (adjustable via configuration), Elasticsearch's _bulk interface is called to write the data.
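A minimal sketch of this buffer-and-flush logic, with flushToES standing in for the actual _bulk call; the 200 ms interval and batch-size threshold follow the description above:

package essink

import "time"

// Doc is one key-value map destined for Elasticsearch.
type Doc map[string]interface{}

// BulkLoop buffers incoming docs and flushes them either every 200 ms or when
// the buffer reaches maxBatch, whichever comes first.
func BulkLoop(in <-chan Doc, maxBatch int, flushToES func([]Doc) error) error {
	buf := make([]Doc, 0, maxBatch)
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()

	flush := func() error {
		if len(buf) == 0 {
			return nil
		}
		err := flushToES(buf) // call Elasticsearch's _bulk interface
		buf = buf[:0]
		return err
	}

	for {
		select {
		case doc, ok := <-in:
			if !ok {
				return flush() // channel closed: flush what is left
			}
			buf = append(buf, doc)
			if len(buf) >= maxBatch {
				if err := flush(); err != nil {
					return err
				}
			}
		case <-ticker.C:
			if err := flush(); err != nil {
				return err
			}
		}
	}
}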
2. Customized development
1) Adapting to business needs
Upsert
The index data used by a business may come from multiple MySQL tables. When the data of one document comes from different tables, the first piece of data to arrive should be written as an Index and later ones as Updates. Since this order cannot be controlled, upsert is needed, which is done by adding the following parameter to the _bulk update payload:
{
"doc_as_upsert" : true
}
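For example, one document in the _bulk request body then consists of an action line plus an update payload like the following (index name, _id, and fields here are hypothetical), so that the update falls back to an insert when the document does not exist yet:

{ "update": { "_index": "order_index", "_id": "1001" } }
{ "doc": { "status": 2, "price": 128.0 }, "doc_as_upsert": true }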
Filter
In actual business scenarios, often only part of the data in a table is needed. For example, the data source may be distinguished by a Type field, and only rows whose type is 1 or 2 need to be synchronized to Elasticsearch. We extended the rule configuration to support filtering on specified Binlog fields, equivalent to:
select * from sometable where type in (1, 2)
2) Fast incremental synchronization
Data synchronization is generally divided into full and incremental synchronization. When a business first comes on board, its historical data in MySQL needs to be imported into Elasticsearch; this is full synchronization. Data added during and after the full synchronization is incremental data.
After the full synchronization is complete, if Kafka is consumed from the oldest offset and the queue holds a large backlog, it can take a long time for the incremental data to catch up with the current position. To obtain the required incremental Binlog quickly, before the Consumer Group starts consuming Kafka, we fetch the offsets of each Topic's Partitions at a specified time and commit them. When the Consumer Group then connects to the Kafka cluster, it consumes from the offsets just committed and immediately receives the required incremental Binlog.
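A sketch of this offset pre-commit with the sarama client; the broker address, topic, group name, and the one-hour look-back are placeholders, not the project's actual values:

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_0

	client, err := sarama.NewClient([]string{"kafka:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	om, err := sarama.NewOffsetManagerFromClient("binlog-sync", client)
	if err != nil {
		log.Fatal(err)
	}
	defer om.Close() // flushes the marked offsets to Kafka before exiting

	// Offsets at this point in time (milliseconds) become the starting position.
	since := time.Now().Add(-1*time.Hour).UnixNano() / int64(time.Millisecond)

	partitions, err := client.Partitions("binlog-topic")
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range partitions {
		// Offset of the first message whose timestamp is >= the given time.
		offset, err := client.GetOffset("binlog-topic", p, since)
		if err != nil {
			log.Fatal(err)
		}
		pom, err := om.ManagePartition("binlog-topic", p)
		if err != nil {
			log.Fatal(err)
		}
		pom.MarkOffset(offset, "") // committed for the group before consuming starts
	}
}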
3) Microservices and configuration center
The project is deployed on Mafengwo's microservice platform, which allows newly connected businesses to go online quickly and makes it easy to scale Consumers out rapidly when the Binlog volume surges.
Mafengwo's configuration center manages the configuration of each connected business. Compared with the TOML configuration files of the open-source project, the configuration center makes it much more convenient to manage the configurations of different businesses and environments.
5. Logging and monitoring
As shown in the monitoring chart, the average synchronization delay of each order-related table is about 1 s. The delay data is fed into ElastAlert, which sends an alert notification when too much delayed data is detected.
Another monitoring metric is a heartbeat check. A separate table independent of the business is created, a crontab script modifies it every minute, and we check whether this latest modification has been synchronized to the specified index; if not, an alert is sent. This heartbeat check covers the whole pipeline across Kafka, the microservice, and Elasticsearch, so we are the first to be notified if any problem causes data to stop synchronizing.
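A minimal sketch of such a check against Elasticsearch's document GET API; the URL, index, document _id, field name, and alert threshold are placeholders:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)

func main() {
	// The crontab job touches a heartbeat row in MySQL every minute; here we
	// read the corresponding document from the target index and check its age.
	resp, err := http.Get("http://es:9200/heartbeat_index/_doc/1")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var body struct {
		Source struct {
			UTime int64 `json:"utime"` // last modification time (seconds) written by the crontab job
		} `json:"_source"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		log.Fatal(err)
	}

	lag := time.Since(time.Unix(body.Source.UTime, 0))
	if lag > 3*time.Minute {
		fmt.Println("ALERT: heartbeat not synchronized, lag =", lag) // send the real alert here
	}
}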
6. Conclusion
At present, the most important business served is the order index for e-commerce, with a data synchronization delay of about 1 s. We hope this record of localizing the open-source project can help other business scenarios that need to synchronize data to Elasticsearch.
Author: Zhang Kun, senior R&D engineer, vacation business, Mafengwo e-commerce R&D team.