Design idea
In order to solve the distributed database, complex SQL (such as global sorting, grouping, join, sub-query, especially the unbalanced field of these logical operations) difficult to achieve the problem; On the basis of some experience in the practical application of distributed database and Hadoop, I compared the advantages and disadvantages of both, combined with some of my own refining and thinking, and designed a comprehensive system of both, using the advantages of both, supplement the shortcomings of both. Specifically, using the idea of database horizontal partition to achieve data storage, using the idea of MapReduce to achieve SQL calculation.
The horizontal segmentation of the database here means that only the database is divided into different tables. For tables of different orders of magnitude, the number of branches can be different, for example, the data volume of 100 million is divided into 10 branches, and the data volume of 1 billion is divided into 50 branches. For the idea of using MapReduce to achieve calculation; For a requirement, convert it into one or more DEPENDENT SQL, and decompose each SQL into one or more MapReduce tasks, and each MapReduce task includes MapSQL, Shuffle, and reducesQL. This process can be understood as similar to Hive. The difference is that map and Reduce operations in MapReduce jobs are implemented using SQL instead of Map and Reduce operations in Hadoop.
This is the basic MapReduce idea, but in the Hadoop ecosystem, first-generation MapReduce stores results on disk, and second-generation MapReduce stores results in memory or disk based on memory usage, analogous to storing in a database, So MapReduce results are stored in tables, and the database caching mechanism naturally supports memory versus disk storage depending on memory availability; In addition, the calculation model is not the same in the Hadoop ecosystem. The calculation idea of MapReduce can be replaced by the RDD iterative calculation method similar to Spark. This system is based on MapReduce.
architecture
Based on the above ideas, the architecture of the system is as follows:
No proxy node
There are proxy nodes
The module specification
CNUTCon Global Operation & Maintenance Technology Conference, Sep.10-Sep.11, Shanghai Everbright Convention & Exhibition Center Hotel, watch the highlights first
As for the modules in the system, as in most distributed systems, here is a brief description:
The difference between the two architectures
When there is no proxy node, the client is responsible for a large amount of work, including sending requests, parsing SQL, generating execution plans, applying for resources, arranging execution, and obtaining results. When there is a proxy node, the proxy node is responsible for accepting requests, parsing SQL, generating execution plans, applying for resources, scheduling execution, and returning results to clients. In addition, the proxy node provides interfaces that support external protocols, such as C/S protocol of mysql. The mysql command line can be directly connected to execute SQL, the entire system is just like a normal mysql Server.
Application architecture
The actual application environment can be a formal environment, a formal backup environment, and an offline environment. The deployment can be based on the following architecture.
Basic Concepts
The following illustrates some of the concepts in architecture
Add, delete and modify operations
When data is inserted, records are inserted into the corresponding database node according to the balancing field and balancing policy.
When data is updated, determine whether the database nodes before and after data update are changed according to the balance policy. If no changes are made, the database nodes are updated directly. If there are changes, delete the old data in the database node before the update and insert the new data in the database node after the update.
When data is deleted, it is deleted from the corresponding database node according to the balancing policy.
As long as the data changes of multiple nodes are involved in these three operations, distributed transactions are required to ensure the consistency, atomicity and other transaction characteristics.
Query operation
The query operation is similar to Hive. To facilitate the explanation of the query operation, the stage tree and stage structure are explained first, as shown in the figure below:
Stage of the tree
phase
Query steps
Combined with the above figure, the specific process of the query operation is as follows:
- After lexical, grammatical and semantic analysis of the input SQL, the collection of table structure information and data distribution information can generate an execution plan containing multiple stages (stages for short). These stages have certain dependencies and form a task tree with multiple inputs and single outputs.
- Each stage consists of two TYPES of SQL, called MapSQL and ReducesQL, and each stage consists of three operations, Map, data shuffling and Reduce. Map and Reduce execute mapSQL and reducesQL respectively.
- The map operation is performed on different database nodes. The map operation is performed on MAPSQL. Its input is the data inside the table on each database node, and the output is divided according to a certain field according to certain rules and put into different result sets.
- The data shuffling process is then performed to copy the different result sets to the different database nodes that will perform Reduce.
- Perform reduce operations on different database nodes, and reduce operations perform reducesQL.
- The result is returned.
example
As the core of the system is storage and computation, the concepts related to storage and computation are illustrated below
The equilibrium strategies
Table name: tab_user_LOGIN Table description: Used to store user login information Number of nodes: 4 (0, 1, 2, and 3)
Here are some examples of strategies:
List: Take the login province as the balancing field as an example
Hash: Press 4 to hash and use the user ID as the balancing field
Range: 0 to 100 million, with the user ID as the balancing field
Combine the hash and range: range first, then mod, using the user ID as the balancing field
The query
The basic information is as follows:
User table tab_user_info is as follows:
The user login table tab_login_info has the following structure:
The sorting
The key point of sorting is that there is a size relationship between nodes. Large keys or key ranges are placed on nodes with large node IDS, and then sorted on nodes. Data is obtained according to the size of node ids.
The following SQL query is used to sort user information by age and ID within a specified registration time range:
select * from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? order by u_id
The execution plan may be:
The Map:
select * from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? order by u_id
Shuffle:
In this case, data needs to be shuffled based on the U_ID. Therefore, storage nodes need to be divided based on the U_ID. For example, if there are N compute nodes, divide the uids in the same range from the maximum U_id to the minimum u_id by /N. (The compute nodes have different sizes.)
Reduce:
select * from tab_user_info t order by u_id
Group aggregation
Key points are similar to sorting. There is a size relationship between nodes. Large keys or key ranges are placed on nodes with large node ids, and then grouped and aggregated on nodes.
The following SQL is used as an example to calculate the number of users in each age group of users in a certain registration time range:
select age,count(u_id) v from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? group by age
The execution plan may be:
The Map:
select age,count(u_id) v from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? group by age
Shuffle:
After the execution is complete, in this case, data shuffling based on age is required. Considering that the unique value of age is relatively small, data shuffling can copy all records to the same compute node.
Reduce:
select age,sum(v) from t where group by age
The connection
First, make it clear that the join field type is numeric and string, and other types such as date can be converted to both. The sorting of numeric data is very simple, and the sorting of string data needs to determine rules, similar to collation in mysql, which is commonly used according to unicode encoding order and the actual size of storage nodes. Secondly, there are equivalent join and non-equivalent join. Take the following common and simple case as an example.
Use the following SQL as an example to query all login information of users in a specified registration time range:
select t1.u_id,t1.u_name,t2.login_product from tab_user_info t1 join tab_login_info t2 on (t1.u_id=t2.u_id and t1.u_reg_dt>=? and t1.u_reg_dt<=?)
The execution plan may be:
The Map:
Because it is join, all tables are queried and each table is labeled with its own label. You can add a table name field to perform the query on all storage nodes
select u_id,u_name from tab_user_info t where u_reg_dt>=? and t1.u_reg_dt<=? select u_id, login_product from tab_login_info t
Shuffle: In this case, data needs to be shuffled based on u_id. Considering that u_id has many unique values, you need to divide storage nodes based on u_id. For example, if there are N compute nodes, divide storage nodes evenly by (maximum u_id- minimum u_id) /N. Example Divide the u_id in the same range on different storage nodes to one compute node.
Reduce:
select t1.u_id,t1.u_name,t2.login_product from tab_user_info t1 join tab_login_info t2 on (t1.u_id=t2.u_id)
The subquery
Subqueries can be decomposed into SQL with dependencies that do not contain subqueries. Therefore, the generated execution plan is that multiple SQL execution plans are executed according to certain dependencies.
The differences and advantages with existing systems
- Compared with the HDFS, data distribution is regular. After the HDFS startup, run commands to query the node where the file is located. Metadata is small, record rules can be, administration costs are low, in terms of startup speed is fast.
- The data is put in the database, can make good use of index and database itself cache mechanism, greatly improve the efficiency of data query, especially in the case of a large number of data, use index query to return a small amount of data.
- Data can be deleted and modified, which is cumbersome and inefficient in HDFS-based systems.
- In terms of computing, it is no different from MapReduce or other distributed computing frameworks such as Spark (shuffle is required). But because the distribution of data is regular, there are places where it can be better represented in a distributed full-text index.
- Online systems generally use databases as the final storage location, and it is cumbersome to synchronize the database to HDFS. In addition, data synchronization is cumbersome, inefficient and slow in the case of deletion and update. In contrast, this solution can use the mirror replication function provided by the database itself to synchronize with little additional hassle and inefficiency.
- Based on the above, online system (master system) and offline data analysis and mining (slave system) can be made into a unified scheme, see the application architecture diagram.
Application scenarios
Finally, some application scenarios are listed
The authors introduce
Jiang Hehui, currently working in Weiyou Software and once working in NetEase, focuses on data processing such as MySQL, Hadoop and distributed database
Thanks to Wood huan for reviewing this article.
To contribute or translate InfoQ Chinese, please email [email protected]. You are also welcome to follow us on Sina Weibo (@InfoQ, @Ding Xiaoyun) and wechat (wechat id: InfoQChina).