Preface

Spark is a mainstream big data computing engine that covers offline batch processing, SQL processing, streaming/real-time computing, machine learning, graph computing, and other workloads in the big data field, and it has a wide range of applications and good prospects. As an in-memory computing framework, Spark is fast and can meet a wide variety of data processing requirements, such as UDFs, joins between large and small tables, and multi-path output.

As a professional data intelligence service provider in China, Getui introduced Spark early, at version 1.3, and built a data warehouse on Spark to perform offline and real-time computation over large-scale data. Because Spark before version 2.x focused its optimization on the computing engine and made no major improvements to metadata management, Getui still uses Hive for metadata management, and this architecture of Hive metadata management plus the Spark computing engine supports its big data services. Spark is also widely used in report analysis, machine learning, and other scenarios, providing real-time population insight and group portrait construction services for industry customers and government departments.

▲ In an actual business scenario, SparkSQL and HiveSQL were each used to compute the same 3 TB dataset. The figure above shows the running speed: with the queue locked (120 GB of memory, fewer than 50 cores), SparkSQL 2.3 is 5 to 10 times faster than Hive 1.2.

For enterprises, efficiency and cost are always issues they must pay attention to when processing and computing massive data. How can you take advantage of Spark to reduce costs and increase efficiency in big data operations? Below is a summary of Spark performance tuning tips accumulated over the years.

Spark Performance Tuning – Basics

As is well known, correct parameter configuration greatly improves Spark usage efficiency. Therefore, for Spark users who do not understand the underlying principles, we provide a parameter configuration template that can be copied directly, to help data developers and analysts use Spark efficiently for offline batch processing and SQL report analysis.

The recommended configuration template is as follows:

  1. spark-submit submission script

  2. spark-sql submission-mode script
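
The templates themselves appear as images in the original article. As a stand-in, the sketch below shows the general shape of such submit scripts; the queue name, memory sizes, core counts, class name, and paths are illustrative assumptions rather than recommended values.

```bash
#!/usr/bin/env bash
# Minimal spark-submit template -- a sketch only; queue, memory, cores,
# class name, and jar path are illustrative assumptions.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue default \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 20 \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class com.example.YourSparkJob \
  /path/to/your-job.jar "$@"

# spark-sql submission mode: the same resource options and --conf flags apply.
spark-sql \
  --master yarn \
  --queue default \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 20 \
  -f your_query.sql
```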

Spark Performance Tuning – Advanced

For those who wish to learn about the underlying principles of Spark, this section describes the interaction flow of the three common task submission modes (standalone, yarn-client, and yarn-cluster) to help users understand Spark's core mechanisms and lay a foundation for the advanced chapters that follow.

Standalone

  1. spark-submit creates and starts a Driver process (DriverActor) via reflection.

  2. The Driver process runs the user application, constructing the SparkConf and then the SparkContext.

  3. While the SparkContext is being initialized, it constructs the DAGScheduler and TaskScheduler and starts the Jetty-based web UI.

  4. The TaskScheduler starts a SparkDeploySchedulerBackend, which communicates with the Master to request registration of the Application.

  5. After receiving the request, the Master registers the Application, runs its resource scheduling algorithm, and notifies the Workers to start Executors for it.

  6. The Workers start Executors for the Application, and the Executors register back (reverse registration) with the TaskScheduler.

  7. Once all Executors have reverse-registered with the TaskScheduler, the Driver finishes initializing the SparkContext.

  8. The Driver continues executing the user application, creating a job for each action.

  9. Each job is submitted to the DAGScheduler, which divides it into stages (using the stage partitioning algorithm) and creates a TaskSet for each stage.

  10. The TaskScheduler submits each task in a TaskSet to an Executor for execution.

  11. Each time an Executor receives a task, it wraps the task in a TaskRunner and takes a thread from the Executor's thread pool to run it. (TaskRunner: copy, deserialize, and execute the task.)

Yarn-client

  1. spark-submit sends a request to the ResourceManager (RM) to start an ApplicationMaster (AM).

  2. The RM allocates a Container on a NodeManager (NM) and starts the AM, which in yarn-client mode is an ExecutorLauncher.

  3. The AM applies to the RM for Containers.

  4. The RM allocates Containers to the AM.

  5. The AM asks the corresponding NMs to start Executors.

  6. After the Executors start, they reverse-register with the Driver process running on the client.

  7. The subsequent steps (stage division, TaskSet submission) are similar to standalone mode.

Yarn-cluster

  1. spark-submit sends a request to the ResourceManager (RM) to start an ApplicationMaster (AM).

  2. The RM allocates a Container on a NodeManager (NM) and starts the AM; in yarn-cluster mode the Driver runs inside the AM.

  3. The AM applies to the RM for Containers.

  4. The RM allocates Containers to the AM.

  5. The AM asks the corresponding NMs to start Executors.

  6. After the Executors start, they reverse-register with the AM (where the Driver runs).

  7. The subsequent steps (stage division, TaskSet submission) are similar to standalone mode.

After covering the basic interaction flow of the three common submission modes, this article turns to storage formats, data skew, and parameter configuration to share more advanced techniques for Spark performance tuning.

Storage format (file format, compression algorithm)

It is well known that different SQL engines optimize differently for different storage formats; for example, Hive favors ORC while Spark favors Parquet. At the same time, in big data workloads, point lookups, wide-table queries, and large-table joins are relatively frequent, which calls for a file format that is columnar and splittable. We therefore recommend the columnar file formats Parquet and ORC together with the gzip, Snappy, and zlib compression algorithms; as combinations, we suggest Parquet + gzip and ORC + zlib. These combinations are both columnar and splittable, and compared with text + gz, which is row-based and not splittable, they are far better suited to the big data scenarios described above.

Taking 500 GB of production data as an example, we tested the performance of different combinations of storage format and compression algorithm in different cluster environments and SQL engines. The test data show that, with the same resources, the parquet + gz storage format is more than 60% faster than text + gz for multi-value queries and multi-table joins.

Combined with the test results, we sorted out the recommended storage formats for different cluster environments and SQL engines, as shown in the following table:

We also tested the storage consumption of parquet + gz and orc + zlib. Taking a single historical partition of one table as an example, parquet + gz and orc + zlib save 26% and 49% of storage space respectively compared with text + gz.

The complete test results are shown in the following table:

It can be seen that parquet + gz and orc + zlib do indeed deliver on cost reduction and efficiency improvement. So how do you use these two storage formats? The steps are as follows:

➤ Enable compression algorithms for specific file formats using Hive and Spark

  • Spark: set spark.sql.parquet.compression.codec=gzip; set spark.sql.orc.compression.codec=zlib;
  • Hive: set hive.exec.compress.output=true; set mapreduce.output.fileoutputformat.compress=true; set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

➤ Specify the file format when building a table

  • Parquet file format (SerDe, input/output format classes): CREATE EXTERNAL TABLE `test`(rand_num double) PARTITIONED BY (`day` int) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat';
  • ORC file format (SerDe, input/output format classes): ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';

➤ Tune tables that are already online

ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');
ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');

➤ Build tables with CTAS

CREATE TABLE tablename STORED AS PARQUET AS SELECT ... ;
CREATE TABLE tablename STORED AS ORC TBLPROPERTIES ('orc.compress'='ZLIB') AS SELECT ... ;

Data skew

Data skew can be classified into map-side skew and reduce-side skew. This article focuses on reduce-side skew, which commonly appears with group by and join in SQL. When data skew occurs, some tasks run noticeably slower than others in the same batch, some tasks process far more data than others, some tasks hit OOM, and Spark shuffle files may be lost. In the example below, from the Duration column and the shuffleReadSize/Records column we can clearly see that a few tasks process much more data and take much longer, which indicates data skew:

How to solve data skew?

We have compiled seven solutions that can help you resolve common data skew problems:

Solution 1: Use Hive ETL to preprocess data

That is, the skew problem is moved upstream in the data lineage, so that downstream users no longer need to deal with it.

⁕ This solution applies when the downstream users are interactive services, such as second-level or minute-level queries.
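
As a rough illustration of moving the work upstream, the ETL layer can pre-aggregate (or pre-join) the skewed table once per partition, so that downstream interactive queries read the aggregated result instead of repeating the skewed shuffle. The table names, columns, and partition value below are hypothetical:

```sql
-- Runs in the upstream Hive/Spark ETL layer, e.g. once per day/partition.
INSERT OVERWRITE TABLE dw.fact_log_agg PARTITION (day = '20210601')
SELECT id,
       COUNT(*)    AS pv,
       SUM(amount) AS total_amount
FROM ods.fact_log
WHERE day = '20210601'
GROUP BY id;
```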

Solution 2: Filter out the few keys that cause skew

This scheme is generally used together with percentiles. For example, if 99.99% of the IDs have fewer than 100 records each, then the IDs with more than 100 records can be considered for removal.

⁕ This solution is practical in statistical (aggregate) scenarios. In detail-level scenarios, you need to confirm with the business whether filtering these keys is acceptable.
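
A minimal Spark SQL sketch of this idea, assuming a hypothetical table fact_log with an id column and reusing the 100-record threshold from the example above:

```sql
-- Find the handful of skewed ids (> 100 records) and drop them before the
-- heavy aggregation/join. Table and column names are illustrative.
WITH big_keys AS (
  SELECT id
  FROM fact_log
  GROUP BY id
  HAVING COUNT(*) > 100
)
SELECT l.*
FROM fact_log l
LEFT JOIN big_keys b ON l.id = b.id
WHERE b.id IS NULL;
```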

Solution 3: Increase the parallelism of shuffle operations

Dynamically adjust the spark.sql.shuffle.partitions parameter to increase the number of partitions written by shuffle-write tasks, so that keys are spread more evenly. In SparkSQL 2.3 this value defaults to 200. Developers can add the following parameters to the startup script to adjust it dynamically:

  • --conf spark.sql.shuffle.partitions=10000
  • --conf spark.sql.adaptive.enabled=true
  • --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728

⁕ This scheme is very simple, and it works well when keys are evenly distributed. For example, if there are 10 keys with 50 records each and only one partition, a single task has to process 500 records; after increasing the number of partitions, each task processes 50 records and 10 tasks run in parallel, taking 1/10 of the time of the single task. However, the scheme does little for large keys: if one key has millions of records, it will still be assigned to a single task.
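
If you prefer to adjust these values per session rather than in the startup script, the same parameters can be applied with the set statements used elsewhere in this article (the values below are the same illustrative ones as in the list above):

```sql
-- Session-level equivalents of the --conf flags above.
set spark.sql.shuffle.partitions=10000;
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728;  -- 128 MB target per post-shuffle partition
```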

Solution 4: Convert a reduce join into a map join

That is, perform a map join so that there is no shuffle stage at all. Taking Spark as an example, the small RDD's data can be sent to every Worker node (NM in Yarn mode) as a broadcast variable, and the join is then performed locally on each Worker node.

⁕ This solution applies when a small table joins a large table (100 GB or more). The default threshold for the small table here is 10 MB; tables below the threshold can be distributed to the worker nodes. The threshold can be increased, but it must remain smaller than the memory allocated to a container.
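
A sketch of triggering a map (broadcast) join in Spark SQL, using the standard broadcast threshold setting and join hint; the table and column names are hypothetical:

```sql
-- Raise the automatic broadcast threshold (in bytes); keep it below the container memory.
set spark.sql.autoBroadcastJoinThreshold=104857600;   -- 100 MB, illustrative

-- Or hint explicitly that the small table should be broadcast (map join).
SELECT /*+ BROADCAST(d) */ f.user_id, d.city
FROM big_fact f
JOIN small_dim d
  ON f.city_id = d.city_id;
```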

Solution 5: Sample the slanted key and split the join operation

Table A joins table B. Table A contains a large key and table B does not; the large key's id is 1 and it has 3 records.

How do I do a split join?

  • First, split the records with id = 1 out of table A and table B separately; the remainders A' and B', from which the large key has been removed, join at normal (non-skewed) speed.

  • For the split-out large-key records of table A, add a random prefix; expand the corresponding records of table B N times; join these two parts separately, and remove the random prefix after the join.

  • Finally, union the results of the two joins above (a SQL sketch is shown after the note below).

⁕ The essence of this scheme is to reduce the risk of data skew caused by a single task processing too much data; it is suitable when there are only a few large keys.
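
A rough Spark SQL sketch of the split join described above, assuming hypothetical tables a(id, a_val) and b(id, b_val), a single large key id = 1, and an expansion factor n = 10; it is meant to show the shape of the query rather than to be copied verbatim:

```sql
-- Part 1: everything except the large key joins normally (no skew).
SELECT a.id, a.a_val, b.b_val
FROM a JOIN b ON a.id = b.id
WHERE a.id <> 1

UNION ALL

-- Part 2: salt the large key on the A side, expand the matching B rows n times.
SELECT a2.id, a2.a_val, b2.b_val
FROM (
  SELECT id, a_val,
         CONCAT(CAST(id AS STRING), '_', CAST(FLOOR(RAND() * 10) AS STRING)) AS salted_id
  FROM a WHERE id = 1
) a2
JOIN (
  SELECT id, b_val,
         CONCAT(CAST(id AS STRING), '_', CAST(p AS STRING)) AS salted_id
  FROM b
  LATERAL VIEW explode(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) t AS p
  WHERE id = 1
) b2
ON a2.salted_id = b2.salted_id;
```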

Solution 6: Use random prefixes and RDD expansion to join

For example, table A has large keys and table B does not; table A joins table B as follows:

  • Add a random prefix in [1, n] to every record in table A, expand table B by a factor of n, and join them.

  • Remove the random prefixes after the join completes (a condensed sketch follows the note below).

⁕ This solution applies to the situation where there are many large keys, but it also increases resource consumption.
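
The mechanics are the same as in the Solution 5 sketch, except that the random prefix is applied to all of table A instead of only the split-out large keys. A condensed sketch under the same hypothetical schema, again with n = 10:

```sql
SELECT a2.id, a2.a_val, b2.b_val
FROM (
  SELECT id, a_val,
         CONCAT(CAST(id AS STRING), '_', CAST(FLOOR(RAND() * 10) AS STRING)) AS salted_id
  FROM a                                    -- salt every record of A
) a2
JOIN (
  SELECT id, b_val,
         CONCAT(CAST(id AS STRING), '_', CAST(p AS STRING)) AS salted_id
  FROM b
  LATERAL VIEW explode(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) t AS p   -- expand all of B 10x
) b2
ON a2.salted_id = b2.salted_id;
```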

Solution 7: Combiner

That is, perform the combiner operation on the Map side to reduce the amount of data pulled by shuffle.

⁕ This scheme is suitable for scenarios such as summation.
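
For SQL workloads this usually requires no hand-written code: Spark SQL's hash aggregation already performs partial (map-side) aggregation before the shuffle, and in Hive map-side aggregation can be turned on explicitly. A small illustration, with hypothetical table and column names:

```sql
-- Hive: enable map-side aggregation, which acts as a combiner.
set hive.map.aggr=true;

-- A sum/group-by like this then shuffles only one partial sum per id per map task
-- instead of every raw record.
SELECT id, SUM(amount) AS total_amount
FROM fact_log
GROUP BY id;
```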

In actual scenarios, we recommend that developers analyze the situation case by case; for complex problems, the above methods can also be combined.

Spark Parameter Configuration

For the case without data skew, we have summarized a parameter configuration reference table to help you tune Spark performance. These parameter settings are suitable for insight and application scenarios on roughly 2 TB of data and meet the tuning needs of most scenarios.

Conclusion

At present, Spark has evolved to version 3.x (Spark 3.1.2 was released on June 1, 2021). Many new features of Spark 3.x, such as dynamic partition pruning, major improvements to the pandas API, and enhanced nested column pruning and pushdown, offer good opportunities for further cost reduction and efficiency improvement. In the future, Getui will continue to follow Spark's evolution and keep practicing and sharing.