In the MapReduce framework, the Shuffle phase is the bridge between Map and Reduce: the output of the Map phase reaches the Reduce phase through the Shuffle process. Because Shuffle involves disk reads/writes and network I/O, its performance directly affects the performance of the entire program. Spark also has Map and Reduce phases, so Shuffle occurs in Spark as well.


Spark Shuffle

Spark Shuffle comes in two flavors: Hash-based Shuffle and Sort-based Shuffle.

Before Spark 1.1, Spark had only one Shuffle implementation, Hash-based Shuffle. Spark 1.1 introduced a Sort-based Shuffle implementation, and from Spark 1.2 the default changed from Hash-based to Sort-based Shuffle; that is, the default ShuffleManager changed from hash to sort. In Spark 2.0, the Hash Shuffle mode was removed entirely.

Spark provided a Hash-based Shuffle implementation from the very beginning, and one of its main motivations was to avoid unnecessary sorting. Hadoop MapReduce treats sorting as a fixed step, so even the many tasks that do not need their output sorted are sorted anyway, incurring a lot of unnecessary overhead.

In the Hash Shuffle implementation, each Task in the Mapper phase generates one file for each Task in the Reduce phase, which usually produces a huge number of files: M*R intermediate files, where M is the number of Mapper-phase tasks and R is the number of Reduce-phase tasks. This requires a large number of random disk I/O operations and a large memory overhead.

To alleviate this problem, Spark 0.8.1 introduced the Shuffle Consolidate mechanism (file consolidation) for Hash Shuffle, which merges the intermediate files generated on the Mapper side. Setting the configuration property spark.shuffle.consolidateFiles=true reduces the number of intermediate files: with file merging enabled, each execution unit generates one file for each Reduce Task, rather than each Map Task doing so.

> An execution unit corresponds to: number of Cores per Executor / number of Cores allocated per Task (default: 1). This changes the file count from M*R to E*C/T*R, where E is the number of Executors, C is the number of available Cores per Executor, and T is the number of Cores allocated per Task.
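To make the formula concrete, here is a tiny Scala sketch with invented numbers (they are not from the article) comparing the plain Hash Shuffle file count M*R with the consolidated count E*C/T*R:

```scala
// Hypothetical numbers to illustrate the two file-count formulas above.
object ConsolidateFileCount {
  def main(args: Array[String]): Unit = {
    val M = 1000 // Mapper-side tasks
    val R = 200  // Reducer-side tasks
    val E = 10   // Executors
    val C = 4    // usable cores per Executor
    val T = 1    // cores allocated per Task (default 1)

    val withoutConsolidate = M * R           // one file per (map task, reduce task) pair
    val withConsolidate    = E * (C / T) * R // one file per (execution unit, reduce task) pair

    println(s"Hash Shuffle files:            $withoutConsolidate") // 200000
    println(s"With consolidateFiles enabled: $withConsolidate")    // 8000
  }
}
```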

Spark 1.1 introduced Sort-based Shuffle:

Even with consolidation, the number of intermediate files produced by Hash Shuffle still depends on the number of tasks in the Reduce phase, that is, on the parallelism of the Reduce side, so the file count remains uncontrollable and the problem is not really solved. To address it properly, Spark 1.1 introduced the Sort-based Shuffle implementation, and from Spark 1.2 the default implementation changed from Hash-based to Sort-based Shuffle; the default ShuffleManager changed from hash to sort.

In Sort-based Shuffle, each task in the Mapper phase writes one Data file and one Index file, rather than a separate file for every task in the Reduce phase; tasks in the Reduce phase locate their data through the index file. The immediate benefit of avoiding a large number of files is reduced random disk I/O and lower memory overhead. The number of generated files drops to 2*M, where M is the number of Mapper tasks: each Mapper task produces one data file and one index file, so the final count is M data files plus M index files.

Starting from Spark 1.4, a Tungsten-sort based Shuffle implementation was introduced. The optimizations of the Tungsten project greatly improved Spark's data processing performance.

> Note: In some specific application scenarios, the performance of Shuffle based on Hash exceeds that of Shuffle based on Sort.

Here’s the iterative history of the Spark Shuffle:

Why did Spark finally abandon HashShuffle in favor of Sorted-Based Shuffle?

The answer lies in one of the most fundamental and pressing problems Spark had to optimize: HashShuffle generates a huge number of files during Shuffle. With large amounts of data, the number of files becomes uncontrollable, which severely limited Spark's performance and scalability. Spark therefore had to reduce the number of files produced by the ShuffleWriter on the Mapper side. Doing so allowed Spark to go from supporting clusters of hundreds of nodes to clusters of thousands or even tens of thousands of nodes almost overnight.

But is Sorted-Based Shuffle perfect? No. Its drawback is precisely its sorting nature: it forces data to be sorted on the Mapper side first, which makes it somewhat slow. Fortunately, Tungsten-sort Shuffle improves the sorting algorithm and optimizes the sorting speed. Tungsten-sort Shuffle has been folded into Sorted-Based Shuffle, and Spark's engine automatically decides whether a program should use Sorted-Based Shuffle or Tungsten-sort Shuffle.

The underlying execution principle of each Shuffle is dissected as follows:

1. Hash Shuffle analysis

The following discussion assumes that each Executor has one CPU core.

1. Unoptimized HashShuffleManager

In the shuffle write stage, after one stage finishes its computation, the data processed by each task is "partitioned" by key so that a shuffle operator (such as reduceByKey) can be executed in the next stage. "Partitioning" means that keys are hashed, records with the same key are written to the same disk file, and each disk file belongs to exactly one task of the downstream stage. Before being written to disk, data first goes into a memory buffer; when the buffer fills up, it is spilled to the disk file.
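As a minimal sketch of how a record's target file can be chosen (in the spirit of Spark's HashPartitioner; partitionFor and nonNegativeMod are illustrative names, not HashShuffleManager's actual code):

```scala
// Pick the partition (and hence the target disk file) from the key's hash.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw // keep the result non-negative for negative hashCodes
}

def partitionFor(key: Any, numReduceTasks: Int): Int =
  nonNegativeMod(key.hashCode, numReduceTasks)

// Example: with 100 downstream tasks, the record ("spark", 1) is appended to the
// current map task's disk file number partitionFor("spark", 100).
```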

Each task of the current stage creates as many disk files as there are tasks in the next stage. For example, if the next stage has 100 tasks in total, every task of the current stage creates 100 disk files. If the current stage has 50 tasks spread over 10 Executors, each running 5 tasks, then 500 disk files are created on each Executor and 5,000 disk files across all Executors. The number of disk files produced by an unoptimized shuffle write is therefore staggering.

Shuffle read usually happens at the beginning of a stage. Each task of this stage needs to pull, over the network, all records with the keys it is responsible for from the output of the previous stage on every node, and then perform aggregation or join operations on them. Since during shuffle write each map task creates one disk file for every reduce task of the downstream stage, during shuffle read each reduce task only needs to pull its own disk file from the nodes where the upstream map tasks ran.

Shuffle read pulls and aggregates at the same time. Each shuffle read task has its own buffer and can only pull data one buffer-sized batch at a time, aggregating it through an in-memory Map. After one batch has been aggregated, the next batch is pulled into the buffer and aggregated, and so on, until all data has been pulled and the final result is obtained.
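A simplified sketch of that pull-and-aggregate loop, with aggregateBatches as a purely illustrative stand-in for the real shuffle reader:

```scala
// Each element of `batches` stands for one buffer-sized batch pulled over the network;
// the batch is folded into an in-memory map before the next batch is fetched.
def aggregateBatches[K, V](batches: Iterator[Seq[(K, V)]],
                           combine: (V, V) => V): Map[K, V] = {
  val agg = scala.collection.mutable.HashMap.empty[K, V]
  for (batch <- batches; (k, v) <- batch) {
    agg(k) = agg.get(k) match {
      case Some(acc) => combine(acc, v) // aggregate with what is already buffered
      case None      => v               // first occurrence of this key
    }
  }
  agg.toMap
}

// Usage: aggregateBatches(batches, (a: Int, b: Int) => a + b) mimics a
// reduceByKey(_ + _) style aggregation on the read side.
```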

The working principle of HashShuffleManager is shown in the figure below:

2. Optimized HashShuffleManager

To optimize HashShuffleManager we can set the parameter spark.shuffle.consolidateFiles. Its default value is false; setting it to true enables the optimization mechanism. Generally speaking, if you use HashShuffleManager, enabling this option is recommended.
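For example, on a pre-2.0 Spark version that still ships HashShuffleManager (both of these properties were removed later), the option could be enabled like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enabling the consolidate mechanism, assuming a pre-2.0 Spark where
// HashShuffleManager and spark.shuffle.consolidateFiles still exist.
val conf = new SparkConf()
  .setAppName("hash-shuffle-consolidate")
  .set("spark.shuffle.manager", "hash")          // explicitly choose the Hash-based manager
  .set("spark.shuffle.consolidateFiles", "true") // default is false
val sc = new SparkContext(conf)
```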

After the consolidate mechanism is enabled, tasks no longer create one disk file per downstream task during shuffle write. Instead, the concept of a shuffleFileGroup is introduced: each shuffleFileGroup corresponds to a batch of disk files, and the number of disk files in a group equals the number of tasks in the downstream stage. An Executor can run as many tasks in parallel as it has CPU cores, and each task in the first batch of parallel tasks creates a shuffleFileGroup and writes its data to the corresponding disk files.

When an Executor's CPU cores finish one batch of tasks and start the next, the new batch reuses the existing shuffleFileGroups, including their disk files; that is, these tasks write their data into the existing disk files rather than creating new ones. The consolidate mechanism thus lets different tasks reuse the same batch of disk files, effectively consolidating the disk files of multiple tasks, which greatly reduces the number of disk files and improves shuffle write performance.

Assume the second stage has 100 tasks, the first stage has 50 tasks, and there are still 10 Executors, each with 1 CPU core and each running 5 tasks. The unoptimized HashShuffleManager would have generated 500 disk files per Executor and 5,000 across all Executors. After optimization, the number of disk files per Executor is: number of CPU cores × number of tasks in the next stage, i.e. each Executor creates 100 disk files and all Executors create 1,000 in total.

> This feature has obvious advantages, but why hasn’t Spark set it as the default in its Hash Shuffle-based implementation? Officially, it’s not stable enough.

The working principle of the optimized HashShuffleManager is shown in the figure below:

Advantages and disadvantages of the Hash Shuffle mechanism

Advantages:

– You can omit unnecessary sorting overhead.

– Avoids memory overhead for sorting.

Disadvantages:

– Too many files are produced, causing pressure on the file system.

– Random reads and writes of a large number of small files incur disk overhead.

– The cache space required for data block writing also increases, putting pressure on memory.

2. SortShuffle analysis

SortShuffleManager has three operating mechanisms:

1. Ordinary operation mechanism;

2. Bypass operation mechanism: when the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (200 by default), the bypass mechanism is enabled;

3. Tungsten Sort operation mechanism: to request it, set spark.shuffle.manager=tungsten-sort; setting this configuration does not guarantee that the mechanism will actually be used (explained later). A configuration sketch for these parameters follows this list.
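For reference, the two properties mentioned above could be set as follows (example values; the tungsten-sort value applied in the Spark 1.5/1.6 era, and as noted it is a request rather than a guarantee):

```scala
import org.apache.spark.SparkConf

// The two knobs referenced in the list above.
val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "200") // bypass applies at or below this many reduce tasks
  .set("spark.shuffle.manager", "tungsten-sort")         // request the Tungsten Sort mechanism
```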

1. Ordinary operation mechanism

Under this mechanism, data is first written into an in-memory data structure; which structure is chosen depends on the shuffle operator. For aggregation-style shuffle operators such as reduceByKey, a Map structure is used, and data is aggregated through the Map while being written into memory. For ordinary shuffle operators such as join, an Array structure is used and records are simply appended in memory. Then, every time a record is written into the in-memory structure, Spark checks whether a threshold has been reached; if so, it attempts to spill the in-memory data to disk and then clears the in-memory structure.
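A toy sketch of that choice between a Map-like and an Array-like structure, with a spill check per record (ShuffleWriteBuffer is illustrative, not Spark's real ExternalSorter; the fixed record-count threshold stands in for Spark's memory-usage-based check):

```scala
class ShuffleWriteBuffer[K, V](combineOpt: Option[(V, V) => V], spillThreshold: Int) {
  private val map    = scala.collection.mutable.HashMap.empty[K, V]
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[(K, V)]

  def insert(k: K, v: V): Unit = {
    combineOpt match {
      case Some(combine) =>                 // aggregation-style operator: combine as we write
        map(k) = map.get(k) match {
          case Some(acc) => combine(acc, v)
          case None      => v
        }
      case None =>                          // ordinary operator: just append the record
        buffer += ((k, v))
    }
    if (map.size + buffer.size >= spillThreshold) spill()
  }

  private def spill(): Unit = {
    // In the real implementation: sort the contents by key, write them to a
    // temporary spill file in batches, then clear the in-memory structure.
    map.clear()
    buffer.clear()
  }
}
```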

Before spilling to disk, the data already in the in-memory structure is sorted by key. After sorting, the data is written to the disk file in batches; the default batch size is 10,000 records, i.e. the sorted data is written out 10,000 records at a time. Writing is done through Java's BufferedOutputStream, which first buffers data in memory and flushes it to the disk file only when its buffer is full, reducing the number of disk I/Os and improving performance.
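A minimal sketch of such a spill, assuming a toy (String, Int) record format rather than Spark's serializer:

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream}

// Sort the in-memory records by key, then write them out in batches of 10,000
// through a BufferedOutputStream so small writes are coalesced in memory first.
def spillSortedRecords(records: Seq[(String, Int)], file: File, batchSize: Int = 10000): Unit = {
  val sorted = records.sortBy(_._1) // sort by key before writing
  val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
  try {
    sorted.grouped(batchSize).foreach { batch => // one batch = up to 10,000 sorted records
      batch.foreach { case (k, v) =>
        out.writeUTF(k)  // Spark would use its configured serializer here
        out.writeInt(v)
      }
    }
  } finally {
    out.close()
  }
}
```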

While a task writes all of its data into the in-memory structure, several spills to disk may occur, producing several temporary files. Finally, all the temporary disk files are merged; this is the merge process, in which the data of every temporary file is read and written into a single final disk file. Because a task now corresponds to only one disk file, meaning all the data this task prepared for the downstream stage's tasks lives in that one file, a separate index file is also written, recording the start and end offsets of each downstream task's data within the data file.
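A small sketch of what such an index file can look like, written as one cumulative byte offset per partition (writeIndexFile is illustrative; the real file layout may differ in detail):

```scala
import java.io.{DataOutputStream, FileOutputStream}

// One cumulative byte offset per reduce partition (plus the final end offset) into
// the single merged data file. Reduce task i then reads the byte range
// [offsets(i), offsets(i + 1)) from the data file.
def writeIndexFile(indexPath: String, partitionLengths: Array[Long]): Unit = {
  val out = new DataOutputStream(new FileOutputStream(indexPath))
  try {
    var offset = 0L
    out.writeLong(offset)            // start offset of partition 0
    partitionLengths.foreach { len =>
      offset += len
      out.writeLong(offset)          // end of this partition = start of the next
    }
  } finally {
    out.close()
  }
}
```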

Because SortShuffleManager merges its spill files, it greatly reduces the number of files. For example, suppose the first stage has 50 tasks running on 10 Executors, each executing 5 tasks, and the second stage has 100 tasks. Since each task ends up with only one data file (plus its index file), there are only 5 data files per Executor and 50 data files across all Executors.

The working principle of SortShuffleManager under the ordinary operation mechanism is shown in the figure below:

2. Bypass operation mechanism

When the number of Reducer-side tasks is small, the Hash-based Shuffle implementation is clearly faster than the Sort-based one. The Sort-based implementation therefore provides a fallback, the bypass operation mechanism: when the number of Reducer-side tasks is below the value configured by spark.shuffle.sort.bypassMergeThreshold, a Hash-style fallback plan is used.

Triggering conditions of the bypass mechanism are as follows:

– The number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (200 by default).
– The shuffle operator is not an aggregation-class operator (such as reduceByKey).

In this case, each map task creates one temporary disk file for each downstream task, hashes each record's key, and writes the record into the corresponding disk file according to that hash value. As before, records are first written to a memory buffer and spilled to the disk file when the buffer is full. Finally, all the temporary disk files are merged into a single disk file and a single index file is created.
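A toy sketch of this bypass write path (file naming and record format are invented):

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files

// One temporary file per reduce partition, chosen by the key's hash, then
// concatenated in partition order into a single data file.
def bypassWrite(records: Iterator[(String, String)], numReduceTasks: Int): File = {
  val tmpFiles = Array.tabulate(numReduceTasks)(i => File.createTempFile(s"bypass-$i-", ".tmp"))
  val writers  = tmpFiles.map(f => new FileOutputStream(f))
  records.foreach { case (k, v) =>
    val pid = Math.floorMod(k.hashCode, numReduceTasks) // same hash partitioning as Hash Shuffle
    writers(pid).write(s"$k\t$v\n".getBytes("UTF-8"))
  }
  writers.foreach(_.close())

  // Concatenate the per-partition temp files, partition 0 first, into one data file;
  // an index file recording each partition's offsets would be written alongside.
  val merged = File.createTempFile("bypass-merged-", ".data")
  val out = new FileOutputStream(merged)
  tmpFiles.foreach { f => out.write(Files.readAllBytes(f.toPath)); f.delete() }
  out.close()
  merged
}
```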

The disk write mechanism of this process is exactly the same as that of the unoptimized HashShuffleManager: a staggering number of temporary disk files is still created, only they are merged into a single file at the end. The small number of final disk files is also what makes its shuffle read perform better than the unoptimized HashShuffleManager.

This mechanism differs from the ordinary SortShuffleManager operation mechanism in two ways: first, the disk write mechanism is different; second, it does not sort. In other words, its biggest advantage is that no sorting is needed during shuffle write, which saves that part of the performance overhead.

The SortShuffleManager of the bypass operation mechanism works as shown in the following figure:

3. Tungsten Sort Shuffle running mechanism

The Tungsten Sort based Shuffle implementation mainly relies on the optimizations made by the Tungsten project to process Shuffle data efficiently.

Spark provides a configuration property for selecting a particular Shuffle implementation mechanism, but note that the Sort Shuffle based implementation is enabled by default. Looking at the kernel of the Shuffle framework, both the Sort Shuffle based implementation and the Tungsten Sort Shuffle based implementation use SortShuffleManager; which one is actually used internally is determined by two methods:

For the non-Tungsten Sort path, SortShuffleWriter.shouldBypassMergeSort determines whether to fall back to the Hash-style Shuffle implementation (the bypass mechanism). If that condition is not met, SortShuffleManager.canUseSerializedShuffle determines whether the Tungsten Sort Shuffle implementation should be adopted. When both methods return false, i.e. neither set of conditions is met, the ordinary operation mechanism is used automatically.
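A simplified sketch of that selection logic (illustrative code, not Spark's actual source; the condition bodies summarize the bypass rules and Tungsten Sort requirements described in this section):

```scala
sealed trait ShuffleWritePath
case object BypassMergeSort extends ShuffleWritePath // Hash-style fallback
case object SerializedSort  extends ShuffleWritePath // Tungsten Sort mechanism
case object RegularSort     extends ShuffleWritePath // ordinary SortShuffle mechanism

def choosePath(numPartitions: Int,
               bypassMergeThreshold: Int,
               needsMapSideAggregation: Boolean,
               needsSortedOutput: Boolean,
               serializerSupportsRelocation: Boolean): ShuffleWritePath = {
  val shouldBypassMergeSort =
    !needsMapSideAggregation && numPartitions <= bypassMergeThreshold
  val canUseSerializedShuffle =
    serializerSupportsRelocation && !needsMapSideAggregation && !needsSortedOutput &&
      numPartitions < (1 << 24) // fewer than 16,777,216 partitions

  if (shouldBypassMergeSort) BypassMergeSort
  else if (canUseSerializedShuffle) SerializedSort
  else RegularSort
}
```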

Therefore, when spark.shuffle.manager=tungsten-sort is set, it is not guaranteed that the tungsten sort based shuffle implementation mechanism will be used.

The following conditions must be met to implement the Tungsten Sort Shuffle mechanism:

1. The shuffle dependency involves no aggregation and no requirement to sort the output.

2. The shuffle serializer supports relocation of serialized values. Currently this is true only of KryoSerializer and the custom serializers of the Spark SQL framework.

3. The number of output partitions of the shuffle is less than 16,777,216 (2^24).

In practice there are further limitations: for example, with the page-based memory management model, a single internal record cannot exceed 128 MB (see the PackedRecordPointer class for the memory model), and the partition-count limit above also comes from this memory model.

Therefore, the current implementation mechanism based on Tungsten Sort Shuffle is still relatively strict.

Advantages and disadvantages of sort-based Shuffle mechanism

Advantages:

– The number of small files is greatly reduced, reducing Mapper memory usage.

– Spark can handle not only small-scale data; even when processing large-scale data it does not easily hit a performance bottleneck.

Disadvantages:

– If the number of Mapper tasks is too large, a large number of small files is still produced; the Reducer side then has to keep many records in memory for deserialization at the same time, which consumes a lot of memory, creates a huge GC burden, and can slow the system down or even crash it;

– It forces sorting on the Mapper side even when the data itself does not need to be sorted;

– It sorts based on the records themselves, which is the most serious performance drain of sort-based Shuffle.

