Author introduction: Huang Jianbo, cloud computing technical development engineer; Jin Ling, software development engineer at Shopee.

Their team, Huang-B, won first prize in the performance competition. This article introduces the design and implementation process of their project: optimizing TiDB operators with the Shuffle operator.

In our usual impression, splitting and merging are a pair of contradictory concepts, but what left the deepest impression on our team in this competition is that they are contradictions that reinforce each other: only with a clean decomposition can you merge efficiently. On the one hand, this impression comes from our approach to the competition. We chose to use the Shuffle operation to split an operator's data source into multiple independent partitions, and then improve overall throughput through parallel computation. The optimization process was a search for a more appropriate decomposition, in pursuit of better scalability and computing performance; the technical details are described below. On the other hand, it comes from our experience during the competition.

Our team had two members and one consultant scattered across three different cities in China. From the beginning to the end of the competition we never had a chance to communicate offline, cooperating instead through Slack and shared documents. We joked that we were a distributed team working on a distributed-systems competition. Under these constraints, two factors were key to effective collaboration. One was that our consultant had split the task cleanly, applying the same idea to two different types of operators so that the two of us could each pursue one thread in parallel. The other was TiDB's overall design of high cohesion and low coupling. Vertically, TiDB's clear layering let our optimization focus only on the parser and executor layers, without going deep into the underlying TiKV storage. Horizontally, TiDB's thorough practice of divide-and-conquer and polymorphism let us focus only on the operators being optimized, without worrying about side effects on other operators.

Technical background

Our optimization idea is to use the Shuffle operator to parallelize the MergeJoin and StreamAggregation operators. The Shuffle operator was first introduced in PR github.com/pingcap/tid… to parallelize the Window operator. Figure 1 shows the parallelization process of the Window operator.

On the left side of the figure is the serial Window operator; there is usually a Sort operator between the data source and the Window operator, because Window requires its input to be ordered. The right part of the figure shows the corresponding Shuffle operator. To compute in parallel, the Window and Sort operators are replicated, with each copy corresponding to one ShuffleWorker. The Splitter splits the incoming data from the data source into independent partitions according to hash values and sends them to different ShuffleWorkers, and finally the results of the individual Window operators are collected and output. The arrows in the figure show the direction of data flow: the distribution of data and the collection of results go through Go channels, while the other data flows are pulled by a parent node calling its child node's Next method. The dotted lines in the figure represent the goroutines that are started: each ShuffleWorker starts one goroutine to do its own computation, and the Splitter also starts one goroutine to distribute the data.
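
To make this data flow concrete, here is a minimal, self-contained Go sketch of the splitter/worker pattern described above. It is not TiDB code: the Row type, hash function, and channel sizes are all illustrative, and a real ShuffleWorker would run Sort and Window executors over its partition instead of just forwarding rows.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// Row stands in for a unit of data; TiDB actually moves chunks of rows.
type Row struct {
	Key   string
	Value int
}

// hashKey maps a key to one of n partitions, mimicking the Splitter's hash.
func hashKey(k string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(k))
	return int(h.Sum32()) % n
}

func main() {
	source := []Row{{"a", 1}, {"b", 2}, {"a", 3}, {"c", 4}, {"b", 5}}
	const workers = 2

	// One input channel per ShuffleWorker, plus a shared output channel.
	inputs := make([]chan Row, workers)
	for i := range inputs {
		inputs[i] = make(chan Row, 4)
	}
	output := make(chan Row, len(source))

	// Splitter goroutine: partition rows by the hash of their key.
	go func() {
		for _, r := range source {
			inputs[hashKey(r.Key, workers)] <- r
		}
		for _, ch := range inputs {
			close(ch)
		}
	}()

	// Worker goroutines: each consumes its own partition independently.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(in <-chan Row) {
			defer wg.Done()
			for r := range in {
				output <- r // a real worker would sort and compute here
			}
		}(inputs[i])
	}
	wg.Wait()
	close(output)

	// Collect and print the merged results.
	for r := range output {
		fmt.Println(r)
	}
}
```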

Competition task links:

1. ShuffleMergeJoin: github.com/pingcap/tid…

2. ShuffleStreamAgg: github.com/pingcap/tid…

Figure 1 Parallelization of the Window operator

ShuffleMergeJoin

Extended Shuffle operator

Can the parallel optimization of MergeJoin simply reuse ShuffleWindow's framework? No: the MergeJoin operator differs from the Window operator above in that it requires two data sources. Could the existing Shuffle implementation then give each parallelized operator two ShuffleWorkers, and thus two data sources? Again no, because the existing implementation couples data partitioning and computation parallelism so tightly that it cannot support two data sources. We explain this problem in detail below.

The over-coupling is that ShuffleWorker plays too many roles: it is both a part of the data flow and the basic unit of computation parallelism, which brings two problems:

1. Since a ShuffleWorker is a data partition, each parallelized MergeJoin needs two ShuffleWorkers to receive data from its two data sources. But a ShuffleWorker is also the basic unit of computation parallelism, so n MergeJoin operators would spawn 2n goroutines, and the two goroutines belonging to the same MergeJoin operator would race on shared data.

2. The control logic is complex. As a data partition, a ShuffleWorker must be a child node of the Sort operator; as the basic unit of parallel computation, it must call the Window operator's Next method inside a goroutine to drive the computation. The original implementation therefore kept a pointer to the Window operator inside the ShuffleWorker. Such a design risks breaking the directed-acyclic property of the execution tree on the one hand, and reduces the readability of the code on the other.

Of course, we could solve the first problem by adding a Boolean flag to ShuffleWorker: of the two ShuffleWorkers belonging to the same MergeJoin, one is true and the other false, and only the true one starts a goroutine. But this approach would only further complicate the already complex control logic described above.

**Our proposed solution is to decouple data partitioning from computation parallelism.** As shown in Figure 2, computation parallelism is still handled by ShuffleWorker, but ShuffleWorker is no longer part of the data flow; its original place in the data flow is taken by ShuffleReceiver. MergeJoin becomes a member of ShuffleWorker, and each ShuffleWorker corresponds to one goroutine, in which MergeJoin's Next method is called and the results are sent to the collecting operator. This resolves both problems described above.

Figure 2 The extended Shuffle operator
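
The shape of this decoupled design can be sketched in Go as follows. This is a simplification, not TiDB's actual code: TiDB's real Executor interface has a different signature, and all names here are illustrative. The point is that ShuffleReceiver is an ordinary leaf of the execution tree feeding one partition upward, while ShuffleWorker only owns the goroutine that drives the parallelized executor.

```go
package shuffle

// Executor is a heavily simplified stand-in for TiDB's executor interface.
type Executor interface {
	Next() (rows []Row, ok bool)
}

// Row stands in for a unit of data.
type Row struct {
	Key   string
	Value int
}

// ShuffleReceiver takes ShuffleWorker's old place in the data flow: it is a
// leaf executor that reads one data partition from a channel. A parallelized
// MergeJoin simply has two ShuffleReceivers as its children.
type ShuffleReceiver struct {
	in <-chan []Row
}

func (r *ShuffleReceiver) Next() ([]Row, bool) {
	rows, ok := <-r.in
	return rows, ok
}

// ShuffleWorker is now only the unit of computation parallelism: it owns the
// parallelized executor (e.g. a MergeJoin over two receivers) and drives its
// Next loop in a single goroutine, pushing results to the collecting channel.
type ShuffleWorker struct {
	exec Executor
	out  chan<- []Row
}

func (w *ShuffleWorker) run() {
	for {
		rows, ok := w.exec.Next()
		if !ok {
			return
		}
		w.out <- rows
	}
}
```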

Related PR: github.com/pingcap/tid…

Implementation and effect

In the implementation, we considered two scenarios: one where the data source itself is unordered, in which case the data passes through a Sort node before entering MergeJoin; the other where the data source itself is ordered, in which case the data needs no sorting before entering MergeJoin.

Figure 3 ShuffleMergeJoin with an unordered data source

Figure 3 shows the parallelization of MergeJoin with an unordered data source. In this case the cost of both the MergeJoin and Sort operators can be spread across multiple goroutines. Table 1 shows the optimization effect with two workers. We tested data sources of different sizes: the first two columns give the row counts of the two data sources, and the last two columns give the running time of the serial and parallel versions, in ns/op. As the table shows, Shuffle significantly accelerates MergeJoin, and the speedup improves as the data volume grows (parallelization introduces extra costs such as channels and goroutines, and only a sufficiently large data volume ensures that the gains outweigh these costs). Across our test cases, the best result with two workers ran in only 56.5% of the serial version's time.

Table 1 ShuffleMergeJoin optimization effect
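
This overhead argument is easy to reproduce outside TiDB. The toy Go benchmark below is not the competition's test code; the data size and worker count are arbitrary. It contrasts a serial aggregation with a goroutine-and-channel version of the same work: with small inputs the goroutine and channel costs dominate, and only larger inputs let the parallel version win.

```go
package shuffle_test

import (
	"sync"
	"testing"
)

const rows = 1 << 16 // try smaller values to watch the speedup disappear

// BenchmarkSerial sums all rows in a single goroutine.
func BenchmarkSerial(b *testing.B) {
	data := make([]int64, rows)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		var sum int64
		for _, v := range data {
			sum += v
		}
		_ = sum
	}
}

// BenchmarkParallel splits the rows across two workers connected by a
// channel, mirroring Shuffle's goroutine-and-pipe structure.
func BenchmarkParallel(b *testing.B) {
	data := make([]int64, rows)
	const workers = 2
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		results := make(chan int64, workers)
		var wg sync.WaitGroup
		chunk := rows / workers
		for w := 0; w < workers; w++ {
			wg.Add(1)
			go func(part []int64) {
				defer wg.Done()
				var sum int64
				for _, v := range part {
					sum += v
				}
				results <- sum
			}(data[w*chunk : (w+1)*chunk])
		}
		wg.Wait()
		close(results)
		var total int64
		for s := range results {
			total += s
		}
		_ = total
	}
}
```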

Figure 4 shows the parallelization of MergeJoin with ordered data sources. The difference is that the data no longer passes through a Sort operator. The computation itself is then relatively light; instead, the Splitter, which distributes data by hash value, becomes the system's bottleneck, so parallelization brings little improvement.

Figure 4 ShuffleMergeJoin with ordered data sources

Related PR:

1. ShuffleMergeJoin: github.com/pingcap/tid…

2. Control parameters: github.com/pingcap/tid…

3. Unit testing and performance testing: github.com/pingcap/tid…

ShuffleStreamAggregation

Aggregation is an essential part of SQL statements and is widely used in both OLTP and OLAP scenarios.

From the perspective of system implementation, aggregation can be implemented in two ways. The first is hash-based: a hash table is built to maintain the aggregate value of each group key, from which the final values are computed. The other is based on ordered data streams: it requires the input to be ordered on the group key, and the final result is obtained by traversing the ordered stream while maintaining the running aggregate value.

Generally speaking, the hash-based method computes faster, but it must maintain a hash table, which is costly in memory. When the aggregation key has many distinct values, the hash table holds many elements, which strains memory and risks running out of it, in turn preventing the computation from completing. The ordered-stream method, by contrast, never needs to keep all aggregation keys in memory at once, so its memory consumption is small; but it runs more slowly and places the stricter requirement that the input be ordered. If the speed of the ordered-stream aggregation operator can be improved, it becomes a better fit for large data volumes. We therefore chose to accelerate the ordered-stream based StreamAggregation operator in parallel, to improve its overall running speed.
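
As a concrete illustration of the ordered-stream method, here is a minimal Go sketch, with illustrative types rather than TiDB's implementation, computing count(a) group by b over input already sorted on b. Only the current group's key and counter live in memory, which is exactly why the memory footprint stays small.

```go
package main

import "fmt"

// Row mimics a table row: B is the group-by key, A the aggregated column.
type Row struct {
	B int
	A int
}

// streamCount computes count(A) group by B over rows sorted on B. Each
// group is emitted as soon as the key changes, so only one group's state
// is held in memory at any time.
func streamCount(rows []Row, emit func(key, count int)) {
	if len(rows) == 0 {
		return
	}
	curKey, curCount := rows[0].B, 0
	for _, r := range rows {
		if r.B != curKey {
			emit(curKey, curCount)
			curKey, curCount = r.B, 0
		}
		curCount++
	}
	emit(curKey, curCount) // flush the last group
}

func main() {
	rows := []Row{{B: 1, A: 10}, {B: 1, A: 11}, {B: 2, A: 12}, {B: 3, A: 13}, {B: 3, A: 14}}
	streamCount(rows, func(k, c int) { fmt.Printf("b=%d count=%d\n", k, c) })
}
```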

Implementation and effect

In the implementation, we used the Shuffle operator contributed by other community members to split the input data into multiple ordered data streams around the StreamAggregation operator and feed them into multiple StreamAggregation operators, then obtained the final result through a simple integration step. Simply put, the initial input DataSource is divided by the Shuffle operator into multiple partitions, each internally ordered; each partition then serves as the input of one StreamAggregation operator, producing partial results; finally, integrating the elements that share the same key yields the overall result.
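
The integration step is cheap because the Splitter partitions on the aggregation key, so each key lands in exactly one partition and the per-partition results only need to be unioned. A hedged sketch with illustrative types (partial results as key-to-count maps):

```go
package main

import "fmt"

// mergePartials unions per-partition aggregation results. With hash
// partitioning on the group key, a given key appears in exactly one
// partition; "+=" also stays correct for splitters that may repeat a key.
func mergePartials(partials []map[int]int) map[int]int {
	final := make(map[int]int)
	for _, p := range partials {
		for key, count := range p {
			final[key] += count
		}
	}
	return final
}

func main() {
	p1 := map[int]int{1: 2, 3: 2} // partial counts from partition 1
	p2 := map[int]int{2: 1}       // partial counts from partition 2
	fmt.Println(mergePartials([]map[int]int{p1, p2})) // map[1:2 2:1 3:2]
}
```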

We need to consider whether the DataSource is ordered. If its output is unordered on the aggregation key, for example when it is a PhysicalTableReader operator or the output of certain other operators, the DataSource must be made ordered before partitioning, so a Sort operator is added on top of it (as shown in Figure 5).

Figure 5 ShuffleStreamAggregation with an unordered data source

For this scenario, our approach achieved a significant performance improvement (see Table 2). In the non-parallel case, Sort is applied to the whole DataSource, while in the parallelized version it is applied to each partition separately; the inputs are smaller and the sorts run in parallel, hence the large improvement.

Table 2 Optimization effect of ShuffleStreamAggregation

Conversely, consider the following SQL statement: the aggregation key is b, and the input table t has an index on b, so the DataSource is a PhysicalIndexTableReader ordered by b. In this case there is no need to introduce a Sort operator; we directly split the input into multiple partitions and obtain the result through the computation process shown in Figure 6.

```sql
create table t(a int, b int, key b(b));
select /*+ stream_agg() */ count(a) from t group by b;
```

Figure 6 ShuffleStreamAggregation with ordered data sources

Benchmark results show that in this case the running speed of the current Shuffle-based implementation actually drops rather than improves. We believe the current Shuffle implementation itself is the bottleneck, and it is the focus of future work.

Related PR:

1. github.com/pingcap/tid…

2. github.com/pingcap/tid…

RangeSplitter

As mentioned above, the Shuffle operator divides its input data into multiple partitions. Initially only a hash-based Splitter was implemented, which makes no demand on whether the input data is ordered. When the data source is ordered this method still works, but splitting the data source by range is more natural: rows sharing the same aggregation key are necessarily adjacent, so if we can locate the start and end of such a block we can dispatch it in one shot, with no hash table to build and no comparatively expensive hash function to call, making the partitioning much cheaper. Based on this idea we implemented PartitionRangeSplitter, which dispatches batches of adjacent rows sharing the same aggregation key to a worker. Compared with the hash-based Splitter, the range-based implementation costs less: when processing ordered input data sources, the RangeSplitter can be twice as fast as the HashSplitter (see Table 3), showing that this splitter is better suited to ordered data sources.

Table 3 Performance comparison between RangeSplitter and HashSplitter
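
A minimal sketch of the range-splitting idea, with illustrative names rather than the actual PartitionRangeSplitter code: walk the ordered input once, cut at key boundaries, and hand each whole group to a worker. No hash table or hash function is involved, and each worker's partition stays internally ordered, which is what StreamAggregation requires.

```go
package main

import "fmt"

// splitRanges scans keys sorted on the group key, cuts the input at key
// boundaries, and assigns whole groups to workers round-robin. Each
// worker's partition remains internally ordered.
func splitRanges(keys []int, workers int) [][]int {
	parts := make([][]int, workers)
	start, w := 0, 0
	for i := 1; i <= len(keys); i++ {
		if i == len(keys) || keys[i] != keys[start] {
			parts[w] = append(parts[w], keys[start:i]...)
			w = (w + 1) % workers
			start = i
		}
	}
	return parts
}

func main() {
	keys := []int{1, 1, 2, 2, 2, 3, 4, 4}
	fmt.Println(splitRanges(keys, 2)) // [[1 1 3] [2 2 2 4 4]]
}
```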

Related PR:

1. RangeSplitter implementation: github.com/pingcap/tid…

2. Related performance tests: github.com/pingcap/tid…

Conclusion

In this performance challenge, we used the Shuffle operator to accelerate the MergeJoin and StreamAggregation operators, achieving significant performance improvements for unordered data sources. While optimizing MergeJoin, we extended the existing Shuffle implementation to accommodate operators with multiple data sources, improving its readability and extensibility. While optimizing StreamAggregation, we proposed a simple range-based Splitter for ordered data sources and demonstrated its effectiveness. In the future, we will consider how to rework the existing Shuffle operator to eliminate its remaining performance bottleneck, further improving the whole family of Shuffle-based parallel operators.