Continuing with Spark Learning – Performance Tuning, Part 1

Reduce the memory usage of cache operations

A dedicated article on RDD memory usage will be written later; you can read it to learn more.

In Spark, heap memory is divided into two parts. One part is used to cache RDD data for cache and persist operations. The other part is used to run the Spark operator functions and holds the objects those functions create.

By default, the fraction of heap memory given to RDD cache operations is 0.6, i.e. 60% of the memory goes to cache. The problem is that in some cases the cache is not actually that tight; instead, too many objects are created in the task's operator functions, and the remaining memory is not large enough, which leads to frequent minor GC or even frequent full GC, causing Spark to stop working frequently. The performance impact can be significant.

In this case, you can view the runtime statistics of your Spark job on the Spark UI. If the job is submitted through YARN, you can reach the same statistics from the YARN UI. You can see how each stage runs, including the running time and GC time of each task. If GC turns out to be too frequent and to take too long, you can adjust this ratio appropriately.

Reduce the memory ratio of cache operations: at worst, use persist to write some of the cached RDD data to disk, or store it in serialized form with the Kryo serializer, to reduce the memory footprint of the RDD cache. When the memory ratio of cache operations goes down, the memory share of the operator functions goes up accordingly. That can reduce the frequency of both minor GC and full GC, which helps improve performance.

In short, tasks get more memory for executing their operator functions.

SparkConf conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")
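To make the persist/Kryo suggestion above concrete, here is a minimal Java sketch, assuming a stand-alone driver class and a placeholder HDFS input path (the class name and path are illustrative, not taken from this article):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;

public class CacheTuningSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("CacheTuningSketch")                         // hypothetical app name
            .set("spark.storage.memoryFraction", "0.5")              // shrink the cache fraction, leaving more for operator functions
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // serialize cached data with Kryo

        JavaSparkContext sc = new JavaSparkContext(conf);

        // placeholder input path
        JavaRDD<String> lines = sc.textFile("hdfs:///path/to/input");

        // store the cached partitions serialized, spilling to disk when memory runs out
        lines.persist(StorageLevels.MEMORY_AND_DISK_SER);

        System.out.println("count = " + lines.count());
        sc.stop();
    }
}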

Adjust executor off-heap memory

An executor may fail because its off-heap memory is insufficient. When shuffle map output files need to be pulled from certain executors, those executors may already have been killed and their associated block managers lost; you then see errors such as "shuffle output file not found", "resubmitting task", "executor lost", and the Spark job crashes completely.

In those cases, consider increasing the executor's off-heap memory. That may make the errors go away; in addition, a relatively large off-heap memory setting can sometimes bring some performance improvement as well.

How to adjust executor off-heap memory

--conf spark.yarn.executor.memoryOverhead=2048

In the spark-submit script, add the configuration with --conf. Be careful: do not set it in your Spark job code with new SparkConf().set(); setting it that way does not work. It must be set in the spark-submit script.

spark.yarn.executor.memoryOverhead (as the name implies, this applies to jobs submitted in YARN mode)

By default, this off-heap memory limit is a bit over 300 MB. In most projects that really process big data, this becomes a problem and causes the Spark job to crash repeatedly and fail to run. At that point, raise the parameter to at least 1 GB (1024 MB), or even 2 GB or 4 GB.

In general, adjusting this parameter avoids some JVM OOM exceptions and improves Spark job performance.

Adjust the connection wait timeout

You may occasionally run into a situation, with no obvious pattern: an error reports that some file, identified by a long UUID-like string of file IDs (e.g. dsfsfd-2342vs-sdf-sdfsd), was not found; the file is lost.

When this happens, it is likely that the executor holding that data is in the middle of a JVM GC, so the connection to pull the data cannot be established. If there is no response within the default 60 seconds, the fetch fails.

If the error is reported several times and the data still cannot be fetched, the Spark job may crash. It can also cause the DAGScheduler to resubmit stages several times and the TaskScheduler to resubmit tasks repeatedly, which greatly increases the running time of the Spark job.

Consider adjusting the connection timeout period.

--conf spark.core.connection.ack.wait.timeout=300

Set this in the spark-submit script as well; remember, it does not work through new SparkConf().set().
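For reference, a minimal spark-submit sketch showing where both flags discussed above would go (the class name, jar name, and master value are placeholders, not taken from this article):

spark-submit \
  --class com.example.MySparkJob \
  --master yarn-cluster \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.core.connection.ack.wait.timeout=300 \
  my-spark-job.jar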

Merge map-side output files

Actual production environment conditions: 100 nodes, one executor per node, 2 CPU cores per executor; 1,000 tasks in the map-side stage and 1,000 tasks in the next stage. Each node therefore runs 10 map tasks in total (2 at a time), and each map task writes one output file for every task of the next stage, i.e. 10 * 1,000 = 10,000 files per node.

How many map-side output files are there across the cluster? 100 * 10,000 = 1 million.

  • Writing to disk is the most expensive part of shuffle. By the analysis above, one shuffle of a Spark job in an ordinary production environment writes 1 million files to disk. The impact of that disk I/O on performance and on the execution speed of the Spark job is staggering. Basically, most of a Spark job's performance is consumed in shuffle; the map-side output files are not the only cost there, but they are a significant one.

Enable the merge mechanism for map-side output files

new SparkConf().set("spark.shuffle.consolidateFiles", "true")

With the map-side output file merge mechanism enabled:

  • In the first stage, tasks run in parallel, one per CPU core; for example, with 2 CPU cores, 2 tasks run at the same time. Each of those tasks creates output files for the tasks of the next stage.
  • Still in the first stage: after the first batch of 2 parallel tasks finishes, the next 2 tasks run. These 2 tasks do not create new output files; instead, they reuse the map-side output files created by the previous tasks and append their data to the output files of the previous batch.
  • In the second stage, when its tasks pull data, they no longer pull one output file per task of the previous stage. Instead, they pull a much smaller number of output files, each of which may contain the map-side output of multiple tasks.

How would the above example change if the map-side output files were merged?

  1. The number of map-side output files written to disk drops from 1 million to 200,000.
  2. In the second stage there are 1,000 tasks, and originally each of them had to pull 1,000 files from the first stage over the network. After the merge, with 100 nodes and 2 CPU cores per node, each task of the second stage only pulls 100 * 2 = 200 files, so the performance cost of network transfer is also greatly reduced (see the sketch after this list).
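A small Java sketch of the arithmetic above, with the cluster figures of this example hard-coded (they are only the numbers assumed in this section):

public class ShuffleFileCountSketch {
    public static void main(String[] args) {
        int nodes = 100;         // 100 nodes, one executor per node
        int coresPerNode = 2;    // 2 CPU cores per executor
        int mapTasks = 1000;     // tasks in the first (map-side) stage
        int reduceTasks = 1000;  // tasks in the second stage

        // without merging: every map task writes one file per reduce task
        long filesWithoutMerge = (long) mapTasks * reduceTasks;            // 1,000,000

        // with merging: one file group per CPU core, one file per reduce task in each group
        long filesWithMerge = (long) nodes * coresPerNode * reduceTasks;   // 200,000

        System.out.println("map-side output files without merge: " + filesWithoutMerge);
        System.out.println("map-side output files with merge:    " + filesWithMerge);
    }
}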

One caveat about map-side output file merging: only tasks that run in parallel create new output files; the next batch of parallel tasks reuses the existing ones. There is an exception, though: if, say, 2 tasks are running in parallel and 2 more tasks are started at the same time, the extra tasks cannot reuse the output files created by the first 2 tasks and have to create new output files of their own.

To get the benefit of merged output files, one batch of tasks must finish before the next batch starts, so that the later batch can reuse the earlier output files. Batches of tasks that execute at the same time cannot reuse each other's files.

Adjust the map-side memory buffer and the reduce-side aggregation memory ratio

If a map-side task processes a large amount of data while its memory buffer is fixed in size, what can happen?

If each task processes 320 KB with a 32 KB buffer, it spills to disk 320 / 32 = 10 times. If each task processes 32,000 KB with the same 32 KB buffer, it spills 32,000 / 32 = 1,000 times.

So if a map task processes a large amount of data while the task's memory buffer stays at the default 32 KB, many spill-to-disk operations occur, producing a large amount of disk I/O and degrading performance.

The reduce-side aggregation memory ratio defaults to 0.2. If the amount of data is large and the reduce tasks pull a lot of it, this aggregation memory is not enough, so data is frequently spilled to disk. The more data spills to disk, the more often the data on disk has to be read back and aggregated again. With the unoptimized default, a large data volume means frequent reads and writes of disk files on the reduce side.

These two points are discussed together because they are related. As the data volume grows, problems appear on the map side and on the reduce side alike, and the problem is the same: disk I/O becomes frequent and drags down performance.

How to tune

new SparkConf().set("spark.shuffle.file.buffer", "64")       // default is 32 (KB); in Spark 1.3.x the parameter name carried a "kb" suffix
new SparkConf().set("spark.shuffle.memoryFraction", "0.3")   // default is 0.2

When do we adjust the two parameters in a real production environment?

Look at the Spark UI. If your company runs in standalone mode, it is simple: when your Spark job runs, it prints the address of the Spark UI (port 4040); open it, click into the job, and you can see the details of each stage: which executors ran, and for each task the shuffle write and shuffle read amounts, the shuffle spill to disk and to memory, and so on. If the job is submitted in YARN mode, open the YARN UI, click the application, and follow the link to the Spark UI to view the details.

If the shuffle disk write and read values are large, it is worth tuning some shuffle parameters. First, of course, consider enabling the map-side output file merge mechanism.

Then adjust the two parameters above. The principle: for spark.shuffle.file.buffer, double it each time and watch the effect (32 -> 64 -> 128); for spark.shuffle.memoryFraction, increase it by 0.1 each time and watch the effect.

Do not push them too far: memory is limited, and if you raise these too much, other parts of the job will run short of memory.

SortShuffleManager tuning

// Threshold setting
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")
  • Since Spark 1.5.x there is a new shuffle manager, tungsten-sort (from the Tungsten project). The tungsten-sort shuffle manager has the same effect as the sort shuffle manager; the only difference is that the tungsten manager uses its own memory management mechanism, which significantly improves performance and avoids many OOM, GC, and other memory-related problems during shuffle.

To sum up, you now know a bit more about Spark's shuffle and its three shuffle managers: hash, sort, and tungsten-sort. How do you choose?

  1. Do you need Spark to sort your data by default? Like MapReduce, the sort shuffle manager sorts by key by default. If you do not need that, it is recommended to use the basic HashShuffleManager, because it skips sorting entirely in exchange for higher performance.

  2. When should you use the sort shuffle manager? If you need your data sorted by key, choose it, and note that the number of reduce tasks should exceed 200 for the sort/merge mechanism to take effect. Also consider carefully whether you really need to do the sorting during shuffle, because it affects performance.

  3. If you do not need sorting but want each task's output to end up merged into a single file, to reduce the performance overhead, you can raise the bypassMergeThreshold. For example, if you have 500 reduce tasks and the default threshold is 200, sort and merge would still happen by default; set the threshold to 550 and no sort is performed: output files are written per reduce task according to the hash approach and then merged into one file. (Note that this parameter is rarely used in production environments and has not been proven to improve performance much.)

  4. If you want to use a sort-based shuffle manager and your company's Spark version is high enough (1.5.x or later), consider trying the tungsten-sort shuffle manager and see what it brings in performance and stability.

Conclusion:

  1. In a production environment, it is not recommended to rush into options 3 and 4 above.
  2. If you do not want your data sorted during shuffle, configure it yourself and use the hash shuffle manager.
  3. If you really do need your data sorted during shuffle, leave the default alone: it is already the sort shuffle manager. And if you do not care about sorting at all, you can also just keep the default sort and tune the other parameters (such as the consolidate mechanism). (This is what is used about 80% of the time.)
// spark.shuffle.manager options: hash, sort, tungsten-sort
new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
new SparkConf().set("spark.shuffle.manager", "sort")   // default
