This article describes how sort-shuffle helps Flink handle large-scale batch data processing jobs. The main contents include:
- Introduction to data shuffle
- The significance of introducing sort-shuffle
- Flink sort-shuffle implementation
- Test results
- Tuning parameters
- Future prospects
Flink is a big data computing engine that unifies batch and stream processing. Large-scale batch processing is an important part of its capabilities, and those capabilities have been enhanced steadily from release to release. The introduction of sort-shuffle makes Flink better able to handle large-scale batch jobs.
I. Introduction to data shuffle
Data shuffle is an important phase of batch data processing. In this phase, the output data of upstream processing nodes is persisted to external storage, and then read and processed by downstream compute nodes. This persistent data is not only a form of data exchange between compute nodes, but also plays an important role in error recovery.
At present, existing large-scale distributed computing systems use two batch shuffle models, hash-based and sort-based:
- The core idea of the hash-based approach is to write the data destined for each downstream parallel consumer task into a separate file, so that the file itself becomes a natural boundary between data partitions.
- The core idea of the sort-based approach is to write the data of all partitions together and then use sorting to distinguish the boundaries between data partitions.
We introduced the sort-based batch shuffle implementation in Flink 1.12 and continued to optimize its performance and stability afterwards. As of Flink 1.13, sort-shuffle is production-ready.
II. The significance of introducing sort-shuffle
An important reason for introducing the sort-shuffle implementation is that Flink's original hash-based implementation is not usable for large-scale batch jobs, a limitation that other existing large-scale distributed computing systems have also demonstrated:
- **Stability:** For high-parallelism batch jobs, a hash-based implementation produces a large number of files that are read and written concurrently, which consumes a lot of resources and puts heavy pressure on the file system. The file system has to maintain a large amount of file metadata, which introduces stability risks such as exhaustion of file handles and inodes.
- **Performance:** For high-parallelism batch jobs, reading and writing a large number of files concurrently means a large amount of random I/O, and the amount of data actually read or written per I/O operation may be very small, which is a huge challenge for I/O performance. On mechanical hard disks, this easily makes data shuffle the performance bottleneck of batch jobs.
By introducing the sort-based batch shuffle implementation, the number of files read and written concurrently is greatly reduced, which allows more sequential reads and writes and thus improves the stability and performance of large-scale Flink batch jobs. In addition, the new sort-shuffle implementation reduces memory buffer consumption. In a hash-based implementation, each data partition requires its own read/write buffer, so memory buffer consumption is proportional to parallelism. A sort-based implementation, on the other hand, decouples memory buffer consumption from job parallelism (although more memory may lead to better performance).
More importantly, we have implemented new storage structures and read/write IO optimizations that give Flink’s batch shuffle an advantage over other large-scale distributed data processing systems. The following sections describe Flink’s sort-shuffle implementation and its results in more detail.
III. Flink sort-shuffle implementation
Similar to the sort-shuffle implementations of other distributed systems, Flink's shuffle process is divided into several important stages: writing data to a memory buffer, sorting the memory buffer, writing the sorted data to a file, and reading shuffle data from the file and sending it downstream. Compared with other systems, however, Flink's implementation has some fundamental differences, including a multi-region data storage format, the elimination of the data merge step, and read I/O scheduling. Together these help Flink's implementation perform better.
1. Design objectives
Throughout the implementation of Flink's sort-shuffle, we treated the following points as the main design objectives:
1.1 Reduce file quantity
As discussed above, a hash-based implementation produces a large number of files, and reducing the number of files improves stability and performance. The sort-spill-merge approach is widely used in distributed computing systems to achieve this goal. Data is first written to a memory buffer; when the buffer is full, the data is sorted and the sorted data is written (spilled) to a file. The total number of files is then (total data size / memory buffer size), which is far fewer than with the hash-based approach. After all the data has been written out, the resulting files are merged into a single file, further reducing the number of files and increasing the size of each data partition (which facilitates sequential reads).
Compared with the implementations of other systems, Flink has one important difference: it always appends data to the same file instead of writing multiple files and then merging them. The benefit is that there is only ever one file, so the number of files is minimized.
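To make the file counts above concrete, here is a minimal arithmetic sketch. The class and method names are hypothetical (they are not Flink APIs), and the task count, data volume, and buffer size are assumed values chosen only for illustration.

```java
// Illustrative arithmetic only; names are hypothetical and not part of Flink.
final class ShuffleFileCounts {
    /** Hash shuffle: one file per (upstream task, downstream partition) pair. */
    static long hashShuffleFiles(long upstreamTasks, long downstreamTasks) {
        return upstreamTasks * downstreamTasks;
    }

    /** Sort-spill before merging: one spill file per filled sort buffer. */
    static long spillFilesPerTask(long bytesPerTask, long sortBufferBytes) {
        return (bytesPerTask + sortBufferBytes - 1) / sortBufferBytes; // ceiling division
    }

    /** Append-only layout: one data file per upstream task. */
    static long appendOnlyFiles(long upstreamTasks) {
        return upstreamTasks;
    }

    public static void main(String[] args) {
        long tasks = 1000;                              // assumed parallelism
        long bytesPerTask = 10L * 1024 * 1024 * 1024;   // assumed 10 GiB per task
        long sortBuffer = 64L * 1024 * 1024;            // assumed 64 MiB sort buffer
        System.out.println("hash:        " + hashShuffleFiles(tasks, tasks));
        System.out.println("spill files: " + tasks * spillFilesPerTask(bytesPerTask, sortBuffer));
        System.out.println("append-only: " + appendOnlyFiles(tasks));
    }
}
```

Under these assumptions the hash-based layout needs a million files, spilling without merging needs 160 files per task, and the append-only layout needs one data file per task.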
1.2 Open fewer files
Too many open files consume more resources and may exhaust the available file handles, resulting in poor stability. Opening fewer files therefore improves system stability. For data writes, as described above, each parallel task keeps only one file open because it always appends data to the same file. For data reads, although each file needs to be read by a large number of downstream parallel tasks, Flink opens each file only once and shares the file handle among these concurrent read tasks.
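The idea of sharing one open file among several readers can be sketched with positional reads on a single `FileChannel`. This is an illustration under assumptions rather than Flink's code: the class name, the temporary file, and its contents are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: several logical readers share one open channel.
final class SharedChannelReads {
    /** Reads up to length bytes at offset without moving the channel's own position. */
    static String readAt(FileChannel channel, long offset, int length) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(length);
        long position = offset;
        while (buffer.hasRemaining()) {
            int n = channel.read(buffer, position); // positional read
            if (n < 0) {
                break; // end of file
            }
            position += n;
        }
        buffer.flip();
        return StandardCharsets.US_ASCII.decode(buffer).toString();
    }

    /** Writes a tiny three-partition file, then serves three readers from one channel. */
    static String[] demo() {
        try {
            Path file = Files.createTempFile("shuffle", ".data");
            try {
                Files.write(file, "AAAABBBBCCCC".getBytes(StandardCharsets.US_ASCII));
                try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                    return new String[] {
                        readAt(channel, 0, 4), readAt(channel, 4, 4), readAt(channel, 8, 4)
                    };
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", demo()));
    }
}
```

Because positional reads do not depend on the channel's current position, the same channel can serve readers that are at different offsets in the file.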
1.3 Maximizing sequential read and write
Sequential file reads and writes are critical to file I/O performance. Reducing the number of shuffle files already reduces random file I/O to some extent. Beyond that, Flink's sort-shuffle implements further I/O optimizations to maximize sequential reads and writes. In the write phase, the data buffers to be written are aggregated into larger batches and written out via the writev system call, which achieves better sequential writes. In the read phase, read I/O scheduling ensures that read requests are always served in file offset order, which maximizes sequential reads. Experiments show that these optimizations greatly improve the performance of batch data shuffle.
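The batched write can be approximated in Java with a gathering write, which hands an array of buffers to the channel in one call. On Linux the JDK typically maps this to writev, but that is an implementation detail. The class name and sample data below are hypothetical; this is a sketch, not Flink's writer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of writing a batch of buffers with one gathering call.
final class GatheringWriteSketch {
    /** Writes all buffers in order and returns the number of bytes written. */
    static long writeBatch(FileChannel channel, ByteBuffer[] buffers) throws IOException {
        long expected = 0;
        for (ByteBuffer buffer : buffers) {
            expected += buffer.remaining();
        }
        long written = 0;
        while (written < expected) {
            written += channel.write(buffers); // gathering write over the whole array
        }
        return written;
    }

    /** Writes three small buffers as one batch and returns the resulting file content. */
    static String demo() {
        try {
            Path file = Files.createTempFile("shuffle", ".data");
            try {
                try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
                    ByteBuffer[] batch = {
                        ByteBuffer.wrap("head-".getBytes(StandardCharsets.US_ASCII)),
                        ByteBuffer.wrap("body-".getBytes(StandardCharsets.US_ASCII)),
                        ByteBuffer.wrap("tail".getBytes(StandardCharsets.US_ASCII))
                    };
                    writeBatch(channel, batch);
                }
                return new String(Files.readAllBytes(file), StandardCharsets.US_ASCII);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```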
1.4 Reducing read/write I/O amplification
The traditional sort-spill-merge approach increases the size of the data blocks to be read by merging the generated files into one larger file. This brings benefits, but it also has shortcomings, the main one being read/write I/O amplification. For a data shuffle between compute nodes, when no error occurs the data only needs to be written once and read once. Merging, however, causes the same data to be read and written multiple times, which increases the total amount of I/O as well as the storage space consumed.
Flink's implementation avoids file merging by continuously appending data to the same file and by using its own storage structure. Although a single data block is smaller than it would be after merging, avoiding the overhead of file merging, combined with Flink's I/O scheduling, results in higher performance than the sort-spill-merge scheme.
1.5 Reducing memory buffer consumption
Similar to the sort-shuffle implementations of other distributed computing systems, Flink uses a fixed-size memory buffer to cache and sort data. The size of this buffer is independent of parallelism, which decouples the memory buffer consumption of upstream shuffle data writes from parallelism. Combined with another memory management optimization, FLINK-16428, the memory buffer consumption of downstream shuffle data reads also becomes independent of parallelism, which reduces memory buffer consumption for large-scale batch jobs. (Note: FLINK-16428 applies to both batch and streaming jobs.)
2. Implementation details
2.1 Sorting memory data
In the sort-spill phase of shuffle data, each record is first serialized and written into the sort buffer. Once the buffer is full, all binary data in the buffer is sorted by data partition. The sorted data is then written to the file in data partition order. Although the records within a partition are not currently sorted themselves, the sort buffer interface is general enough to support potentially more complex sorting requirements later on. The sort buffer interface is defined as follows:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;

public interface SortBuffer {
    /** Appends data of the specified channel to this SortBuffer. */
    boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;
    /** Copies data in this SortBuffer to the target MemorySegment. */
    BufferWithChannel copyIntoSegment(MemorySegment target);
    /** Number of records written to this SortBuffer. */
    long numRecords();
    /** Number of bytes written to this SortBuffer. */
    long numBytes();
    /** Whether there is still data that can be read from this SortBuffer. */
    boolean hasRemaining();
    /** Marks this SortBuffer as finished; no more data can be appended. */
    void finish();
    boolean isFinished();
    /** Releases the resources held by this SortBuffer. */
    void release();
    boolean isReleased();
}
```
For the sorting algorithm, we chose bucket sort because of its low complexity. Specifically, 16 bytes of metadata are inserted in front of each serialized record: a 4-byte length, a 4-byte data type, and an 8-byte pointer to the next record in the same data partition. The structure is shown in the figure below:
When data is read from the buffer, all records belonging to a data partition can be read simply by following that partition's chained index structure, and the records stay in the order in which they were written. Reading all partitions one after another in partition order therefore achieves the goal of sorting by data partition.
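The chained layout can be sketched as follows. This is a simplified illustration and not Flink's sort buffer: the class name, the single `ByteBuffer` backing store, and the per-partition head/tail arrays are assumptions; only the 16-byte header layout (length, data type, next pointer) comes from the text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a partition-chained sort buffer.
final class LinkedSortBufferSketch {
    private static final int HEADER_BYTES = 16; // 4 length + 4 data type + 8 next pointer
    private static final int NONE = -1;

    private final ByteBuffer memory;
    private final int[] head; // offset of the first record of each partition
    private final int[] tail; // offset of the last record of each partition

    LinkedSortBufferSketch(int capacityBytes, int numPartitions) {
        memory = ByteBuffer.allocate(capacityBytes);
        head = new int[numPartitions];
        tail = new int[numPartitions];
        Arrays.fill(head, NONE);
        Arrays.fill(tail, NONE);
    }

    /** Appends one record; returns false when the buffer is full and should be spilled. */
    boolean append(byte[] record, int partition, int dataType) {
        if (memory.remaining() < HEADER_BYTES + record.length) {
            return false;
        }
        int offset = memory.position();
        memory.putInt(record.length).putInt(dataType).putLong(NONE).put(record);
        if (tail[partition] == NONE) {
            head[partition] = offset;
        } else {
            memory.putLong(tail[partition] + 8, offset); // link previous record to this one
        }
        tail[partition] = offset;
        return true;
    }

    /** Returns "partition:record" strings in partition order, write order within a partition. */
    List<String> readInPartitionOrder() {
        List<String> out = new ArrayList<>();
        ByteBuffer view = memory.duplicate();
        for (int p = 0; p < head.length; p++) {
            for (int at = head[p]; at != NONE; at = (int) memory.getLong(at + 8)) {
                byte[] record = new byte[memory.getInt(at)];
                view.position(at + HEADER_BYTES);
                view.get(record);
                out.add(p + ":" + new String(record, StandardCharsets.UTF_8));
            }
        }
        return out;
    }

    static List<String> demo() {
        LinkedSortBufferSketch buffer = new LinkedSortBufferSketch(1024, 3);
        String[] records = {"a", "b", "c", "d", "e"};
        int[] partitions = {2, 0, 2, 1, 0};
        for (int i = 0; i < records.length; i++) {
            buffer.append(records[i].getBytes(StandardCharsets.UTF_8), partitions[i], 0);
        }
        return buffer.readInPartitionOrder();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

No data is moved during the "sort": appending only patches one pointer, and reading walks each partition's chain in turn.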
2.2 File storage structure
As mentioned earlier, the shuffle data generated by each parallel task is written to one physical file. Each physical file contains multiple data regions, and each data region is produced by one sort-spill of the data buffer. Within each data region, the data belonging to different data partitions (consumed by different parallel tasks of the downstream compute node) is sorted and grouped in data partition index order. The following figure shows the detailed structure of the shuffle data file. R1, R2, and R3 are three different data regions, corresponding to three sort-spill writes. Each data region contains three data partitions, which are read by three different parallel consumer tasks (C1, C2, C3). That is, data B1.1, B2.1, and B3.1 is processed by C1, data B1.2, B2.2, and B3.2 by C2, and data B1.3, B2.3, and B3.3 by C3.
Similar to the implementations of other distributed processing systems, each data file in Flink has a corresponding index file. The index file is used at read time to locate the data partition belonging to each consumer. An index file contains the same number of data regions as its data file, and each data region contains one index entry per data partition. Each index entry has two parts: the offset of the data in the data file and its length. As an optimization, Flink caches up to 4 MB of index data per index file. The mapping between data files and index files is as follows:
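As a rough code-level illustration of this index layout, the sketch below builds one (offset, length) entry per region and partition and looks entries up by position. The class name, the in-memory arrays, and the entry ordering `region * numPartitions + partition` are assumptions made for the example; the on-disk format of Flink's index file is not shown here.

```java
// Hypothetical in-memory model of the per-file index described above.
final class ShuffleIndexSketch {
    private final int numPartitions;
    private final long[] offsets; // entry for (region, partition) at region * numPartitions + partition
    private final long[] lengths;

    /** partitionBytesPerRegion[r][p] is the number of bytes of partition p in region r. */
    ShuffleIndexSketch(long[][] partitionBytesPerRegion) {
        int regions = partitionBytesPerRegion.length;
        numPartitions = regions == 0 ? 0 : partitionBytesPerRegion[0].length;
        offsets = new long[regions * numPartitions];
        lengths = new long[regions * numPartitions];
        long fileOffset = 0;
        for (int r = 0; r < regions; r++) {
            for (int p = 0; p < numPartitions; p++) {
                int entry = r * numPartitions + p;
                offsets[entry] = fileOffset;
                lengths[entry] = partitionBytesPerRegion[r][p];
                fileOffset += lengths[entry]; // regions and partitions are laid out back to back
            }
        }
    }

    long offsetOf(int region, int partition) {
        return offsets[region * numPartitions + partition];
    }

    long lengthOf(int region, int partition) {
        return lengths[region * numPartitions + partition];
    }

    public static void main(String[] args) {
        ShuffleIndexSketch index = new ShuffleIndexSketch(new long[][] {
            {10, 20, 30}, {5, 5, 5}, {7, 0, 3}
        });
        // Consumer C3 (partition index 2) reads one block from each region.
        for (int region = 0; region < 3; region++) {
            System.out.println(index.offsetOf(region, 2) + " +" + index.lengthOf(region, 2));
        }
    }
}
```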
2.3 Read I/O scheduling
To further improve file I/O performance, Flink introduces an I/O scheduling mechanism on top of the storage structure described above, similar to the elevator algorithm used in disk scheduling. Flink always schedules I/O requests in order of their file offsets. More specifically, suppose the data file has n data regions, each data region has m data partitions, and m downstream tasks read the data file. The following pseudocode shows how Flink's I/O scheduling algorithm works:
```
// let data_regions be the list of data regions, indexed from 0 to n - 1
// let data_readers be the queue of concurrent downstream data readers, indexed from 0 to m - 1
for (data_region in data_regions) {
    data_reader = poll_reader_of_the_smallest_file_offset(data_readers);
    if (data_reader == null)
        break;
    reading_buffers = request_reading_buffers();
    if (reading_buffers.isEmpty())
        break;
    read_data(data_region, data_reader, reading_buffers);
}
```
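The effect of always serving the reader with the smallest file offset can be simulated in a few lines. This is a simplified model and not Flink's scheduler: the class and method names are hypothetical, every reader is assumed to be waiting from the start, and the read buffer limit from the pseudocode is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical simulation of offset-ordered read scheduling.
final class ReadSchedulingSketch {
    /** A downstream reader: the partition it consumes and the region it needs next. */
    static final class Reader {
        final int partition;
        int nextRegion;

        Reader(int partition) {
            this.partition = partition;
        }
    }

    /**
     * offsets[r][p] is the file offset of partition p in region r.
     * Returns the file offsets in the order in which they are read.
     */
    static List<Long> schedule(long[][] offsets, int[] readerPartitions) {
        int regions = offsets.length;
        PriorityQueue<Reader> queue = new PriorityQueue<>(
                Comparator.comparingLong((Reader r) -> offsets[r.nextRegion][r.partition]));
        if (regions > 0) {
            for (int partition : readerPartitions) {
                queue.add(new Reader(partition));
            }
        }
        List<Long> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            Reader reader = queue.poll(); // reader with the smallest pending file offset
            order.add(offsets[reader.nextRegion][reader.partition]);
            reader.nextRegion++;
            if (reader.nextRegion < regions) {
                queue.add(reader); // re-queue under its next, larger offset
            }
        }
        return order;
    }

    public static void main(String[] args) {
        long[][] offsets = {{0, 10, 30}, {60, 65, 70}, {75, 82, 82}};
        // Readers register in arbitrary order, yet reads come out in ascending offset order.
        System.out.println(schedule(offsets, new int[] {2, 0, 1}));
    }
}
```

In this idealized case the disk sees one forward pass over the file. In practice readers arrive and request data at different times, so the pass is only approximately sequential.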
2.4 Data broadcast optimization
Data broadcast means sending the same data to all parallel tasks of the downstream compute node. A common use case is broadcast join. Flink's sort-shuffle implementation optimizes this process so that only one copy of the broadcast data is kept in the in-memory sort buffer and in the shuffle file, which can greatly improve broadcast performance. More specifically, when broadcast data is written to the sort buffer, it is serialized and copied only once, and when it is written to the shuffle file, only one copy is written. In the index file, the index entries of the different data partitions all point to the same data block in the data file. The following figure shows the details of the data broadcast optimization:
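In code terms, the broadcast case amounts to index entries that share one offset and length. The sketch below is only an illustration with hypothetical names and an assumed {offset, length} entry representation.

```java
// Hypothetical sketch of index entries for a broadcast data region.
final class BroadcastIndexSketch {
    /** Returns one {offset, length} entry per partition, all pointing at the same block. */
    static long[][] broadcastEntries(long regionOffset, long regionBytes, int numPartitions) {
        long[][] entries = new long[numPartitions][];
        for (int p = 0; p < numPartitions; p++) {
            entries[p] = new long[] {regionOffset, regionBytes};
        }
        return entries;
    }

    /** Bytes written to the data file for one region, with or without sharing. */
    static long bytesWritten(long regionBytes, int numPartitions, boolean shareBroadcastData) {
        return shareBroadcastData ? regionBytes : regionBytes * numPartitions;
    }

    public static void main(String[] args) {
        long[][] entries = broadcastEntries(4096, 1000, 8);
        System.out.println(entries[0][0] + " " + entries[7][0]); // same offset for all partitions
        System.out.println(bytesWritten(1000, 8, true) + " vs " + bytesWritten(1000, 8, false));
    }
}
```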
2.5 Data Compression
Data compression is a simple and effective optimization, and test results show that it can improve overall TPC-DS performance by more than 30%. As in Flink's hash-based batch shuffle implementation, data is compressed one network buffer at a time. Compression does not cross data partition boundaries, that is, data sent to different downstream parallel tasks is compressed separately. Compression happens after sorting and before the data is written out, and downstream consumer tasks decompress the data after receiving it. The following figure shows the entire data compression process:
IV. Test results
1. Stability
The new sort-shuffle implementation greatly improves the stability of Flink batch jobs. In addition to resolving the potential instability caused by file handle and inode exhaustion, it also fixes some known problems with Flink's original hash shuffle, such as FLINK-21201 (creating too many files blocks the main thread) and FLINK-19925 (I/O operations in the Netty network thread affect network stability).
2. Performance
We ran TPC-DS at the 10 TB data scale with a parallelism of 1000. The results show that, compared with Flink's original batch shuffle implementation, the new implementation achieves a 2-6x performance improvement. If computation time is excluded and only shuffle time is counted, the improvement can reach up to 10x. The following table shows the details of the performance improvement:
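Compressing each buffer independently can be sketched with the JDK's `java.util.zip` classes. This is a stand-in chosen so the example is self-contained: the codec that Flink actually uses is not shown here, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch: each buffer is compressed on its own, never across partitions.
final class BufferCompressionSketch {
    /** Compresses one buffer, as the writer would do after sorting and before writing. */
    static byte[] compress(byte[] buffer) {
        Deflater deflater = new Deflater();
        deflater.setInput(buffer);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    /** Decompresses one buffer, as the downstream consumer would do after receiving it. */
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && inflater.needsInput()) {
                    break; // truncated input
                }
                out.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt buffer", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Buffers of two different partitions are compressed separately.
        byte[] partition0 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(StandardCharsets.UTF_8);
        byte[] partition1 = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(partition0, decompress(compress(partition0))));
        System.out.println(Arrays.equals(partition1, decompress(compress(partition1))));
    }
}
```

Because every buffer is a self-contained compressed unit, a consumer can decompress the buffers of its own partition without touching data meant for other consumers.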
Jobs | Time Used for Sort-Shuffle (s) | Time Used for Hash-Shuffle (s) | Speed up Factor |
---|---|---|---|
q4.sql | 986 | 5371 | 5.45 |
q11.sql | 348 | 798 | 2.29 |
q14b.sql | 883 | 2129 | 2.51 |
q17.sql | 269 | 781 | 2.90 |
q23a.sql | 418 | 1199 | 2.87 |
q23b.sql | 376 | 843 | 2.24 |
q25.sql | 413 | 873 | 2.11 |
q29.sql | 354 | 1038 | 2.93 |
q31.sql | 223 | 498 | 2.23 |
q50.sql | 215 | 550 | 2.56 |
q64.sql | 217 | 442 | 2.04 |
q74.sql | 270 | 962 | 3.56 |
q75.sql | 166 | 713 | 4.30 |
q93.sql | 204 | 540 | 2.65 |
On our test cluster, the data read and write bandwidth of each mechanical disk can reach 160MB/s:
Disk Name | SDI | SDJ | SDK |
---|---|---|---|
Writing Speed (MB/s) | 189 | 173 | 186 |
Reading Speed (MB/s) | 112 | 154 | 158 |
Note: The configuration of our test environment is as follows. Because the nodes have a large amount of memory, jobs with only a small amount of shuffle data effectively read and write memory only. The performance table above therefore lists only queries with a large amount of shuffle data and a significant performance improvement:
Number of Nodes | Memory Size Per Node | Cores Per Node | Disks Per Node |
---|---|---|---|
12 | About 400G | 96 | 3 |
V. Tuning parameters
In Flink, sort-shuffle is disabled by default. To enable it, set taskmanager.network.sort-shuffle.min-parallelism. This parameter is a threshold on the number of data partitions (the number of downstream parallel tasks that one task sends data to): if the number is lower than the value, hash-shuffle is used; otherwise sort-shuffle is used. In practice, on mechanical hard disks you can set this parameter to 1, which means sort-shuffle is always used.
Flink does not enable data compression by default. For batch jobs, enabling it is recommended in most scenarios unless the data compresses poorly. The parameter that enables it is taskmanager.network.blocking-shuffle.compression.enabled.
Shuffle data writes and reads both occupy memory buffers. The size of the data write buffer is controlled by taskmanager.network.sort-shuffle.min-buffers, and the size of the data read buffer is controlled by taskmanager.memory.framework.off-heap.batch-shuffle.size. The data write buffer is taken from network memory, so if you want to increase it, you may also need to increase the total size of network memory to avoid insufficient network memory errors. The data read buffer is taken from the framework off-heap memory, so if you want to increase it, you may also need to increase the framework off-heap memory to avoid direct memory OOM errors. Larger memory buffers generally provide better performance, and for large-scale batch jobs, a few hundred megabytes of data write and read buffers are sufficient.
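Put together, the parameters above might appear in the Flink configuration file as follows. The keys are the ones named in this section; the two buffer values are illustrative assumptions, not recommendations, and should be sized against the network and framework off-heap memory available on your TaskManagers.

```yaml
# Use sort-shuffle for every batch job (threshold of 1, as suggested for mechanical disks)
taskmanager.network.sort-shuffle.min-parallelism: 1
# Compress shuffle data
taskmanager.network.blocking-shuffle.compression.enabled: true
# Data write buffer, counted in network buffers (illustrative value)
taskmanager.network.sort-shuffle.min-buffers: 2048
# Data read buffer, taken from framework off-heap memory (illustrative value)
taskmanager.memory.framework.off-heap.batch-shuffle.size: 128m
```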
VI. Future prospects
Several further optimizations are planned, including but not limited to:
1) Network connection reuse, which can improve the performance and stability of network connection establishment. Related Jira issues include FLINK-22643 and FLINK-15455;
2) Multi-disk load balancing, which helps solve the problem of uneven load. Related Jira issues include FLINK-21790 and FLINK-21789;
3) A remote data shuffle service, which would further improve the performance and stability of batch data shuffle;
4) Allowing users to select the disk type, which improves ease of use. Users could choose HDD or SSD according to the priority of the job.
Link to original English text:
Flink.apache.org/2021/10/26/…
Flink.apache.org/2021/10/26/…