Sharer: Chen Shi, technical expert on the EMR team of Alibaba's Computing Platform Business Division, currently working on big data storage and Spark.
- Introduction to Spark Shuffle
- Smart Shuffle design
- Performance analysis
The Spark Shuffle process
- Spark 0.8 and earlier: Hash Based Shuffle
- Spark 0.8.1: the File Consolidation mechanism is introduced for Hash Based Shuffle
- Spark 0.9: ExternalAppendOnlyMap is introduced
- Spark 1.1: Sort Based Shuffle is introduced; the default is still Hash Based Shuffle
- Spark 1.2: the default shuffle changes to Sort Based Shuffle
- Spark 1.4: Tungsten-Sort Based Shuffle is introduced
- Spark 1.6: Tungsten-Sort is merged into Sort Based Shuffle
- Spark 2.0: Hash Based Shuffle is removed
To sum up, Hash Based Shuffle was used at the beginning. Each mapper creates one bucket per reducer, so the number of buckets is M x R, where M is the number of map tasks and R is the number of reduce tasks. This produces a large number of small files, which puts a lot of strain on the file system and hurts IO throughput. A later optimization (File Consolidation) merges the output files of mappers running on the same core, so the number of files drops to cores x R.
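The bucketing and file-count arithmetic above can be sketched in plain Scala. The helper names below are hypothetical, not Spark's actual implementation; `bucketFor` mimics how a hash partitioner maps a key to a reducer:

```scala
// Minimal sketch of hash-based shuffle bucketing (hypothetical helper names).
object HashShuffleSketch {
  // Mimics a hash partitioner: non-negative modulo of the key's hashCode.
  def bucketFor(key: Any, numReducers: Int): Int = {
    val mod = key.hashCode % numReducers
    if (mod < 0) mod + numReducers else mod
  }

  // Without consolidation, every (mapper, reducer) pair gets its own file: M x R.
  def shuffleFileCount(numMappers: Int, numReducers: Int): Long =
    numMappers.toLong * numReducers

  // With File Consolidation, mappers on the same core share output files: cores x R.
  def consolidatedFileCount(numCores: Int, numReducers: Int): Long =
    numCores.toLong * numReducers
}
```

For example, 1000 mappers and 1000 reducers produce a million files without consolidation, but only 16000 on a 16-core executor with consolidation.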
The Spark Shuffle implementation
Introduction to Sort-based shuffle
The choice of shuffle manager is made in org.apache.spark.SparkEnv:
```scala
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
// Get the shuffle manager type; "sort" is the default
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
```
In hash-based shuffle, each mapper writes one file per reducer, so M x R files are generated; with many mappers and reducers, the file count becomes enormous. One design goal of hash-based shuffle was to avoid unnecessary sorting (Hadoop MapReduce was criticized for the needless overhead of sorting in many places where it is not required). However, it generates heavy disk IO and memory consumption when processing very large data sets, which undoubtedly hurts performance. Hash-based shuffle was continually optimized; as mentioned above, the File Consolidation introduced in Spark 0.8.1 alleviated this problem to some extent.

To solve it, Spark 1.1 introduced Sort Based Shuffle. Here, each shuffle map task no longer generates a separate file per reducer; instead it writes all of its results into a single data file and produces an index file from which each reducer can locate the data it needs to process. The immediate benefits of avoiding large numbers of files are lower memory usage and the low latency of sequential disk IO. Saving memory reduces the risk and frequency of GC, and reducing the number of files avoids the pressure of writing many files at the same time.
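The index-file idea can be sketched in plain Scala: the index stores R + 1 cumulative byte offsets into the single data file, so reducer i reads the byte range [offsets(i), offsets(i + 1)). This is a hypothetical helper, not Spark's actual IndexShuffleBlockResolver:

```scala
// Sketch of how a sort-based shuffle index file is used (hypothetical names).
// `offsets` has numReducers + 1 entries: cumulative byte offsets into the
// single data file written by one map task.
final case class ShuffleIndex(offsets: Array[Long]) {
  require(offsets.nonEmpty && offsets.head == 0L)

  // Byte range in the data file holding reducer `reduceId`'s partition.
  def rangeFor(reduceId: Int): (Long, Long) =
    (offsets(reduceId), offsets(reduceId + 1))
}

// Example: three reducers whose partitions are 100, 0 and 250 bytes long.
val index = ShuffleIndex(Array(0L, 100L, 100L, 350L))
```

An empty partition simply yields a zero-length range, so reducers with no data skip the read entirely.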
On the write side there are three implementations: BypassMergeSortShuffleWriter, SortShuffleWriter, and UnsafeShuffleWriter.
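The choice among the three writers can be sketched roughly as follows. This is a simplification of SortShuffleManager's actual checks: the 200-partition threshold is the usual default of spark.shuffle.sort.bypassMergeThreshold, and the real unsafe path also caps the number of partitions (~16.7M) and requires no aggregator; treat the details as assumptions:

```scala
// Rough sketch of how SortShuffleManager picks a writer (simplified).
sealed trait WriterKind
case object Bypass extends WriterKind // BypassMergeSortShuffleWriter
case object Unsafe extends WriterKind // UnsafeShuffleWriter (tungsten-sort)
case object Sort   extends WriterKind // SortShuffleWriter

def chooseWriter(mapSideCombine: Boolean,
                 numPartitions: Int,
                 serializerSupportsRelocation: Boolean,
                 bypassMergeThreshold: Int = 200): WriterKind =
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold) Bypass
  else if (!mapSideCombine && serializerSupportsRelocation) Unsafe
  else Sort
```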
On the read side, SortShuffleManager has only one implementation: BlockStoreShuffleReader.
Problems with Spark Shuffle
Synchronous operation
Multi-way merging of shuffle data to produce the final output can only start after the map task completes.
A lot of disk IO
Shuffle data incurs a large amount of disk read/write IO in the merge phase; the sort-merge phase demands high disk IO bandwidth.
Computation serialized with network IO
Within a task, computation and network IO run serially.
Smart Shuffle
Shuffle data pipeline
Shuffle data is sent to the reduce side while it is still being accumulated on the map side.
Avoid unnecessary network IO
Based on the distribution of partitions, reduce tasks can be scheduled onto the corresponding nodes.
Asynchronous computation and network IO
Shuffle data generation and shuffle data sending can proceed in parallel.
Avoiding sort-merge to reduce disk IO
Shuffle data is already split by partition, so no sort-merge is needed.
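The pipelining idea described above can be sketched as a producer/consumer pair: the map-side compute path appends partition-tagged blocks to a queue while a separate sender thread drains and transmits them, so computation and network IO overlap. This is an illustrative sketch with hypothetical names, not Smart Shuffle's actual implementation:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Illustrative sketch: compute produces blocks while a sender thread drains
// them concurrently, instead of waiting for the whole map task to finish.
final case class Block(partitionId: Int, bytes: Array[Byte])
val Poison = Block(-1, Array.empty) // sentinel marking end of input

class PipelinedSender(send: Block => Unit) {
  private val queue = new LinkedBlockingQueue[Block]()
  private val sender = new Thread(() => {
    var b = queue.take()
    while (b ne Poison) { send(b); b = queue.take() }
  })
  sender.start()

  def offer(b: Block): Unit = queue.put(b) // called by the compute path
  def close(): Unit = { queue.put(Poison); sender.join() }
}
```

Because blocks are tagged with their partition id at production time, the receiver can append them directly to per-partition storage, which is what makes the later sort-merge unnecessary.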
Using Smart Shuffle
- Configure spark.shuffle.manager: org.apache.spark.shuffle.hash.HashShuffleManager
- Configure spark.shuffle.smart.spill.memorySizeForceSpillThreshold: controls the in-memory size of shuffle data; the default is 128 MB
- Configure spark.shuffle.smart.transfer.blockSize: controls the block size of shuffle data during network transfer
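Put together, a SparkConf for Smart Shuffle might look like the sketch below. The smart.* property names are taken from the list above and assume the EMR build that ships this feature; the blockSize value is a placeholder, since the article does not state its default:

```scala
import org.apache.spark.SparkConf

// Configuration sketch for enabling Smart Shuffle (assumes the EMR build).
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
  // In-memory shuffle data size before a forced spill (128 MB is the stated default).
  .set("spark.shuffle.smart.spill.memorySizeForceSpillThreshold", "128m")
  // Block size for shuffle data during network transfer (placeholder value).
  .set("spark.shuffle.smart.transfer.blockSize", "8m")
```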
Performance analysis
Hardware and software resources:
TPC-DS performance:
Smart Shuffle improves overall TPC-DS performance by 28%:
- No single query regresses under Smart Shuffle
- A single query can see up to a 2x performance improvement
Taking the Q2 and Q49 queries for performance analysis:
- Q2 performs the same under the two shuffle implementations
- Q49 improves significantly under Smart Shuffle
Single query comparison:
Sorted Shuffle is on the left and Smart Shuffle is on the right. The Q2 query is relatively simple and its shuffle data is small, so Smart Shuffle's performance is unchanged.
Q2 CPU comparison: Sorted Shuffle is on the left, and Smart Shuffle is on the right
Disk comparison:
Sorted Shuffle on the left and Smart Shuffle on the right