Introduction

With the evolution of big data technology architectures, separating storage from computing better meets users' demands for reducing data storage costs and scheduling computing resources on demand, and more and more people are adopting this architecture. Compared with HDFS, object storage saves storage costs; however, its write performance for massive numbers of files is poor.

Tencent Cloud Elastic MapReduce (EMR) is a cloud-hosted, elastic, open-source Hadoop service that supports big data frameworks such as Spark, HBase, Presto, Flink, and Druid.

Recently, while supporting an EMR customer, I encountered a typical storage-compute separation scenario. The customer uses EMR's Spark component as the computing engine and stores data on object storage. During technical tuning, we found that Spark's write performance was low when writing massive numbers of files, degrading the performance of the overall architecture.

After in-depth analysis and optimization, we ultimately improved write performance significantly, raising object-storage write performance by more than 10 times. This accelerated the customer's business processing and earned favorable feedback.

This article describes how the Tencent Cloud EMR Spark computing engine improves write performance in massive-file scenarios under a storage-compute separation architecture; we hope it serves as a useful basis for discussion. Article author: Zhong Degen, Tencent backend development engineer.

1. Background

Apache Spark is a fast, general-purpose computing engine designed for large-scale data processing that can be used to build large, low-latency data analysis applications. Spark was developed by UC Berkeley's AMP Lab as a general-purpose parallel framework in the style of Hadoop MapReduce, and it retains the advantages of Hadoop MapReduce.

Unlike Hadoop, Spark is tightly integrated with Scala, which can manipulate distributed datasets as easily as local collection objects. Although Spark was created to support iterative jobs on distributed datasets, it is in fact a complement to Hadoop and can run in parallel on Hadoop file systems or on cloud storage.

The computing engine we studied during this tuning exercise is the Spark component of the EMR product. Thanks to its excellent performance and other advantages, Spark has become the big data computing engine of choice for more and more customers.

For storage, the customer chose object storage. Object storage offers reliability, scalability, and low cost; compared with HDFS, the Hadoop Distributed File System, it is a lower-cost storage option, and massive amounts of warm and cold data are better suited to object storage to reduce costs.

In the Hadoop ecosystem, native HDFS is still an essential storage choice in many scenarios, so the performance comparisons below also include HDFS.

Returning to the problem at hand, let's look at a set of test data. Using the Spark 2.x engine, we used SparkSQL to write 5,000 files to HDFS and to object storage, measuring the execution time of each:

The test results show that writing to object storage takes 29 times longer than writing to HDFS; object-storage write performance is far worse. When observing the write process, we found that network I/O was not the bottleneck, so we needed to analyze in depth how the computing engine outputs data.

2. Analysis of the Spark data output process

1. Spark data flow

The following figure shows the main process of data flow during Spark job execution:

First, each task writes the result data to the temporary/task[ID] directory of the underlying file system, as shown below:
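As a rough illustration (the exact paths depend on the committer and on Hive's staging configuration, so this layout is indicative rather than exact), the temporary output produced by Hadoop's FileOutputCommitter looks like:

```
table_location/.hive-staging_.../-ext-10000/
  _temporary/0/
    _temporary/attempt_..._m_000001_0/   # task attempt still running
      part-00001
    task_..._m_000000/                   # task attempt already committed
      part-00000
```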

Once the executor tasks are done, the driver moves the resulting data files to the final location of the Hive table in three steps:

First, the commitJob method of OutputCommitter is called to merge the temporary files:

As the diagram above shows, commitJob merges all data files under the task_[id] subdirectories into the parent directory ext-10000.

Next, if the write mode is overwrite, the existing data in the table or partition is first moved to the Trash recycle bin.

After the preceding operations are complete, the data files merged in the first step are moved to the final location of the Hive table.
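The three driver-side steps above can be sketched as follows. This is a simplified, local-filesystem illustration of the sequence, not Spark's actual implementation; all function and directory names here are hypothetical:

```python
import shutil
from pathlib import Path

def commit_job(tmp_dir: Path, out_dir: Path) -> None:
    """Step 1: merge every task_[id] subdirectory's files into out_dir."""
    for task_dir in sorted(tmp_dir.iterdir()):            # single-threaded loop
        for f in list(task_dir.iterdir()):
            shutil.move(str(f), str(out_dir / f.name))    # one rename per file

def trash_files(table_dir: Path, trash_dir: Path) -> None:
    """Step 2 (overwrite mode): move existing table data to the recycle bin."""
    for f in list(table_dir.iterdir()):
        shutil.move(str(f), str(trash_dir / f.name))

def move_files(out_dir: Path, table_dir: Path) -> None:
    """Step 3: move the merged files to the table's final location."""
    for f in list(out_dir.iterdir()):
        shutil.move(str(f), str(table_dir / f.name))
```

Note that every step is a plain sequential loop over files, which is the pattern analyzed in the rest of this article.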

2. Locate and analyze the root cause

Based on the analysis of the Spark data flow, is the performance bottleneck on the driver side or the executor side? We first observed how long the jobs spent on the executors:

We found that the execution times of the jobs on the executor side differed little, while the total times differed greatly, indicating that most of the time was spent on the driver side.

On the driver side, there are the commitJob, trashFiles, and moveFiles stages. Which of these take a long time?

We examined thread dumps through the Spark UI (by refreshing the Spark UI manually, or by logging in to the driver node and using the jstack command to view thread stacks) and found that all three phases were slow. The source code of these three parts is analyzed below.

3. Source code analysis

(1) CommitJob stage

Spark uses Hadoop's FileOutputCommitter to handle file merging. In Hadoop 2.x the default is mapreduce.fileoutputcommitter.algorithm.version = 1, under which commitJob uses a single-threaded for loop to traverse all task subdirectories and perform a merge-path (rename) operation on each, which is clearly time-consuming when there are many output files.

This is especially costly on object storage, where a rename is not just a metadata update: the data must be copied to a new file.
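For reference, the committer algorithm discussed above is controlled by a Hadoop configuration key, which can be passed through via Spark's configuration; version 1 is the Hadoop 2.x default:

```
# spark-defaults.conf (Hadoop 2.x default shown)
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version  1
```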

(2) TrashFiles stage

The trashFiles operation is a single-threaded for loop that moves files to the recycle bin one by one, which can be very slow when there is a lot of data to overwrite.

(3) MoveFiles stage

Similar to the previous stages, the moveFiles phase uses a single-threaded for loop to move files.

4. Problem summary

  • The performance bottleneck of the Spark engine when writing massive numbers of files lies on the driver side.

  • The CommitJob, TrashFiles, and MoveFiles phases on the driver take a long time.

  • All three phases are slow because a single-threaded loop processes files one by one.

  • Rename on object storage requires copying data, so writing massive numbers of files takes even longer.

3. Optimization results

The main reason this bottleneck usually goes unnoticed is that most data platforms are built on HDFS, where a rename only modifies metadata on the NameNode. That operation is very fast, so the bottleneck is rarely hit.

Today, moving to the cloud and separating storage from computation are important ways for enterprises to reduce costs. We therefore modified the commitJob, trashFiles, and moveFiles code paths to process files in parallel with multiple threads, improving the performance of file write operations.
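The parallelization described above can be illustrated with a thread pool. This is a minimal sketch of the idea on a local filesystem, not the actual EMR patch; the function name and worker count are assumptions:

```python
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def move_files_parallel(src_dir: Path, dst_dir: Path, workers: int = 16) -> None:
    """Move every file in src_dir to dst_dir using a thread pool.

    File moves are I/O-bound (on object storage, a rename even copies data),
    so running them concurrently hides the per-file latency that dominates
    the single-threaded loop.
    """
    files = list(src_dir.iterdir())
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # submit one move per file; result() re-raises any failure
        futures = [pool.submit(shutil.move, str(f), str(dst_dir / f.name))
                   for f in files]
        for fut in futures:
            fut.result()
```

The same pattern applies to all three driver-side phases, since each is just a loop of independent per-file operations.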

Using the same benchmark, SparkSQL again writes 5,000 files to HDFS and to object storage; the optimized results are shown in the figure below:

In the end, HDFS write performance improved by 41%, and object-storage write performance improved by 1,100% (roughly a 12x speedup).

4. Conclusion

The analysis above shows that the key problem was the single-threaded limitation in parts of the Spark engine. Single-threaded bottlenecks are in fact a common class of technical issue. We had a hunch from the start, but confirming it took careful reading of the source code and a good deal of debugging.

The analysis also revealed a limitation of object storage itself: its rename operation requires copying data, which makes writing massive numbers of files slow. We are still working on improvements here.

Further optimizing storage-compute separation scenarios and improving performance, to better meet customers' demand for cost reduction and efficiency, is an important goal of the Tencent Cloud Elastic MapReduce (EMR) R&D team. You are welcome to discuss related issues with us.