This is the 19th day of my participation in the Genwen Challenge

Shuffle Tuning 1: Adjust the size of the map buffer

During Spark tasks, if the map side of the Shuffle processes a large amount of data, but the size of the map side buffer is fixed, the map side buffer data may be frequently spilled to disk files, resulting in poor performance. Adjust the size of the Map side buffer. This prevents frequent DISK I/O operations and improves the overall performance of Spark tasks.

The default configuration of map buffer is 32KB. If each task processes 640KB of data, 640/32 =20 write overflows occur. If each task processes 64000KB of data, 64000/32=2000 write overflows occur, which has a serious impact on performance. The map-side buffer is configured as shown in the code listing:

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64")
Copy the code

Shuffle Tuning 2: Adjust the size of the pull data buffer on the Reduce end

During Spark Shuffle, the size of the Shuffle Reduce Task’s buffer determines the amount of data that the Reduce Task can buffer, that is, the amount of data that can be pulled at a time. If the memory resources are sufficient, increase the size of the pull data buffer. You can reduce the number of times to pull data, which can reduce the number of network transmission, thereby improving performance.

Reduce end data pull the size of the buffer can spark. Reducer. MaxSizeInFlight parameters setting, the default of 48 MB, the parameter setting method as shown in code listing:

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96")
Copy the code

Shuffle Tuning 3: Adjust the number of pull data retries on the Reduce end

During Spark Shuffle, if the Reduce task fails to fetch its own data due to network exceptions, the task automatically tries again. For jobs that involve particularly time-consuming shuffle operations, it is recommended to increase the maximum number of retries (say, 60) to avoid pull failures due to factors such as the JVM’s full GC or network instability. In practice, it is found that adjusting this parameter can greatly improve the stability of the shuffle process with a large amount of data (billions to tens of billions).

Reduce side pull data retries can spark. Shuffle. IO. MaxRetries parameters Settings, this parameter represents the can retry the maximum times. If the pull fails within the specified number of times, the job may fail. The default is 3. This parameter is set as shown in the code listing:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")
Copy the code

Shuffle Tuning 4: Adjust the data waiting interval on the Reduce end

During Spark Shuffle, if the Reduce task fails to fetch its own data due to a network exception, the reduce task automatically tries again. After a failure, the Reduce task tries again after a certain interval (for example, 60 seconds). To improve shuffle operation stability.

Reduce side pull data wait interval can spark. Shuffle. IO. RetryWait parameters setting, the default value is 5 s, the parameter setting method as shown in code listing:

val conf = new SparkConf()
  .set("spark.shuffle.io.retryWait", "60s")
Copy the code

Shuffle Tuning 5: Adjust the sort operation threshold of Shuffle

For SortShuffleManager, if the number of Shuffle Reduce tasks is less than a certain threshold, data is written in the unoptimized HashShuffleManager mode instead of sorting during Shuffle Write. But eventually all the temporary disk files produced by each task are merged into one file and a separate index file is created.

When you use SortShuffleManager, if sorting is not required, you are advised to set this parameter to a value greater than the number of shuffle read tasks. In this case, map-side sorting will not be performed, reducing the sorting overhead. A large number of disk files are still generated. Therefore, shuffle Write performance needs to be improved.

SortShuffleManager Sort operation threshold can be set using spark.shufflesor. bypassMergeThreshold. The default value is 200.

val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
Copy the code