Spark Tuning: Shuffle Tuning

This section first explains the core concepts of Shuffle, then walks through HashShuffle and SortShuffle, then tunes the Map end and the Reduce end, then analyzes and tunes the data skew problem in Spark, and finally covers troubleshooting during Spark job execution.


1. Core concepts of Shuffle

1. ShuffleMapStage and ResultStage

When the stages are divided, the last stage is called the FinalStage, which is essentially a ResultStage object, and all the preceding stages are called ShuffleMapStages.

The ShuffleMapStage ends with the shuffle file written to disk.

The ResultStage basically corresponds to an action operator in the code, that is, a function is applied to the data set of each partition of the RDD, marking the completion of a job.

2. Number of tasks in Shuffle

Spark Shuffle consists of a Map phase and a Reduce phase, also called the Shuffle Write phase and the Shuffle Read phase. For one Shuffle, a number of tasks execute the Map process and a number of tasks execute the Reduce process. How are the numbers of Map and Reduce tasks determined?

If the Spark task reads data from HDFS, the initial number of RDD partitions is determined by the number of splits of the file, that is, one split corresponds to one partition in the generated RDD. We assume that the initial number of partitions is N.

After the initial RDD is transformed by a series of operators (assuming repartition and coalesce are not used for repartitioning, the number of partitions remains N; if repartition is used, the number of partitions becomes M), we assume the number of partitions stays unchanged. When a Shuffle operation is performed, the number of Map tasks equals the number of partitions, that is, N.

On the Reduce side, the stage by default uses the value of the spark.default.parallelism configuration item as its number of partitions. If this item is not configured, the number of partitions of the last RDD on the Map side (N) is used. The number of partitions determines the number of Reduce tasks.
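As a hedged illustration (the application name, input path, and the value 200 are hypothetical), the following minimal sketch shows how the Reduce-side partition count is picked up: when reduceByKey is given no explicit partition count, Spark falls back to spark.default.parallelism.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-parallelism-demo")
  .set("spark.default.parallelism", "200") // Reduce-side partition count for RDD shuffles

val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs:///path/to/input")   // number of Map tasks = number of input splits (N)
val counts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)          // 200 Reduce tasks, taken from spark.default.parallelism
// .reduceByKey(_ + _, 400)    // or override the parallelism per operator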

3. Read data on the Reduce end

According to the division of stages, Map tasks and Reduce tasks are not in the same stage: Map tasks are located in the ShuffleMapStage and Reduce tasks in the ResultStage. Map tasks are executed first, so how does a later Reduce task know where to pull the data produced by the Map tasks?

The data pulling process on the Reduce side is as follows:

  1. After a Map task finishes, its computation status and the locations of its small disk files are encapsulated into a MapStatus object. The MapOutputTrackerWorker object in the Executor process then sends the MapStatus object to the MapOutputTrackerMaster object in the Driver process.
  2. Before a Reduce task starts to execute, the MapOutputTrackerWorker in its process sends a request to the MapOutputTrackerMaster in the Driver process, asking for the locations of the small disk files.
  3. After all Map tasks are completed, the MapOutputTrackerMaster in the Driver process knows the locations of all the small disk files, and it tells the MapOutputTrackerWorker those locations.
  4. After the preceding steps are complete, BlockTransferService pulls data from the Executor0 node. By default, five sub-threads are started, and the amount of data pulled at a time cannot exceed 48 MB. (A Reduce task fetches at most 48 MB of data at a time and stores the pulled data in 20% of the Executor memory.)

2. HashShuffle parsing

The following discussion assumes that each Executor has one CPU core.

1. Unoptimized HashShuffleManager

In the shuffle write stage, after the computation of one stage finishes, the data processed by each task is "partitioned" by key so that a shuffle-dependent operator (such as reduceByKey) can be executed in the next stage. "Partitioning" means writing the same key, via a hash algorithm, to the same disk file, and each disk file belongs to only one task of the downstream stage. Before being written to disk, data is written to a memory buffer; when the memory buffer is full, it is spilled to the disk file.

Each task of the current stage creates as many disk files as there are tasks in the next stage. For example, if the next stage has 100 tasks in total, each task of the current stage creates 100 disk files. If the current stage has 50 tasks spread over 10 Executors, each executing 5 tasks, then 500 disk files are created on each Executor and 5,000 disk files across all Executors. The number of disk files produced by an unoptimized shuffle write is therefore staggering.

The shuffle read phase is usually what happens at the beginning of a stage. At this point, each task of the stage needs to pull all of the keys belonging to it from the computation results of the previous stage, over the network from every node, to its own node, and then perform aggregation or join by key. Because during shuffle write each Map task creates one disk file for every Reduce task of the downstream stage, during shuffle read each Reduce task only needs to locate, on every Map task of the upstream stage, the disk file that belongs to it and pull it.

Shuffle read pulls and aggregates at the same time. Each shuffle read task has its own buffer; in each round it can only pull data up to the size of that buffer and aggregate it through an in-memory map. After one batch of data is aggregated, the next batch is pulled into the buffer and aggregated, and so on, until all data has been pulled and the final result obtained.

The non-optimized HashShuffleManager works as shown below:

2. Optimized HashShuffleManager

To optimize HashShuffleManager we can set the spark.shuffle.consolidateFiles parameter. Its default value is false; setting it to true enables the optimization mechanism. Generally speaking, if you use HashShuffleManager, enabling this option is recommended.
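As a hedged sketch (this only applies to older Spark 1.x releases that still ship HashShuffleManager; the option was removed in later versions), the consolidate mechanism can be enabled like this:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash")           // select HashShuffleManager (pre-Spark 2.0)
  .set("spark.shuffle.consolidateFiles", "true")  // enable the consolidate optimization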

After the consolidate mechanism is enabled, tasks no longer create one disk file for each task of the downstream stage during shuffle write. Instead, the concept of the shuffleFileGroup is introduced: each shuffleFileGroup corresponds to a batch of disk files, and the number of disk files equals the number of tasks in the downstream stage. An Executor can execute as many tasks in parallel as it has CPU cores. Each task in the first batch of parallel tasks creates a shuffleFileGroup and writes data into the corresponding disk files.

When the Executor's CPU cores finish one batch of tasks and then execute the next batch, the next batch of tasks reuses the existing shuffleFileGroups, including the disk files in them. That is, a task writes its data into existing disk files rather than new ones. Consolidate therefore allows different tasks to reuse the same batch of disk files, effectively consolidating the disk files of multiple tasks, greatly reducing the number of disk files and improving shuffle write performance.

Assuming the second stage has 100 tasks and the first stage has 50 tasks, and there are still 10 Executors (each with 1 CPU core), each executing 5 tasks: the original unoptimized HashShuffleManager would generate 500 disk files per Executor and 5,000 disk files across all Executors. After optimization, the number of disk files created per Executor is the number of CPU cores * the number of tasks in the next stage, that is, each Executor creates 100 disk files and all Executors create 1,000 disk files.

The working principle of the optimized HashShuffleManager is shown in the figure below:

3. SortShuffle parsing

SortShuffleManager has two running mechanisms: the ordinary running mechanism and the bypass running mechanism. When the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (200 by default), the bypass mechanism is enabled.

1. Common operating mechanism

In this mode, data is first written into an in-memory data structure; which structure is chosen depends on the shuffle operator. For an aggregation-class shuffle operator such as reduceByKey, a Map structure is chosen, and aggregation through the Map happens while data is written into memory. For an ordinary shuffle operator such as join, an Array structure is chosen and records are written into memory directly. Then, each time a record is written into the in-memory data structure, Spark checks whether a critical threshold has been reached. If so, it attempts to spill the in-memory data to disk and then clears the in-memory data structure.

Before spilling to a disk file, the data already in the memory structure is sorted by key. After sorting, the data is written to the disk file in batches; the default batch size is 10,000 records, that is, 10,000 sorted records are written to the disk file per batch. Writing to the disk file is done through Java's BufferedOutputStream, which buffers data in memory first and writes it to the disk file only when the memory buffer is full, reducing the number of disk I/O operations and improving performance.

While a task writes all of its data into the in-memory data structure, multiple disk spills occur, producing multiple temporary files. Finally, all temporary disk files are merged; this is the merge process, in which the data of all temporary disk files is read and written into one final disk file. Because a task corresponds to only one disk file, meaning all the data this task prepares for the tasks of the downstream stage is in that single file, a separate index file is also written, which records the start offset and end offset of each downstream task's data within the file.

SortShuffleManager greatly reduces the number of files because it has a disk file merge process. For example, the first stage has 50 tasks and a total of 10 executors, each executing 5 tasks, while the second stage has 100 tasks. Because each task ends up with only one disk file, there are only five disk files per Executor and 50 disk files for all executors at this point.

The working principle of SortShuffleManager with common operation mechanism is shown as follows:

2. Bypass operation mechanism

Triggering conditions of the bypass mechanism are as follows:

  • The number of shuffle read tasks is smaller than the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default 200).
  • The shuffle operator is not an aggregation-class operator (such as reduceByKey).

In this case, each task creates a temporary disk file for each downstream task, hashes each record's key, and writes the record into the corresponding disk file according to the hash value of the key. As before, a record is written to a memory buffer first and spilled to the disk file when the buffer is full. Finally, all temporary disk files are again merged into a single disk file and a single index file is created.

The disk write mechanism of this process is exactly the same as that of the unoptimized HashShuffleManager, in that a staggering number of disk files are created; the only difference is that the disk files are merged at the end. The small number of final disk files therefore makes shuffle read perform better than with the unoptimized HashShuffleManager.

The differences between this mechanism and the ordinary SortShuffleManager mechanism are: first, the disk write mechanism is different; second, no sorting is performed. In other words, the biggest advantage of this mechanism is that data does not need to be sorted during shuffle write, which saves that part of the performance overhead.

The SortShuffleManager of the bypass operation mechanism works as shown in the following figure:

4. Size of the Map and Reduce buffer

During execution of a Spark job, if the map side of the shuffle processes a large amount of data while the map-side buffer size is fixed, the buffer may spill to disk frequently, resulting in poor performance. By increasing the size of the map-side buffer, frequent disk I/O can be avoided and the overall performance of the Spark job improved.

The default map-side buffer size is 32KB. If each task processes 640KB of data, 640/32 = 20 spills occur; if each task processes 64,000KB of data, 64,000/32 = 2,000 spills occur, which seriously hurts performance.

The configuration method of the map buffer is as follows:

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer"."64")

During Spark Shuffle, the size of the Reduce task's buffer determines how much data the Reduce task can buffer, that is, how much data can be pulled at a time. If memory is sufficient, increasing the size of the pull buffer reduces the number of fetches and therefore the number of network transfers, improving performance.

The size of the Reduce-side data pull buffer is set with the spark.reducer.maxSizeInFlight parameter; the default is 48 MB. Set this parameter as follows:

Reduce side data pull buffer configuration:

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight"."96")

5. Retry count and wait interval on the Reduce end

During Spark Shuffle, if the Reduce task fails to fetch its own data due to network exceptions, the task automatically tries again. For jobs that involve particularly time-consuming shuffle operations, it is recommended to increase the maximum number of retries (say, 60) to avoid pull failures due to factors such as the JVM’s full GC or network instability. In practice, it is found that adjusting this parameter can greatly improve the stability of the shuffle process with a large amount of data (billions to tens of billions).

The number of retries for pulling data on the Reduce end is set with the spark.shuffle.io.maxRetries parameter, which specifies the maximum number of retries. If the pull still fails after that many retries, the job may fail. The default value is 3.

Retry times for pulling data on the Reduce end:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries"."6")

During Spark Shuffle, if the Reduce task fails to fetch its own data due to a network exception, it automatically tries again. After a failure, it waits for a certain interval and tries again. You can increase the interval (for example, 60 seconds) to improve the stability of the Shuffle operation.

The wait interval between retries on the Reduce end is set with the spark.shuffle.io.retryWait parameter; the default value is 5s. The parameter is set as follows:

Data waiting interval configuration on the Reduce end:

val conf = new SparkConf()
  .set("spark.shuffle.io.retryWait"."60s")

6. Threshold for enabling the bypass mechanism

For SortShuffleManager, if the number of Shuffle Reduce tasks is less than a certain threshold, data is written in the unoptimized HashShuffleManager mode instead of sorting during Shuffle Write. But eventually all the temporary disk files produced by each task are merged into one file and a separate index file is created.

When SortShuffleManager is used and sorting is not required, it is advisable to set this parameter to a value larger than the number of shuffle read tasks; the bypass mechanism is then enabled and map-side sorting is skipped, reducing sorting overhead. However, a large number of intermediate disk files is still generated, so shuffle write performance still has room for improvement.

The threshold for SortShuffleManager's sorting behavior is set with the spark.shuffle.sort.bypassMergeThreshold parameter; the default value is 200. The parameter is set as follows:

Bypass mechanism threshold configuration:

val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold"."400")

7. Data skew

Data skew means that the amount of data assigned to the various partitions is not uniform. You can customize the partitioner to control how the data is divided.

The data skew problem in Spark mainly refers to the data skew problem that occurs during shuffle. The data amount corresponding to different keys varies, resulting in different data amounts processed by different tasks.

For example, the Reduce end has to process a total of 1,000,000 records. The first and second tasks are each allocated 10,000 records and finish within 5 minutes, while the third task is allocated 980,000 records and may take 10 hours. The whole Spark job then takes 10 hours to complete; this is the consequence of data skew.

Note that it is important to distinguish between data skew and data overload. Data skew is when a few tasks are allocated the majority of data, so a few tasks run slowly. Data overload means that all tasks are allocated a large amount of data, and all tasks run slowly.

Performance of data skew:

  1. Most of the Spark tasks are executed quickly. Only a limited number of tasks are executed very slowly. In this case, data skew may occur.
  2. Most Spark tasks execute quickly, but occasionally some tasks suddenly report an OOM (out of memory) error, and re-executing them several times still fails at that point. Data skew may have occurred, and the job cannot run normally.

Locate the data skew problem:

  1. Check the shuffle operators in the code, such as reduceByKey, countByKey, groupByKey, and Join, and determine whether data skewing occurs according to the code logic.
  2. View the Log file of the Spark job. The log file records the error exactly to a line of the code. You can determine the stage where the error occurs and the corresponding shuffle operator according to the code location where the exception is located.

1. Pre-aggregate raw data

1. Avoid the shuffle process

In most cases, the data for a Spark job comes from Hive tables that were produced by yesterday's ETL. To avoid data skew, the shuffle process should be avoided; if shuffle is avoided, the possibility of data skew is fundamentally eliminated.

If the data for a Spark job comes from a Hive table, the data can first be aggregated inside Hive, for example by grouping by key and concatenating all values of a key into a string in a special format, so that each key has exactly one record. Afterwards, processing all values of a key only requires map operations, with no shuffle at all, which removes the shuffle and with it any possibility of data skew.
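As a hedged sketch (the table and column names are hypothetical and an existing SparkSession spark is assumed), the pre-aggregation could be done once in Hive or Spark SQL like this:

// Group by key once, concatenating every value of a key into one string,
// so that downstream Spark jobs only need map operations on this table.
spark.sql("""
  CREATE TABLE user_actions_aggr AS
  SELECT user_id,
         concat_ws('|', collect_list(action)) AS actions
  FROM user_actions
  GROUP BY user_id
""")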

When preprocessing the data in the Hive table, you do not necessarily have to concatenate the values into a string; you can also directly compute the aggregate for each key. Be careful to distinguish between simply processing a large amount of data and genuine data skew.

2. Increase the key granularity (reduce the possibility of data skew and increase the amount of data per task)

If there is no way to aggregate the data down to one record per key, then in specific scenarios you can consider expanding the aggregation granularity of the key.

For example, there are currently 100,000 pieces of user data, and the current key granularity is (province, city, district, date). We can consider coarsening the granularity of the key to (province, city, date). The number of distinct keys then decreases, and the difference in data volume between keys may also decrease, alleviating data skew. (This method is only effective for particular types of data; if the scenario does not suit it, it will aggravate data skew.)
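A hypothetical sketch of this granularity change (the sample records and an existing SparkContext sc are assumptions): the district field is dropped from the key before aggregating.

val records = sc.parallelize(Seq(
  (("Guangdong", "Shenzhen", "Nanshan", "2021-01-01"), 1L),
  (("Guangdong", "Shenzhen", "Futian", "2021-01-01"), 1L)
))

val coarsened = records
  .map { case ((province, city, _, date), cnt) => ((province, city, date), cnt) } // drop the district field
  .reduceByKey(_ + _)  // fewer distinct keys, smaller variance in per-key data volume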

2. Preprocess keys that cause skew

1. Filter

If the business allows certain data to be discarded in the Spark job, you can simply filter out the keys that may cause data skew together with their data, so that data skew cannot occur in the Spark job.

2. Use random keys

When operators such as groupByKey and reduceByKey are used, random keys can be used to implement double aggregation, as shown in the figure below:

First, a map operator adds a random prefix to the key of each record, scattering the keys so that the same original key becomes several different keys; the first aggregation is then performed, so data that would have been handled by a single task is spread across multiple tasks for local aggregation. The prefix is then stripped from each key and the aggregation is performed again to obtain the final result.
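A minimal sketch of double aggregation with random prefixes, assuming an existing SparkContext sc and a word-count-style pair RDD (the prefix range of 10 is a hypothetical choice):

import scala.util.Random

val pairs = sc.parallelize(Seq(("hot", 1), ("hot", 1), ("hot", 1), ("cold", 1)))

// First aggregation: scatter each key with a random prefix and aggregate locally.
val partial = pairs
  .map { case (key, value) => (Random.nextInt(10) + "_" + key, value) }
  .reduceByKey(_ + _)

// Second aggregation: strip the prefix and aggregate again for the final result.
val result = partial
  .map { case (prefixedKey, value) => (prefixedKey.split("_", 2)(1), value) }
  .reduceByKey(_ + _)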

This method has a good effect on data skewing caused by groupByKey and reduceByKey operators, and is only applicable to shuffle operation of aggregation, with a relatively narrow scope of application. If it is a Shuffle operation of the Join class, another solution is needed.

This method is also a solution to try when the previous several schemes have no good effect.

3. Sample the skewed key and join it separately

In Spark, if an RDD has only one key, the data corresponding to the key is scattered by default during shuffle and processed by different Reduce tasks.

Therefore, when data skew is caused by a single key, the key causing the skew can be extracted into a separate RDD, and that RDD can then be joined with the other RDD on its own. According to Spark's execution mechanism, the data of that RDD will be scattered across multiple tasks during the shuffle phase of the join.

The process of joining the skewed key separately is shown in the figure below:

Application scenario analysis:

For RDD data, you can convert it to an intermediate table, or use countByKey() to inspect the amount of data for each key in the RDD. If you find that a single key in the whole RDD accounts for a very large amount of data, consider using this method.

When the amount of data is very large, you can use sample to take a 10% sample, analyze which keys in the sample are likely to cause data skew, and extract the data of those keys separately.
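A hypothetical sketch (the two sample RDDs and an existing SparkContext sc are assumptions) that samples 10% of the data, finds the heaviest key, splits it out, and joins the two parts separately:

val left = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val right = sc.parallelize(Seq(("a", "x"), ("b", "y")))

// Find the key with the largest count in a 10% sample
// (on real data the sample would be large enough to be representative).
val skewedKey = left.sample(withReplacement = false, fraction = 0.1)
  .countByKey()
  .maxBy(_._2)._1

// Join the skewed key's records and the remaining records separately, then union.
val skewedJoined = left.filter { case (k, _) => k == skewedKey }.join(right)
val commonJoined = left.filter { case (k, _) => k != skewedKey }.join(right)
val joined = skewedJoined.union(commonJoined)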

Inapplicable scenario analysis:

If there are too many keys in an RDD that cause data skew, then this scheme does not apply.

3. Improve reduce parallelism

If scheme 1 and Scheme 2 have no effect on data skew processing, increase the parallelism on the Reduce end during shuffle. The increase in parallelism on the Reduce end increases the number of Tasks on the Reduce end, and the amount of data allocated to each task decreases, alleviating the data skew problem.

1. Set the parallelism on the Reduce end

Most shuffle operators accept a parallelism parameter, for example reduceByKey(500), which determines the Reduce-side parallelism during the shuffle; the specified number of Reduce tasks is created for that shuffle operation. For shuffle-type Spark SQL statements, such as group by and join, the spark.sql.shuffle.partitions parameter needs to be set. It represents the parallelism of shuffle read tasks; its default value of 200 is a bit small for many scenarios.
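A hedged sketch (the application name, data, and the value 500 are hypothetical) of raising Reduce-side parallelism both per operator and for Spark SQL:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("reduce-parallelism-demo")
  .set("spark.sql.shuffle.partitions", "500")  // shuffle read parallelism for Spark SQL group by / join (default 200)

val sc = new SparkContext(conf)

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val summed = pairs.reduceByKey(_ + _, 500)     // pass the Reduce-side parallelism directly to the operator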

Increasing the number of Shuffle Read tasks allows multiple keys originally assigned to one task to be assigned to multiple tasks so that each task processes less data than the original task.

For example, if there are five keys, each with 10 records, and all five keys are assigned to one task, that task processes 50 records. If the number of shuffle read tasks is increased to five, each task is assigned one key, that is, 10 records, so the execution time of each task becomes shorter.

2. Defects in parallelism setting on the Reduce end

Improving the parallelism of the Reduce end does not fundamentally change the nature and problem of data skew (Solution 1 and Solution 2 fundamentally avoid data skew), but alleviates the data pressure and data skew problem of the Shuffle Reduce task as much as possible. This method is used when there are many keys and a large amount of data.

This scheme usually cannot completely solve data skew, because in extreme cases, for example when a single key corresponds to 1,000,000 records, no matter how much the number of tasks is increased, the 1,000,000 records of that key must still be handled by a single task, so data skew is bound to occur. This scheme is therefore only a first attempt, a simple way to mitigate skew, or something to combine with other methods.

Under ideal conditions, the increase of parallelism on the Reduce end will alleviate the problem of data skew to a certain extent, or even basically eliminate data skew. However, in some cases, tasks that originally run slowly due to data skew can only run a little faster, or some tasks can avoid OOM problems, but still run slowly. In this case, you should give up plan 3 in time and try the later plan.

4. Use Map Join

In normal cases, the Join operation performs shuffle and Reduce join. That is, all the same keys and corresponding values are gathered into a Reduce task before joining. The process of ordinary Join is shown in the figure below:

An ordinary join is performed during the shuffle process: after the shuffle, data with the same key is pulled to one shuffle read task and joined there, which is a reduce join. However, if one RDD is relatively small, the full data of the small RDD can be broadcast and combined with a map operator to achieve the same effect as a join, that is, a map join. In this case no shuffle occurs, and therefore no data skew can occur.

Note: an RDD cannot be broadcast directly; its data must first be pulled to the Driver's memory with collect and then broadcast.

1. Core ideas:

Instead of using the join operator, the join is implemented with a broadcast variable plus a map-class operator, which completely avoids the shuffle and therefore completely avoids data skew. The data of the smaller RDD is pulled to the Driver's memory with the collect operator and a broadcast variable is created from it. A map-class operator is then applied to the other RDD; inside the operator function, the full data of the smaller RDD is obtained from the broadcast variable and compared with each record of the current RDD by join key. If the join keys match, the data of the two RDDs is joined in whatever way is needed.
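A minimal sketch of a map join (the two sample RDDs and an existing SparkContext sc are assumptions), broadcasting the small RDD after collecting it to the Driver:

val smallRdd = sc.parallelize(Seq((1, "a"), (2, "b")))
val largeRdd = sc.parallelize(Seq((1, 100), (1, 200), (3, 300)))

// Pull the small RDD to the Driver and broadcast one read-only copy to every Executor.
val smallMap = smallRdd.collectAsMap()
val broadcastSmall = sc.broadcast(smallMap)

// "Join" inside a map-side operator: no shuffle, so no data skew.
val joined = largeRdd.flatMap { case (key, leftValue) =>
  broadcastSmall.value.get(key).map(rightValue => (key, (leftValue, rightValue)))  // inner-join semantics
}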

According to the preceding approach, shuffle operation does not occur at all, fundamentally eliminating the data skew problem that may be caused by join operation.

When a join operation suffers from data skew and one of the RDDs is small, this method should be considered first; its effect is very good.

The process of Map Join is as follows:

2. Inapplicable Scenario analysis:

Spark saves a copy of the broadcast variable in each Executor. If two RDD data volumes are large, memory overflow may occur if the RDD with a large data volume is used as the broadcast variable.

8. Troubleshooting

1. Avoid OOM (out of memory) on the Reduce end

During the Shuffle process, the Reduce task does not wait until the Map task writes all its data to the disk and then pulls the data. Instead, when the Map task writes a little data, the Reduce task pulls a small amount of data and then performs subsequent operations, such as aggregation and operator function usage.

The amount of data that the reduce task can pull is determined by the buffer that the reduce task pulls data from. The data is stored in the buffer and then processed. The default size of the buffer is 48MB.

Tasks on the Reduce end compute while they pull data; the buffer is not necessarily filled to 48MB each time. In most cases, only part of that amount is pulled and processed before the next fetch.

Increasing the size of the reduce buffer reduces the number of pull times and improves Shuffle performance. However, sometimes the data volume on the map end is very large and the write speed is very fast. In this case, all tasks on the reduce end may reach the maximum limit of their buffer, that is, 48MB. This, combined with the code for the aggregation function performed by the Reduce side, may create a large number of objects, which may cause memory overflow, i.e. OOM.

If memory overflow occurs on the reduce end, you can reduce the size of the pull data buffer on the reduce end, for example, to 12MB.
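A hedged sketch of shrinking the pull buffer (12m is just the example value mentioned above):

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "12m")  // smaller pull buffer: fewer objects in memory, more fetch rounds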

This problem has occurred in real production environments, and it is a typical case of trading performance for successful execution. With a smaller pull buffer on the Reduce end, OOM no longer occurs, but the Reduce end pulls data more times, incurring more network transfer overhead and worse performance.

Note that you need to ensure that the task runs, and then consider optimizing performance.

2. Avoid a shuffle file pull failure caused by GC

In Spark, a shuffle file not found error may occur. This is a very common error. After the error occurs, you need to execute it again and the error will not be displayed.

During the shuffle, tasks of a later stage attempt to fetch data from the Executors that ran the tasks of the earlier stage. If such an Executor happens to be performing GC (for example, a full GC), all work inside the Executor stops, including the BlockManager and the Netty-based network communication. The task then fails to pull data for a long time and may report a shuffle file not found error; when the task is re-executed, the error typically does not occur again.

You can increase the number of data pull retries on the Reduce end and the wait interval between retries to improve the stability of the shuffle: after each failure the task retries more times and waits longer before retrying.

To handle shuffle file fetching failures caused by JVM GC, adjust the Reduce-side retry count and retry wait interval:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries"."6")
  .set("spark.shuffle.io.retryWait"."60s")

3. Network adapter traffic surge caused by yarn-client mode

In yarn-client mode, the Driver starts on the local machine. The Driver schedules all tasks and communicates frequently with multiple executors in the YARN cluster.

Suppose there are 100 Executors and 1,000 tasks; each Executor is then assigned 10 tasks. The Driver subsequently communicates frequently with the 1,000 tasks running on the Executors; the amount of data communicated is very large and the communication is very frequent. As a result, the network adapter traffic on the local machine may surge due to the frequent network communication while the Spark job runs.

Note that yarn-client mode should only be used in a test environment. The reason for using it there is that it lets you view detailed log output; by reading the logs you can locate problems in the program and avoid faults in the production environment.

In the production environment, yarn-cluster mode must be used. In yarn-cluster mode, the network adapter traffic on the local machine does not surge. If there is a network communication problem in yarn-cluster mode, the O&M team must rectify the problem.

4. JVM memory overflow in yarn-cluster mode prevents execution

If the Spark job contains SparkSQL, the operation can be performed in yarn-client mode, but cannot be performed in yarn-cluster mode (an OOM error is reported).

In yarn-client mode, the Driver runs on the local machine, and the PermGen configuration of the JVM used by Spark comes from the local spark-class file, which sets the permanent generation size to 128MB; this is sufficient. In yarn-cluster mode, however, the Driver runs on a node of the YARN cluster and uses the unconfigured default, a permanent generation (PermGen) size of only 82MB.

SparkSQL internally performs very complex SQL semantic parsing, syntax tree transformation, and so on. If the SQL statement itself is very complex, this is likely to cause performance loss and heavy memory consumption, especially in PermGen.

Therefore, if the usage of PermGen is greater than 82MB but smaller than 128MB, it can run in yarn-client mode but cannot run in yarn-cluster mode.

To increase the capacity of PermGen(permanent generation), set related parameters in the Spark-Submit script as follows:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

By setting the initial size of the Driver's permanent generation to 128MB and the maximum to 256MB, the problem described above can be avoided.

5. Avoid SparkSQL JVM stack memory overflow

When SparkSQL SQL statements have hundreds or thousands of OR keywords, Driver side JVM stack memory overflow can occur.

A JVM stack memory overflow is basically the result of too many nested method calls, that is, a large amount of very deep recursion that exceeds the JVM's stack depth limit. (We guess that when a SparkSQL statement contains a large number of OR clauses, the handling of OR during SQL parsing, such as conversion to a syntax tree or generation of the execution plan, is recursive, and a large number of OR clauses produces a large amount of recursion.)

In this case, it is advisable to split the one SQL statement into multiple SQL statements, ensuring that each statement contains fewer than 100 clauses. Based on actual production experience, keeping the number of OR keywords in a single SQL statement under 100 usually does not cause a JVM stack memory overflow.
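A hypothetical sketch of this splitting (the events table, the id column, and an existing SparkSession spark are assumptions): the value list is broken into chunks of at most 100 OR clauses, one statement per chunk, and the partial results are unioned.

val ids = 1 to 1000

val results = ids.grouped(100).map { chunk =>
  val predicate = chunk.map(id => s"id = $id").mkString(" OR ")  // at most 100 OR clauses per statement
  spark.sql(s"SELECT * FROM events WHERE $predicate")
}.reduce(_ union _)  // combine the partial results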



Recommended article: Spark performance tuning -RDD operator tuning