Bytedance Technology Salon | Big Data Architecture Special, held at the Bytedance office in Shanghai. We invited Guo Jun, head of data warehouse architecture at Bytedance; Tao Jiatao, big data R&D engineer at Kyligence; Xu Mingmin, storage engineer at Bytedance; and Bai Chen, senior technical expert at Alibaba Cloud, to share their work.
This post is based on the talk “Bytedance’s Core Optimization Practices on Spark SQL” by Guo Jun, head of Data Warehouse Architecture at Bytedance.
Team introduction
Overview
Introduction to the Spark SQL architecture
The above information is stored in the Catalog. In a production environment, the Catalog service is usually provided by the Hive Metastore. The Analyzer combines the Unresolved Logical Plan with the Catalog to produce a Resolved Logical Plan.
That alone is not enough. Different people write SQL differently, which leads to different Resolved Logical Plans and different execution efficiency. To ensure efficient execution no matter how the SQL is written, Spark SQL passes the Resolved Logical Plan through the Optimizer. The Optimizer contains a series of rules that rewrite the Resolved Logical Plan into equivalent forms and finally produce an Optimized Logical Plan. The Optimized Logical Plan is not guaranteed to be globally optimal, but it is at least close to optimal.
The same logical operator can have multiple physical implementations. Join, for example, can be implemented as ShuffledHashJoin, BroadcastHashJoin, BroadcastNestedLoopJoin, SortMergeJoin, and so on. Therefore, the Query Planner can convert the Optimized Logical Plan into multiple Physical Plans, and choosing the best Physical Plan has a major impact on final execution performance. A better approach is to build a Cost Model, apply it to all candidate Physical Plans, and select the one with the smallest cost as the final Selected Physical Plan.
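For readers who want to see these stages concretely, the short Scala sketch below (with made-up data, not from the talk) prints the Parsed, Analyzed, and Optimized Logical Plans plus the selected Physical Plan for a simple join, and shows the planner's choice changing from BroadcastHashJoin to SortMergeJoin once broadcasting is disabled:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("plan-stages-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val orders = spark.range(0, 1000000).selectExpr("id AS user_id", "id % 50 AS amount")
val users  = Seq((1L, "cn"), (2L, "us")).toDF("user_id", "country")

// explain(true) prints the Parsed / Analyzed / Optimized Logical Plans and
// the Physical Plan selected by the planner, i.e. the pipeline described above.
orders.join(users, "user_id").explain(true)

// The small side fits under spark.sql.autoBroadcastJoinThreshold, so the
// planner picks BroadcastHashJoin; disabling broadcasting forces it to fall
// back to a Shuffle-based SortMergeJoin instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
orders.join(users, "user_id").explain()
```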
The next section describes some of the optimizations Bytedance has made to Spark SQL, focusing mainly on the logical plan optimization and physical plan optimization described above.
Optimizing the Spark SQL engine
Bucket Join improvements
The following figure shows the basic principle of SortMergeJoin. Table 1 and Table 2, represented by the dotted boxes, are two tables that need to be joined on a certain field. Partition 0 to Partition M inside the dotted boxes are the partitions of the RDDs the tables are converted to, not the partitions of the tables themselves. Assume Table 1 and Table 2 contain M and K partitions respectively after being converted to RDDs. To perform the Join, a Shuffle is required to ensure that data with the same Join Key ends up in the same partition, that each partition is sorted by the key, and that Table 1 and Table 2 have the same number of partitions after the Shuffle.
In big data scenarios, data is typically written once and queried many times. If two tables are often joined in the same or a similar way, paying the Shuffle cost on every Join is wasteful. Instead, the data can be laid out in a Join-friendly way at write time so that no Shuffle is needed at Join time. As shown in the following figure, the data in Table 1 and Table 2 is divided into n buckets by the same key, and each bucket is sorted by that key. When joining the two tables, n tasks can be started to join the corresponding buckets directly, without any Shuffle.
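As a rough sketch of the idea using open-source Spark's native bucketing API (table and column names are made up; this is not Bytedance's Hive-compatible implementation), writing both sides bucketed and sorted by the join key lets the later join run without an Exchange:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucket-join-sketch")
  .master("local[*]")
  .getOrCreate()

// Bucket and sort both tables by the join key at write time.
spark.range(0, 1000000)
  .selectExpr("id AS user_id", "id % 100 AS feature")
  .write.bucketBy(8, "user_id").sortBy("user_id")
  .format("parquet").saveAsTable("table1")

spark.range(0, 1000000)
  .selectExpr("id AS user_id", "id % 7 AS label")
  .write.bucketBy(8, "user_id").sortBy("user_id")
  .format("parquet").saveAsTable("table2")

// Both sides are bucketed by user_id with the same bucket count, so the
// physical plan of this join contains no Exchange (Shuffle) on either side.
spark.sql(
  """SELECT t1.user_id, t1.feature, t2.label
    |FROM table1 t1 JOIN table2 t2 ON t1.user_id = t2.user_id""".stripMargin
).explain()
```

In this case the number of join tasks equals the bucket count, which corresponds to the n tasks described above.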
Improvement 1: Support compatibility with Hive
To solve this problem, we enable Spark SQL to support a Hive-compatible mode, which ensures that a bucketed table written by Spark SQL has the same layout as one written by Hive. Such a table can then be treated as a bucketed table by both Hive and Spark SQL, and Bucket Join can be performed without a Shuffle. In this way, transparent migration from Hive to Spark SQL is guaranteed.
Improvement 2: Support for Bucket Join when bucket counts are multiples of each other
In the first approach, the number of tasks equals the number of buckets in the small table. As shown in the following figure, Table A contains three buckets and Table B contains six buckets, so buckets 0 and 3 of Table B need to be joined with bucket 0 of Table A. Three tasks are started; Task 0 joins bucket 0 of Table A with buckets 0 and 3 of Table B, and the data of buckets 0 and 3 of Table B first has to be merged so that it stays in sorted order.
If the numbers of buckets in Table A and Table B are close, the preceding method works well. But if, say, Table B has 10 times as many buckets as Table A, that method avoids the Shuffle yet may be slower than a Shuffle-based SortMergeJoin because of insufficient parallelism. In this case, the other approach can be used: make the number of tasks equal to the number of buckets in the large table, as shown in the following figure.
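The bucket correspondence both approaches rely on can be sketched in a few lines, assuming Hive-style bucketing where the bucket ID is hash(key) mod numBuckets; the 3 and 6 bucket counts mirror the Table A / Table B example above:

```scala
// Hive-style bucket assignment (sketch): bucketId = positive hash mod numBuckets.
def bucketId(key: Int, numBuckets: Int): Int =
  (key.hashCode() & Integer.MAX_VALUE) % numBuckets

val bucketsA = 3   // Table A
val bucketsB = 6   // Table B, a multiple of bucketsA

// Because bucketsB is a multiple of bucketsA, a key's bucket in Table A is
// simply its bucket in Table B modulo bucketsA. So bucket 0 of Table A only
// needs to meet buckets {0, 3} of Table B, bucket 1 meets {1, 4}, and so on.
(0 until 1000).foreach { key =>
  assert(bucketId(key, bucketsB) % bucketsA == bucketId(key, bucketsA))
}

// First approach:  3 tasks; task i merges buckets i and i + 3 of Table B
//                  (keeping them sorted) and joins them with bucket i of Table A.
// Second approach: 6 tasks; task j joins bucket j of Table B with
//                  bucket (j % 3) of Table A, giving higher parallelism.
```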
Improvement 3: Support for BucketJoin degradation
At the same time, because data volume grows rapidly, the average bucket size also grows rapidly. A single task then has to process too much data, and using buckets may end up worse than simply using a Shuffle-based Join.
Improvement 4: Support for supersets
As shown in the figure below, both Table X and Table Y are bucketed by field A, while the query joins Table X and Table Y on the Join Key set (A, B). Since rows with equal values of A have the same bucket ID in both tables, rows with equal values of (A, B) must also have the same bucket ID in both tables, so the data distribution already satisfies the Join requirement and no Shuffle is needed. Bucket Join also requires both tables to be sorted by the Join Key set (A, B); for this, only an intra-partition sort of Table X and Table Y is needed, and since both sides are already sorted by A, sorting by (A, B) is relatively cheap.
Materialized column
When Spark SQL processes nested data, the following problems exist:
- Reading a lot of unnecessary data: for columnar formats such as Parquet/ORC, only the required fields need to be read and other fields can be skipped, which greatly saves IO. But for fields with nested data types, such as the Map-typed people field in the figure below, a query usually only needs a subfield such as people.age, yet the entire people Map has to be read before people.age can be extracted. This introduces a lot of pointless IO overhead. In our scenario there are quite a few Map-typed fields, many of which contain tens to hundreds of keys, which means the IO is amplified by tens to hundreds of times.
- No vectorized read: vectorized reading can greatly improve performance, but as of this writing (26 October 2019) Spark does not support vectorized reading of nested data types, which significantly hurts queries that contain them.
- No filter pushdown: as of 26 October 2019, Spark does not support pushing filters down onto nested-type fields.
- Repeated computation: a JSON field is stored as a String in Spark SQL and is not strictly a nested data type. In practice, however, it is often used to hold a variable set of subfields, which are extracted via JSON Path at query time. Extracting subfields from large JSON strings is very CPU-intensive, and for a hot table, repeatedly extracting the same subfields is a waste of resources.
To work around this, data warehouse engineers came up with some solutions of their own. As shown in the figure below, in addition to the table base_table, a table named sub_table is created in which the frequently used subfield people.age is promoted to a separate Integer-typed field age. Downstream queries then read age from sub_table instead of people.age from base_table. In this way a query on a nested field becomes a query on a primitive field, which resolves the problems above. However, this workaround has its own problems:
- Maintaining an extra table introduces significant additional storage and computing overhead.
- Historical data of the new field cannot be queried from the new table (supporting queries over historical data would require rerunning the historical jobs, which is far too expensive to accept).
- The table maintainer has to modify the data-insertion jobs whenever the table structure changes.
- Downstream users have to modify their query statements, which makes the change costly to roll out.
- High operating cost: if the hot subfields change frequently, obsolete standalone subfields have to be removed and new ones added. Before a field can be removed, it must be confirmed that no downstream business still uses it; when a field is added, all downstream users must be notified and pushed to adopt it.
To solve all of the problems above, we designed and implemented materialized columns. They work as follows (a hypothetical SQL sketch follows the list):
- Add a primitive-typed field, such as an Integer field age, and declare it to be the materialized column of people.age.
- When data is inserted, the materialized field is populated automatically and the materialization mapping is saved in the partition parameters. The insertion job is therefore completely unaffected, and the table maintainer does not need to modify any existing jobs.
- At query time, all partitions involved in the query are checked. If every partition contains the materialization information (the mapping from people.age to age), the query on people.age is rewritten into a query on age; otherwise the original nested field is read. This also keeps the feature compatible with historical data.
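The sketch below illustrates the idea with Scala-wrapped SQL. The MATERIALIZED clause and the table, column, and partition names are made up purely for illustration; this is not the actual Bytedance DDL:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch only: the MATERIALIZED clause is not real Spark syntax,
// and base_table / ods_events / date are illustrative names.
val spark = SparkSession.builder().appName("materialized-column-sketch").getOrCreate()

// 1. Declare an Integer column age as the materialized column of people.age.
spark.sql("ALTER TABLE base_table ADD COLUMNS (age INT MATERIALIZED people.age)")

// 2. Existing insertion jobs stay unchanged; the engine fills age automatically
//    and records the mapping people.age -> age in the partition parameters.
spark.sql(
  """INSERT OVERWRITE TABLE base_table PARTITION (date = '20191026')
    |SELECT people, event FROM ods_events WHERE date = '20191026'""".stripMargin)

// 3. Users keep querying the nested field; if every queried partition carries
//    the materialization info, the engine rewrites the query into a flat scan:
spark.sql("SELECT people.age FROM base_table WHERE date = '20191026'")
// ... is executed as if it were:
spark.sql("SELECT age FROM base_table WHERE date = '20191026'")
```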
The following figure shows the benefits of using materialized columns on a core table:
Materialized view
In the OLAP domain, expensive operations such as Group By aggregations and Joins are often run repeatedly on the same fixed fields of the same table, causing repeated computation, wasted resources, and degraded query performance and user experience.
As shown in the figure above, the query history reveals that a large number of queries group by user and then compute sum or count over num. A materialized view is therefore created that groups by user and computes avg(num) (avg is automatically decomposed into count and sum). When a user runs SELECT user, sum(num) against the original table, Spark SQL automatically rewrites the query into a query on the materialized view that reads the precomputed sum_num column.
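A hedged sketch of the rewrite follows. The table, view, and column names are illustrative, the view is shown simply as a precomputed table, and the automatic rewrite itself is done inside the engine rather than by the user:

```scala
// spark: an existing SparkSession (see the earlier sketches).

// The materialized view precomputes the aggregation once.
spark.sql(
  """CREATE TABLE mv_events AS
    |SELECT user, count(num) AS count_num, sum(num) AS sum_num
    |FROM events
    |GROUP BY user""".stripMargin)

// A user query written against the original table ...
spark.sql("SELECT user, sum(num) FROM events GROUP BY user")

// ... is automatically rewritten by the engine into an equivalent query on
// the materialized view, avoiding a rescan and re-aggregation of events:
spark.sql("SELECT user, sum_num FROM mv_events")

// avg(num) is likewise answered from the view as sum_num / count_num.
```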
Other optimizations on the Spark SQL engine
Spark Shuffle: stability improvement and performance optimization
Problems with Spark Shuffle
As shown in the figure, the Stage upstream of the Shuffle is called the Mapper Stage, and its tasks are called Mappers. The Stage downstream of the Shuffle is called the Reducer Stage, and its tasks are called Reducers.
Each Mapper splits its data into at most N parts, where N is the number of Reducers. Each Reducer needs to fetch its data from up to M Mappers, where M is the number of Mappers.
- Stability problems: the Shuffle Write data of a Mapper is stored only on the Mapper's local disk, with a single copy. If the machine has a disk failure, or its IO or CPU is saturated, the Reducer cannot read that data, which causes FetchFailedException and in turn Stage Retries. Stage Retries lengthen job execution time, which directly hurts the SLA. Moreover, the longer the execution takes, the more likely it is that some Shuffle data can no longer be read, causing yet more Stage Retries. This vicious cycle can prevent large jobs from ever finishing successfully.
- Performance problems: each Mapper's data is read by a large number of Reducers, and different Reducers read different parts of it at random offsets. If a Mapper's Shuffle output is 512 MB and there are 100,000 Reducers, each Reducer reads on average only about 5.24 KB (512 MB / 100,000) from it, and different Reducers read concurrently, so the Mapper's output file sees a huge number of small random reads. Since the random IO performance of HDDs is far lower than their sequential IO performance, Reducers read Shuffle data very slowly; the metrics show that the Reducer Shuffle Read Blocked Time is long, sometimes more than half of the Reducer's execution time (the small calculation sketch after this list makes the numbers concrete).
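A few lines of arithmetic, using illustrative numbers consistent with the example above, show why the Mapper-side disks end up dominated by tiny random reads:

```scala
// Illustrative numbers only.
val mapperOutputBytes = 512L * 1024 * 1024   // one Mapper's Shuffle output: 512 MB
val numMappers  = 10000
val numReducers = 100000

// Average amount of data one Reducer pulls from one Mapper.
val bytesPerFetch = mapperOutputBytes.toDouble / numReducers
println(f"average read per Reducer per Mapper: ${bytesPerFetch / 1024}%.2f KB") // ~5.24 KB

// Each Reducer touches every Mapper, typically reading an index file and a
// data file, so the Mapper side serves on the order of M * N * 2 random reads.
val randomReads = numMappers.toLong * numReducers * 2
println(s"random reads across the Mapper side: $randomReads")
```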
Improving Shuffle stability with HDFS
As shown in the figure, CPU usage on the machine is close to 100%, so the Spark External Shuffle Service inside the Node Manager on the Mapper side cannot serve Shuffle requests in time.
The root cause is that a Mapper's Shuffle Write data is stored only locally; if the node fails, none of the Shuffle Write data on it can be read by the Reducers. A common way to deal with this kind of problem is to guarantee availability with multiple replicas.
The initial, straightforward solution was to have the Mapper write its final data file and index file to HDFS instead of the local disk, and to have the Reducer fetch Shuffle data directly from HDFS rather than through the Mapper-side External Shuffle Service, as shown in the following figure.
After a quick implementation of this solution, we did a few simple tests. The results show that:
- When the numbers of Mappers and Reducers are not too large, Shuffle read/write performance is on par with the original mechanism.
- When the numbers of Mappers and Reducers become large, Shuffle reads get very slow.
The reason is that each Reducer needs to read a data file and an index file from each of the 10,000 Mappers; with 10,000 Reducers, HDFS has to be read about 200 million times (10,000 x 10,000 x 2) in total, which puts enormous pressure on the Name Node.
If the only issue were the single-point performance of the Name Node, there would be simple mitigations: for example, the Spark Driver could collect the Block Locations of all Mapper output and broadcast them to all Executors, so that each Reducer could obtain the Block Locations locally and read from the Data Nodes directly without contacting the Name Node. However, given the threading model of the Data Node, this scheme would still put heavy pressure on the Data Nodes.
The final scheme keeps the original path: a Mapper's Shuffle output is still written to the local disk first and then uploaded to HDFS. A Reducer still reads Shuffle data through the Mapper-side External Shuffle Service as before, and only falls back to reading from HDFS when that fails. This greatly reduces the number of HDFS accesses.
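A minimal sketch of the Reducer-side read path under this scheme (the helper names and signatures are made up for illustration; this is not the actual Bytedance shuffle code):

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helpers: fetch a shuffle block from the Mapper-side External
// Shuffle Service, or from the copy the Mapper uploaded to HDFS.
def fetchFromExternalShuffleService(mapId: Int, reduceId: Int): Array[Byte] = ???
def fetchFromHdfsBackup(mapId: Int, reduceId: Int): Array[Byte] = ???

// Primary path: the local-disk copy served by the External Shuffle Service.
// Fallback path: the HDFS replica, used only when the primary fetch fails
// (disk failure, node overload, lost executor, ...).
def readShuffleBlock(mapId: Int, reduceId: Int): Array[Byte] =
  Try(fetchFromExternalShuffleService(mapId, reduceId)) match {
    case Success(bytes) => bytes
    case Failure(_)     => fetchFromHdfsBackup(mapId, reduceId)
  }
```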
This scheme has been in production for nearly a year:
- It covers more than 57% of Spark Shuffle data.
- Overall Spark job performance improved by 14%.
- Performance of day-level jobs improved by 18%.
- Performance of hour-level jobs improved by 12%.
The goal of this scheme is to improve the stability of Spark Shuffle and thereby the stability of jobs, but in the end we did not use metrics such as variance to measure the stability improvement, because cluster load differs from day to day and the overall variance is large. Improving Shuffle stability significantly reduces Stage Retries, which shortens overall job execution time and also improves Shuffle performance. So we measured the effect of the scheme by comparing total job execution time before and after enabling it.
As described above, the Shuffle performance problems stem from the fact that Shuffle Write is performed by the Mappers and every Reducer has to read data from all Mappers. We call this model Mapper-centric Shuffle. Its problems are:
- There are M sequential write IOs on the Mapper side.
- There are M * N * 2 random read IOs on the Mapper side (the biggest performance bottleneck).
- The External Shuffle Service on the Mapper side must live on the same machine as the Mapper, so storage and compute cannot be separated and the Shuffle Service cannot scale independently.
To address these problems, we are working on a new Shuffle scheme with the following goals (a conceptual sketch follows the list):
- Turn the M * N * 2 random IOs into N sequential IOs.
- Deploy the Shuffle Service independently of the Mappers and Reducers, so that it can scale on its own and storage is separated from compute.
- Have the Shuffle Service store data directly on highly available storage such as HDFS, thereby guaranteeing Shuffle stability.
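The following is only a conceptual sketch consistent with the goals above, not Bytedance's actual implementation; all names are made up. The idea it illustrates: Mappers push records to a Shuffle Service node that owns the target partition, that node appends everything for one Reducer partition into a single file on shared storage, and each Reducer later performs one sequential read instead of M random reads.

```scala
import java.io.OutputStream

trait DistributedFs {                              // e.g. an HDFS client (assumed interface)
  def append(path: String): OutputStream
  def open(path: String): Iterator[Array[Byte]]
}

class ShuffleServiceNode(fs: DistributedFs, shuffleId: Int) {
  private val writers = scala.collection.mutable.Map.empty[Int, OutputStream]

  // Called by Mappers: push a serialized record for a reducer partition this
  // node owns; the record is appended sequentially to that partition's file.
  def push(reduceId: Int, record: Array[Byte]): Unit = {
    val out = writers.getOrElseUpdate(reduceId, fs.append(s"/shuffle/$shuffleId/part-$reduceId"))
    out.write(record)
  }

  def close(): Unit = writers.values.foreach(_.close())
}

class Reducer(fs: DistributedFs, shuffleId: Int, reduceId: Int) {
  // One sequential scan of the pre-aggregated partition file, giving
  // N sequential reads overall instead of M * N * 2 random reads.
  def readAll(): Iterator[Array[Byte]] =
    fs.open(s"/shuffle/$shuffleId/part-$reduceId")
}
```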
Q&A highlights
Answer: There is too much historical data to modify.
Answer: In general, users modify data at the granularity of partitions, so we store the materialized column information in the Partition Parameters. If a query covers both new partitions and historical partitions, we rewrite the part of the SQL that touches the new partitions to use the materialized columns, leave the historical partitions unrewritten, and then Union the two parts. This takes full advantage of materialized columns while guaranteeing data correctness.
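A hedged sketch of what such a rewrite could look like; the table, column, and partition names are illustrative, and the actual rewrite happens inside the engine:

```scala
// spark: an existing SparkSession (see the earlier sketches).

// A user query that spans an old partition (no materialization info)
// and a new partition (people.age materialized as age) ...
spark.sql("SELECT people.age FROM base_table WHERE date IN ('20190101', '20191026')")

// ... is conceptually rewritten into a UNION ALL of the two cases:
spark.sql(
  """SELECT people.age AS age FROM base_table WHERE date = '20190101' -- old partition, original nested read
    |UNION ALL
    |SELECT age FROM base_table WHERE date = '20191026'               -- new partition, materialized column
  """.stripMargin)
```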
Answer: At present we mainly rely on audit information to assist manual analysis. We are also building a recommendation service for materialized columns and materialized views, with the goal of eventually creating them automatically and intelligently.
Answer: It is a good idea and we did consider it, but decided against it for a few reasons. First, the Shuffle output of a single Mapper is generally small; uploading it to HDFS takes less than two seconds, a cost that can be ignored. Second, External Shuffle Service and Dynamic Allocation are widely used on our clusters, so the Executor may be reclaimed as soon as the Mapper finishes; asynchronous uploading would then have to rely on other components, which adds complexity for a low ROI.
More great shares
Shanghai salon review | How Bytedance optimizes its large-scale HDFS platform
Shanghai salon review | An introduction to Apache Kylin and its new architecture (Kylin on Parquet)
Shanghai salon review | Applying Redis caching in big data scenarios
Bytedance Technology Salon
Bytedance Technology Salon invites technical experts from Bytedance and other Internet companies to share hot technical topics and front-line practical experience, covering architecture, big data, front end, testing, operations, algorithms, systems, and other technical fields.
Bytedance Technology Salon aims to provide an open and free platform for exchange and learning, helping people in the technology field learn, grow, and keep advancing.