Preface
1. Shuffle Principle Analysis
1.1 Overview of Shuffle
Shuffle is, at its core, a reorganization of data. Because of the characteristics and requirements of distributed computing, its implementation details are fairly complicated.
In the MapReduce framework, Shuffle is the bridge between Map and Reduce: in the Map phase, shuffle partitions the output and writes it for the corresponding Reduce tasks; in the Reduce phase, that data is pulled from the Map side and computed on. The shuffle process involves a large amount of disk and network I/O, so shuffle performance directly determines the performance of the entire program. Spark likewise has its own shuffle implementation.
1.2 Introduction to Shuffle in Spark
During DAG scheduling, stages are divided according to whether a shuffle is required; that is, a shuffle is needed wherever there is a wide dependency. The job is therefore split into multiple stages, each containing many tasks that can run in parallel.
The process between stages is the shuffle phase. In Spark, the ShuffleManager is responsible for executing, computing, and processing the shuffle. As Spark evolved, two ShuffleManager implementations appeared: HashShuffleManager and SortShuffleManager. Accordingly, Spark Shuffle comes in two flavors: Hash Shuffle and Sort Shuffle.
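To make the stage split concrete, here is a minimal sketch (assuming a local SparkContext and an illustrative input path "input.txt"): the reduceByKey call introduces a wide dependency, so the DAG scheduler cuts the job into a map-side stage and a reduce-side stage at that point.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StageSplitExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-split").setMaster("local[*]"))

    val counts = sc.textFile("input.txt")        // Stage 0: narrow transformations only
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                        // wide dependency -> shuffle boundary

    // toDebugString shows the ShuffledRDD that marks the boundary between stages.
    println(counts.toDebugString)
    counts.collect().foreach(println)            // Stage 1 runs after the shuffle
    sc.stop()
  }
}
```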
1.3 HashShuffle mechanism
1.3.1 Introduction to HashShuffle
Before Spark 1.2, the default Shuffle computing engine was HashShuffleManager.
HashShuffleManager has a very serious drawback: it creates a large number of intermediate disk files, and the resulting flood of disk I/O hurts performance. Therefore, in versions after Spark 1.2, the default ShuffleManager was changed to SortShuffleManager.
SortShuffleManager improves on HashShuffleManager. During a shuffle, each task still generates many temporary disk files, but in the end they are all merged into a single disk file, so each task leaves only one disk file. When the shuffle read tasks of the next stage pull their data, they simply read their own portion of each disk file based on an index.
Hash Shuffle is a shuffle without sorting.
1.3.2 Hash Shuffle: the ordinary mechanism
HashShuffleManager has two operation mechanisms: the ordinary mechanism and the merge (consolidation) mechanism. The merge mechanism reduces the number of small files generated during a shuffle mainly by reusing buffers.
First, a brief example. The job is divided into two stages: the first stage has 4 MapTasks and the second stage has 3 ReduceTasks, so 4 * 3 = 12 block files are produced. If the number of MapTasks grows to 1000, the number of block files becomes 1000 * 3 = 3000, and that many small files means a huge amount of I/O and serious performance problems.
1.3.3 Hash Shuffle procedure description
This assumes that each Executor has only one CPU core, which means that no matter how many task threads are allocated to that Executor, only one task thread can be executed at a time.
There are 3 ReduceTasks in the figure. Each ShuffleMapTask hashes its own output by key (via the partitioner), splitting it into three categories of data; each ReduceTask then collects the data belonging to its own category and aggregates it into one large collection of the same category to compute the final result. Each ShuffleMapTask therefore outputs 3 local files, and with 4 ShuffleMapTasks the total output is 4 x 3 = 12 local small files.
Shuffle Write phase:
After one stage finishes its computation, the data processed by each task is partitioned by key so that the next stage can execute shuffle operators (such as reduceByKey and groupByKey). Partitioning means that a hash is computed on the key, so records with the same key are written to the same disk file, and each disk file belongs to exactly one task of the reduce-side stage. Before being written to disk, data is first written to a memory buffer; when the buffer is full, it is spilled to the disk file.
How many disk files does each shuffle-write task create for the next stage? Each task of the current stage creates as many disk files as there are tasks in the next stage. For example, if the next stage has 100 tasks, each task of the current stage creates 100 disk files. If the current stage has 50 tasks spread over 10 executors, with each executor running 5 tasks, then 500 disk files are created on each executor and 5,000 disk files across all executors. The number of disk files produced by an unoptimized shuffle write is therefore staggering.
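As a hedged illustration of the partitioning just described (the keys and task counts below are only examples), Spark's HashPartitioner shows how a key is mapped to a reduce-side partition, and the file count follows the M * R rule:

```scala
import org.apache.spark.HashPartitioner

object HashWriteExample {
  def main(args: Array[String]): Unit = {
    val numReduceTasks = 3                         // R: tasks on the reduce side
    val partitioner = new HashPartitioner(numReduceTasks)

    // Records with the same key always land in the same partition, i.e. the
    // same reduce-side disk file.
    Seq("spark", "shuffle", "spark", "hash").foreach { key =>
      println(s"$key -> partition ${partitioner.getPartition(key)}")
    }

    // Unoptimized shuffle write: every map task writes one file per reduce task.
    val numMapTasks = 50                           // M, as in the example above
    val nextStageTasks = 100                       // R, as in the example above
    println(s"disk files = ${numMapTasks * nextStageTasks}")  // 5000
  }
}
```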
Shuffle Read phase:
Shuffle read is usually the first thing a stage does. Each task of this stage must pull all records with its keys out of the previous stage's outputs, across the network, to its own node, and then aggregate or join them by key. Since, during shuffle write, each map task created one disk file per reduce-side task, a shuffle read task only needs to pull, from every task of the upstream stage, the disk file that belongs to it.
The shuffle read pull process aggregates as it pulls. Each shuffle read task has its own buffer and can only pull one buffer's worth of data at a time; it aggregates that data through an in-memory map, then pulls the next batch into the buffer and aggregates again, and so on, until all the data has been pulled and the final result is obtained.
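The loop below is a deliberately simplified model of this pull-and-aggregate behavior, written in plain Scala rather than Spark's actual reader classes; the batch size stands in for the 32 KB read buffer, and the records are made up.

```scala
import scala.collection.mutable

object ShuffleReadSketch {
  def main(args: Array[String]): Unit = {
    // Pretend these are the (word, 1) records pulled from upstream map outputs.
    val pulled = Iterator.fill(10)(("spark", 1)) ++ Iterator.fill(6)(("hash", 1))

    val bufferSize = 4                                  // stand-in for the 32 KB buffer
    val aggregated = mutable.HashMap.empty[String, Int] // in-memory aggregation map

    pulled.grouped(bufferSize).foreach { batch =>       // pull one buffer-sized batch
      batch.foreach { case (k, v) =>
        aggregated.update(k, aggregated.getOrElse(k, 0) + v)  // aggregate as we pull
      }
    }
    println(aggregated)                                 // Map(spark -> 10, hash -> 6)
  }
}
```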
Note:
- The buffer acts as a cache that speeds up disk writes and improves computing efficiency. Its default size is 32 KB.
- Partitioner: decides, via hash(key) % numReduce, which Reduce task processes each record, and therefore how many buffers are written to.
- Block file: a small disk file. The number of small disk files is Block files = M * R, where M is the number of Map tasks and R is the number of Reduce tasks. In general, the number of Reduce tasks equals the number of buffers, which is determined by the partitioner.
Problems with the ordinary Hash Shuffle mechanism:
- A large number of small files is produced on disk during the shuffle phase. As a result, connections must be established and data pulled many more times, which generates a large number of time-consuming and inefficient I/O operations.
- Those operations can lead to OOM: writing and reading so many files creates a huge number of objects in heap memory, which causes heap pressure and frequent GC. The memory must also hold a large number of file handles and temporary bookkeeping information, so when the data scale is large the memory cannot cope and OOM occurs.
1.3.4 Hash Shuffle: the merge mechanism
The merge mechanism works by reusing buffers. It is enabled through the configuration spark.shuffle.consolidateFiles, whose default value is false; set it to true to turn on the optimization. In general, if HashShuffleManager is used, it is recommended to enable this option.
Here there are 6 ShuffleMapTasks and the data is still divided into three categories. The hash of the key decides which category (and therefore which buffer) a record goes to, and within the same process, records with the same key go into the same buffer no matter which task produced them. The buffers are then written to local files per core (each core writes one file per key category), so each core produces 3 local files. With 2 cores and 6 ShuffleMapTasks, the total output is 2 cores x 3 category files = 6 local small files.
In this case, block files = Core * R, where Core is the number of CPU cores and R is the number of Reduce tasks. However, if the reduce side has many parallel tasks or data partitions, Core * R can still be too large and many small files will still be generated.
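A short sketch of the two file-count formulas (ordinary: M * R, merged: Core * R) using the numbers from this example, plus the configuration that turns the merge mechanism on. Note that spark.shuffle.manager and spark.shuffle.consolidateFiles only apply to the old Spark releases that still ship HashShuffleManager.

```scala
import org.apache.spark.SparkConf

object HashShuffleFileCount {
  def ordinaryFiles(mapTasks: Int, reduceTasks: Int): Int = mapTasks * reduceTasks
  def mergedFiles(cores: Int, reduceTasks: Int): Int      = cores * reduceTasks

  def main(args: Array[String]): Unit = {
    println(ordinaryFiles(mapTasks = 6, reduceTasks = 3)) // 18 small files without merging
    println(mergedFiles(cores = 2, reduceTasks = 3))      // 6 small files with merging

    val conf = new SparkConf()
      .set("spark.shuffle.manager", "hash")               // use HashShuffleManager (old versions)
      .set("spark.shuffle.consolidateFiles", "true")      // turn on the merge mechanism
    println(conf.get("spark.shuffle.consolidateFiles"))
  }
}
```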
1.4 Sort shuffle
SortShuffleManager also has two running mechanisms: the ordinary running mechanism and the bypass running mechanism.
1.4.1 General mechanism of Sort Shuffle
In this mode, data is first written into an in-memory data structure. For aggregation operators (such as reduceByKey), a Map is used and the data is aggregated locally as it is written to memory; for ordinary shuffle operators (such as join), records are written directly into an Array-like structure in memory. Spark then checks whether a threshold (5 MB by default) has been reached; if so, the in-memory data structure is spilled to disk and then cleared.
Before spilling to disk, the data is sorted by key. The sorted data is written to the disk file in batches, 10,000 records per batch by default, through a write buffer. Each spill produces one disk file, so a single task generates multiple temporary files.
Finally, each task merges all of its temporary files. This is the merge step: it reads every temporary file and writes the data into one final file, so all of a task's output ends up in a single file. At the same time, a separate index file is written that records, for each downstream task, the start offset and end offset of its data within that file (for a wordCount job, for example, the segment between a start and end offset holds the words destined for one downstream task).
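Below is a deliberately simplified model of this spill-sort-merge flow in plain Scala, not Spark's internal classes: lists stand in for temporary disk files, a record count stands in for the 5 MB threshold, and the index is just a vector of cumulative offsets.

```scala
import scala.collection.mutable

object SortShuffleSketch {
  def main(args: Array[String]): Unit = {
    val numReducePartitions = 3
    val spillThreshold = 4                                  // stand-in for the 5 MB threshold

    val buffer = mutable.ArrayBuffer.empty[(Int, String)]   // (partition id, record)
    val spills = mutable.ArrayBuffer.empty[Seq[(Int, String)]]

    def spill(): Unit = {
      spills += buffer.sortBy(identity).toList              // sort by (partition, key), then "write a temp file"
      buffer.clear()
    }

    val records = Seq("spark", "sort", "shuffle", "merge", "index", "file", "spark")
    records.foreach { r =>
      buffer += ((math.abs(r.hashCode) % numReducePartitions, r))
      if (buffer.size >= spillThreshold) spill()            // threshold reached: spill to disk
    }
    if (buffer.nonEmpty) spill()                            // final spill

    // Merge all temp "files" into one data file ordered by partition id, and build
    // the index: partition p occupies [offsets(p), offsets(p + 1)) in that file.
    val merged  = spills.flatten.sortBy(identity).toVector
    val counts  = (0 until numReducePartitions).map(p => merged.count(_._1 == p))
    val offsets = counts.scanLeft(0)(_ + _)

    println(merged.map(_._2))                               // one merged, ordered output
    println(offsets)                                        // e.g. Vector(0, 2, 5, 7)
  }
}
```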
The benefits of this mechanism:
- There are significantly fewer small files: each task ends up with only one data file.
- The file is ordered overall and, with the help of the index file, lookups are much faster. Sorting costs some performance, but reading becomes much faster (see the read sketch after this list).
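As a follow-up, this is how an index lets a reduce task read only its own slice of the single data file. The file name and the offsets here are made up purely for illustration.

```scala
import java.io.RandomAccessFile

object IndexedReadSketch {
  def main(args: Array[String]): Unit = {
    // Suppose the index recorded that one downstream partition occupies bytes
    // [128, 256) of "shuffle_0_0_0.data" (path and offsets are hypothetical).
    val start = 128L
    val end   = 256L

    val file = new RandomAccessFile("shuffle_0_0_0.data", "r")
    try {
      file.seek(start)                             // jump directly to this partition's segment
      val bytes = new Array[Byte]((end - start).toInt)
      file.readFully(bytes)                        // read only this task's slice
      println(s"read ${bytes.length} bytes")
    } finally file.close()
  }
}
```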
1.4.2 Sort Shuffle in bypass mode
The bypass mechanism runs when the number of shuffle read tasks (reduce-side partitions) is no greater than the value of spark.shuffle.sort.bypassMergeThreshold (200 by default) and the shuffle operator is not an aggregation-type operator (so reduceByKey does not qualify).
When these conditions hold, bypass-mode sort shuffle is used and no sorting is performed. The reasoning is that with so few partitions the data volume is small, lookups are already fast, and a full sort is unnecessary, so skipping it saves the sort's performance overhead.
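A hedged sketch of this decision (the helper below is illustrative, not Spark's internal check): bypass is chosen only when the shuffle needs no map-side aggregation and the number of reduce-side partitions is at most spark.shuffle.sort.bypassMergeThreshold.

```scala
object BypassCheckSketch {
  def bypassMergeSort(numReducePartitions: Int,
                      bypassMergeThreshold: Int,
                      needsMapSideAggregation: Boolean): Boolean =
    !needsMapSideAggregation && numReducePartitions <= bypassMergeThreshold

  def main(args: Array[String]): Unit = {
    println(bypassMergeSort(150, 200, needsMapSideAggregation = false)) // true: bypass, no sort
    println(bypassMergeSort(150, 200, needsMapSideAggregation = true))  // false: reduceByKey-style, normal sort path
    println(bypassMergeSort(300, 200, needsMapSideAggregation = false)) // false: too many partitions
  }
}
```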
1.5 Parameters used
1.5.1 spark.shuffle.file.buffer
The map-side write buffer; the default size is 32 KB. Increasing this value reduces the number of disk spills and therefore disk I/O.
1.5.2 spark.reducer.maxSizeInFlight
The amount of data a reduce task pulls at a time. The default is 48 MB.
1.5.3 spark.shuffle.memoryFraction
The fraction of executor memory available for shuffle aggregation.
1.5.4 spark.shuffle.io.maxRetries
The number of retries when pulling data, to guard against network jitter.
1.5.5 spark.shuffle.io.retryWait
The retry interval: how long to wait before pulling again after a failed pull.
1.5.6 spark.shuffle.consolidateFiles
Enables the HashShuffle merge (consolidation) mechanism.
1.5.7 spark.shuffle.sort.bypassMergeThreshold
The threshold for SortShuffle's bypass mechanism. The default is 200.
1.5.8 spark.sql.shuffle.partitions
The number of partitions used for shuffles in Spark SQL. The default is 200.
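For convenience, here are the parameters from this section gathered into a single SparkConf, using the default values quoted above. This is a sketch rather than tuning advice; spark.shuffle.memoryFraction and spark.shuffle.consolidateFiles are omitted because they only exist in older Spark releases.

```scala
import org.apache.spark.SparkConf

object ShuffleTuningConf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.shuffle.file.buffer", "32k")               // map-side write buffer
      .set("spark.reducer.maxSizeInFlight", "48m")           // data pulled per fetch
      .set("spark.shuffle.io.maxRetries", "3")               // retries on fetch failure
      .set("spark.shuffle.io.retryWait", "5s")               // wait between retries
      .set("spark.shuffle.sort.bypassMergeThreshold", "200") // bypass mode threshold
      .set("spark.sql.shuffle.partitions", "200")            // shuffle partitions for Spark SQL

    conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```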
Finally
…