Introduction: Alibaba main search (Taobao and Tmall search) is one of the most important businesses on the search offline platform. It is characterized by a large data volume, many one-to-many tables, a large total number of source tables, and hot data. Migrating an application of this scale and logical complexity to the search offline platform raises plenty of performance challenges. What optimizations did the platform need to achieve high full throughput and low incremental latency? The outline of the article is as follows:
- Preface
- Basic concepts of the search offline platform
- Main search service characteristics and performance requirements
- Detailed description of Blink Job performance tuning
- Conclusion
This article shares the optimization experience of Alibaba main search in achieving high full throughput and low incremental latency, in the hope that it will be helpful for your Flink applications.
1. Preface
In the Alibaba search engineering system, services such as the search engine and online scoring, which respond to user requests within milliseconds, are called “online” services. Correspondingly, the systems that process data from various sources and feed it into the search engine and other online services are collectively called “offline” systems. As the data provider of the search engine, the search offline platform is the path every business in the Group must take to gain search capability, and it is a very important link in the whole search pipeline. The quality and speed of offline output directly affect the user experience of downstream businesses.
After years of accumulation, the search offline platform not only carries a large number of search businesses within the Group but also serves many external customers on the cloud. As the platform's functionality grew richer and its Blink version (Blink is Alibaba's internal version of Flink) moved ahead, we began planning in early 2019 to migrate main search (Taobao and Tmall search) to the search offline platform.
Before the migration, the main search architecture had many shortcomings: an aging design, an old Blink version, difficult operation and maintenance, a non-unified computing framework, and so on. With staff turnover and the growing difficulty of operation and maintenance, refactoring had become urgent.
Migrating main search, a logically complex application with X billion records, to the search offline platform raised plenty of performance challenges. Its business characteristics and performance requirements made every step of the migration difficult. To meet the performance requirements, we tuned almost every Blink Job individually. The initial vision and the final outcome were both good, but the road between them was extremely tortuous. This article mainly describes what we tried, in terms of performance tuning, while migrating main search to the search offline platform.
Completing the migration of main search is a milestone for the search offline platform: it shows that the platform can carry super-large businesses.
2. Basic concepts of the search offline platform
On the search offline platform, one full-plus-incremental run of main search is handled mainly by a synchronization layer and a data processing layer, each of which has a full process and an incremental process. To make the rest of the article easier to follow, here is a brief introduction to some basic concepts of the search offline platform.
1. Businesses supported within the Group
The search offline platform currently supports hundreds of businesses within the Group, including main search and AE. Among them, the Taobao and Tmall evaluation (review) service has the largest data volume, with X billion records and nearly X fields per record.
2. Scenario
The platform processes the user's data source tables (MySQL or ODPS) and, after a series of offline processing steps, imports the data into the HA3 online search engine or ES.
3. Platform technology stack
As shown in the following figure, the search offline platform currently stores data on HDFS/Pangu, schedules resources with YARN or Hippo, and uses Flink/Blink as its computing framework.
4. Full
Full refers to processing all of the search business's data, generating the complete output, and delivering it to the online engine, generally once a day.
There are two reasons for doing this: some business data is updated daily, and the engine needs full data to build indexes and preprocess efficiently, which improves online serving efficiency. The full process is mainly divided into a synchronization layer and a data processing layer.
5. Incremental
Incremental refers to applying real-time data changes from upstream data sources to the online engine.
In our scenario, this means that incremental data does not need Exactly Once semantics; At Least Once is enough. On this basis, we can use a fully asynchronous pipeline (full-link async) to solve the one-to-many problem (more on this later).
Like the full process, the incremental process has a synchronization layer and a data processing layer.
6. One-to-many
In the search field, some business data must be described in one-to-many form; the relationship between items and SKUs is a typical example. In the search offline architecture built on Hologres (a distributed database developed by Alibaba), one-to-many data is stored in a single HoloTable with two primary keys: the first PK is the item ID and the second is the SKU_ID.
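As a rough illustration of the double-PK layout, the sketch below models a one-to-many table as rows sorted by `(item_id, sku_id)` and fetches every SKU of one item with a prefix scan on the first PK. The class `DoublePkTable` and its methods are hypothetical stand-ins for illustration only; they are not the Hologres API.

```python
import bisect

class DoublePkTable:
    """Toy in-memory table keyed by (first_pk, second_pk), kept in sorted order."""

    def __init__(self):
        self._keys = []   # sorted list of (item_id, sku_id)
        self._rows = {}   # (item_id, sku_id) -> row dict

    def put(self, item_id, sku_id, row):
        key = (item_id, sku_id)
        if key not in self._rows:
            bisect.insort(self._keys, key)
        self._rows[key] = row

    def scan_first_pk(self, item_id):
        """Return all rows whose first PK equals item_id (a prefix scan)."""
        # (item_id,) sorts before every (item_id, sku_id), so this finds the first match.
        start = bisect.bisect_left(self._keys, (item_id,))
        rows = []
        for key in self._keys[start:]:
            if key[0] != item_id:
                break
            rows.append(self._rows[key])
        return rows

table = DoublePkTable()
table.put(1001, 1, {"sku_id": 1, "price": 10})
table.put(1002, 7, {"sku_id": 7, "price": 30})
table.put(1001, 2, {"sku_id": 2, "price": 12})
skus_of_1001 = table.scan_first_pk(1001)
```

Here `skus_of_1001` holds the two SKU rows of item 1001 in SKU order, which is the access pattern a lookup by first PK needs.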
With these concepts in mind, the following sections describe how the search offline platform tuned each Blink Job of main search. First, we briefly summarize the business characteristics and performance requirements of main search.
7. Data storage mode
When the search offline platform used HBase for its mirror tables, a single wide table with multiple column families stored all business data of one dimension. After detailed investigation, we decided to replace HBase with Hologres, which required a complete redesign of the storage architecture. Multiple tables are used to simulate HBase's multiple column families, and a single HoloTable contains data from many business source tables. The data storage mode after the redesign is as follows:
8. Synchronization layer
The synchronization layer synchronizes data from upstream data sources into mirror tables so that the data processing layer can process it efficiently. The data of a single dimension on the business side usually comes from many MySQL or ODPS tables: at least X tables, and as many as X tables for main search. If these tables were joined pairwise to aggregate data of the same dimension into one Holo table, a great deal of shuffle would be generated. Instead, we use asynchronous upserts to write the data of different source tables into different columns of the Holo table, which solves the problem of importing massive data.
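The column-wise upsert idea can be sketched in a few lines. This is a simplified, synchronous illustration with made-up table and column names; it only shows why no join between source tables is needed, and it leaves out the asynchronous writes used on the real platform.

```python
def upsert(mirror, pk, columns):
    """Merge `columns` into the row for `pk`, creating the row if it is absent."""
    mirror.setdefault(pk, {}).update(columns)

mirror_table = {}

# Two hypothetical source tables of the same dimension. Each one only
# carries its own columns, keyed by the shared primary key.
item_base = [(1001, {"title": "shoes"}), (1002, {"title": "hat"})]
item_price = [(1002, {"price": 30}), (1001, {"price": 10})]

# The order in which sources and rows arrive does not matter, because
# each source writes to a disjoint set of columns.
for source in (item_price, item_base):
    for pk, cols in source:
        upsert(mirror_table, pk, cols)
```

After both sources are written, each row in `mirror_table` holds the columns from both tables without any pairwise join or shuffle.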
9. Data processing layer
The data processing layer computes over the mirror tables (HBase/Holo) produced by the synchronization layer, including multi-table Join and UDTF processing, to make search businesses easier to develop and onboard.
3. Main search service characteristics and performance requirements
This section first introduces the main search service characteristics and performance requirements. The tuning we did to meet those requirements is then described in detail.
1. Main search service characteristics
- Large amount of data
Main search has X billion items (X billion valid), that is, X billion records in the main dimension, which is X orders of magnitude more than the other businesses on the platform (except the Taobao evaluation service). Can all of this data be processed in a little over X hours? How do we achieve high throughput? The challenge is huge.
- Many one-to-many tables
Main search needs to join many one-to-many tables. For example, one item corresponds to multiple SKUs, and some items have nearly X SKUs. How can this information be converted to the item dimension efficiently and associated with the item's other information?
- Large total number of source tables
Main search has more than X source tables (including one-to-many tables), whereas other businesses on the platform generally have single-digit numbers of source tables. So many source tables raise a series of problems: how do we avoid hitting ODPS limits when reading ODPS data, and how do we achieve high throughput when pulling data from large tables? These problems had to be solved one by one.
- Hot data
Main search has some big sellers (Ele.me, Hema, and others) that each own a very large number of items, which causes serious data skew in the data processing layer. How do we solve skew, a problem that comes up again and again in big data processing?
2. Main search performance requirements
- Full (synchronization layer + data processing layer) high throughput!
A full run is required once a day, and each run processes X billion items with limited resources. Achieving high throughput at this data volume is a huge challenge!
- Incremental (synchronization layer + data processing layer) low latency!
Incremental processing must keep latency at the level of seconds when TPS is X W (W = 10,000), and some tables (for example, table XX) reach a TPS of X W during Double 11. How can incremental processing keep latency stably low? That is worth thinking about!
The following sections describe step by step how we solved these problems and met the performance requirements.
4. Detailed description of Blink Job performance tuning
Based on the main search service characteristics and performance requirements above, we drew up the following figure. Its left and middle columns show which characteristics of main search cause poor Job performance at which stage, and therefore which Blink Jobs need tuning. Once the tuning is complete, the platform meets the requirements in the rightmost column of the figure: high full throughput and low incremental latency.
The sections below follow the order of full processing, incremental processing, and the one-to-many problem, and explain how we solved the five problems above to meet the requirements of high full throughput and low incremental latency.
1. Full high throughput performance tuning
The full process consists of the synchronization layer and the data processing layer, and it needs high throughput to finish within X hours. The synchronization layer must synchronize about X billion records of full data from the source tables in a short time without affecting the timeliness of the incremental process running at the same time, which is a huge challenge. The data processing layer must, in a short time, process more than X billion records, Join many mirror tables, run UDTF processing and MultiGet, and finally produce the full HDFS files. Optimizing it nearly made us give up at one point. This section focuses on the performance tuning of the data processing layer.
Tuning this Job took a long time and involved many attempts. They are described below in chronological order.
- Initial form
First, note that the IC dimension is the item dimension and the UIC dimension is the seller dimension. Our initial solution did not include FullDynamicNestedAggregation and IncDynamicNestedAggregation (these two Jobs are described in more detail later). After scanning the single-PK table of the IC dimension, the Job performed a series of DimJoin, UDTF, and MultiJoin operations. Testing showed that DimJoin against multi-PK tables (one-to-many tables) performed very poorly and that the full-link Async pipeline degraded to Sync. The reason is that our one-to-many data lives in a separate SaroTable (a logical abstraction over multiple HoloTables) and is fetched with a Partial Scan that returns all rows for the specified first PK. That path is fully synchronous and creates a Scanner for every Get. We added a cache for DimJoin and a precise SubKey cache for the MultiGet that is unique to main search, but testing showed that performance was still far from satisfactory, so we kept optimizing.
- Introduce LocalJoin and SortMergeJoin
Since the bottleneck was DimJoin against the multi-PK SaroTable, we tried to remove that step. Because the one-to-many SaroTables exist in only two dimensions, we tried to first LocalJoin all tables (single-PK and multi-PK) within the IC dimension and within the UIC dimension separately, then SortMergeJoin the two results, and then continue with the rest of the pipeline.
Let’s start with Local Join. HoloStore guarantees that all tables in the same DB follow the same Partition policy and are sorted in lexicographic order of the primary key. Data of the same dimension and the same Partition can therefore be pulled into one process and joined there, which avoids Shuffle, as shown in the figure below.
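Independently of the figure, the Local Join idea can be illustrated with a small sketch. It assumes two single-PK tables that share a partitioning scheme and are sorted by primary key within each partition, so each partition can be merge-joined on its own. The function name `local_merge_join` and the sample data are made up for illustration.

```python
def local_merge_join(left, right):
    """Inner-join two lists of (pk, row) that are sorted by pk with unique pks."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_pk, right_pk = left[i][0], right[j][0]
        if left_pk == right_pk:
            out.append((left_pk, {**left[i][1], **right[j][1]}))
            i += 1
            j += 1
        elif left_pk < right_pk:
            i += 1
        else:
            j += 1
    return out

# partition id -> (rows of table A, rows of table B), co-partitioned and sorted
partitions = {
    0: ([(1001, {"title": "shoes"}), (1003, {"title": "bag"})],
        [(1001, {"price": 10}), (1003, {"price": 55})]),
    1: ([(1002, {"title": "hat"})],
        [(1002, {"price": 30}), (1004, {"price": 7})]),
}

# Each partition is joined independently; no row ever crosses partitions.
joined = {p: local_merge_join(a, b) for p, (a, b) in partitions.items()}
```

Because matching keys are guaranteed to sit in the same partition, the join is a single linear pass per partition and needs no shuffle.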
So the topology becomes roughly:
Testing showed that, because of big sellers (one seller with a very large number of items), SortMergeJoin had a very serious long tail. As shown in the figure below, the data with Uid 101 and 103 all falls into the same parallel subtask. I tried adding another layer of PartitionBy NID on top of this, but it did not help: in the Sort phase of SortMergeJoin and in External Shuffle, a task with a large amount of data has to merge disk files many times, so the long-tail Task still takes a long time to finish.
- Add salt to break up the big sellers
So we had to keep tuning. After discussion within the group, we decided to salt the data to break up the big sellers: we find the IDs of the Top X big sellers from the ODPS source table, and after Scan + Local Join we add a UDF to the primary dimension and a UDTF to the secondary dimension. The flow chart and a worked example of the principle are shown in the following two figures:
As shown in the figure above, the data with Uid 101 and 103 is spread across multiple parallel subtasks. Because we added a UDTF after SortMergeJoin to remove the salt, the final data is unaffected.
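The salting technique can be sketched as follows. This is one plausible reading of the scheme, not the actual Job: the item side attaches a salt to the seller ID for hot sellers, the seller side is replicated once per salt, the join runs on the salted key, and the salt is dropped afterwards. `NUM_SALTS`, `HOT_SELLERS`, and the function names are assumptions, and a deterministic salt derived from the item ID is used so that the example is reproducible.

```python
NUM_SALTS = 4             # assumed fan-out per hot seller
HOT_SELLERS = {101, 103}  # stands in for the Top X big-seller IDs

def salt_item(item_id, seller_id):
    """UDF-like step on the item side: spread a hot seller's items over salts."""
    salt = item_id % NUM_SALTS if seller_id in HOT_SELLERS else 0
    return (seller_id, salt)

def expand_seller(seller_id):
    """UDTF-like step on the seller side: one copy of the row per salt."""
    salts = range(NUM_SALTS) if seller_id in HOT_SELLERS else range(1)
    return [(seller_id, salt) for salt in salts]

def salted_join(items, sellers):
    """items: (item_id, seller_id) pairs; sellers: (seller_id, info) pairs."""
    seller_by_key = {}
    for seller_id, info in sellers:
        for key in expand_seller(seller_id):
            seller_by_key[key] = info
    joined = []
    for item_id, seller_id in items:
        info = seller_by_key[salt_item(item_id, seller_id)]
        # The salt is dropped here, so the output matches an unsalted join.
        joined.append((item_id, seller_id, info))
    return joined

items = [(i, 101) for i in range(8)] + [(100, 102)]
sellers = [(101, "big seller"), (102, "small seller")]
result = salted_join(items, sellers)
join_keys_for_101 = {salt_item(i, s) for i, s in items if s == 101}
```

The items of seller 101 now join on four distinct keys instead of one, so a hash partitioner can spread them over several subtasks, while `result` is identical to what a plain join on the seller ID would produce.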
- Final shape
We then turned to the incremental process (IncJoin) and found that it had the same problem as the initial form of FullJoin: it was very slow at catching up with the incremental stream and never actually caught up. After group discussion, we decided to add a new FullDynamicNestedAggregation Job to the full synchronization layer (described in more detail below). It is a Blink Batch Job that writes the one-to-many SaroTable data of each dimension into the main table of the corresponding dimension. FullJoin then obtains this data in its initial Scan, which avoids DimJoin against multi-PK SaroTables. The final form of FullJoin is as follows:
2. Incremental low latency performance tuning
Incremental performance is limited mainly by IncJoin in the data processing layer. This Job started out as a Blink Stream Job: it reads incremental messages from SwiftQueue, joins the mirror tables to complete the fields, runs UDTF processing on the data, and finally sends the incremental messages to the online engine's SwiftQueue.
Based on the idea of “stream-batch unification”, and after a series of attempts, the final form of our incremental data processing layer Job is as follows.
Unlike the full process, the incremental process updates data in real time, so updates are written not only to the Swift Queue but also to the SaroTable. In addition, we added windows to each Job according to business characteristics.
3. Solve the one-to-many problem
Main search has many one-to-many tables. How to efficiently fetch this data in the data processing layer, convert it to the main dimension, and complete the fields puzzled us for a long time.
To improve efficiency, we had to find ways to raise CPU utilization, so record Gets were changed to be fully asynchronous along the whole link. However, because one-to-many data lives in HoloTables with multiple PKs, fetching the rows for a given first PK means a Scan on the Holo server. Asynchronous programming is contagious: one synchronous step in the chain turns the full-link asynchronous pipeline back into a synchronous one, and performance fell far short of the requirement.
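The effect of one blocking step inside an otherwise asynchronous chain can be reproduced with a small `asyncio` experiment. This is only an analogy in Python, not Blink or Holo code: `get_async` stands for a truly asynchronous Get, `scan_blocking` for a synchronous Scan wrapped in an async function, and the delay value is arbitrary.

```python
import asyncio
import time

DELAY = 0.05  # pretend latency of one lookup, in seconds

async def get_async(key):
    await asyncio.sleep(DELAY)  # yields to the event loop while waiting
    return key

async def scan_blocking(key):
    time.sleep(DELAY)           # blocks the event loop: nothing else can run
    return key

async def timed(lookup, keys):
    """Issue all lookups concurrently and measure the wall-clock time."""
    start = time.perf_counter()
    results = await asyncio.gather(*(lookup(k) for k in keys))
    return results, time.perf_counter() - start

keys = list(range(8))
async_results, async_elapsed = asyncio.run(timed(get_async, keys))
blocking_results, blocking_elapsed = asyncio.run(timed(scan_blocking, keys))
```

With the asynchronous lookup the eight waits overlap, so the total time stays close to one `DELAY`. With the blocking lookup the waits run one after another, so the total is roughly eight times `DELAY` even though the surrounding code is written in async style.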
- The solution
To turn this “pseudo-asynchronous” pipeline into a truly asynchronous one, after many discussions and experiments we decided to Scan the multiple rows that share a first PK in a one-to-many table, GroupBy them into a single record, convert each field to Json, and Put the result into the main table. The main steps are shown in the figure below.
To do this, we added a Job to the synchronization layer for each of the full and incremental processes: FullDynamicNestedAggregation (a Blink Batch Job) and IncDynamicNestedAggregation (a Blink Stream Job). The following figure shows the general flow of the two Jobs.
It is worth noting that, as mentioned earlier when introducing the incremental process, our scenario does not require Exactly Once semantics for incremental data, only At Least Once. On that basis, we can split the incremental work of the data processing layer into two Jobs, which solves the one-to-many problem.
This way, the data processing layer no longer needs to Scan HoloTables, so it can run fully asynchronously and the overall incremental performance improves.
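The Scan, GroupBy, Json, Put sequence can be sketched as below. The exact Json layout used by the real Jobs is not given in this article, so the sketch assumes each field of the one-to-many rows becomes one Json array in the main table; the function and field names are hypothetical.

```python
import json
from collections import defaultdict

def nested_aggregate(one_to_many_rows, first_pk="item_id"):
    """Group rows by first PK and encode every other field as a JSON array."""
    grouped = defaultdict(lambda: defaultdict(list))
    for row in one_to_many_rows:
        for field, value in row.items():
            if field != first_pk:
                grouped[row[first_pk]][field].append(value)
    return {
        pk: {field: json.dumps(values) for field, values in fields.items()}
        for pk, fields in grouped.items()
    }

def put_into_main(main_table, aggregated):
    """Write the aggregated JSON columns into the single-PK main table."""
    for pk, json_fields in aggregated.items():
        main_table.setdefault(pk, {}).update(json_fields)

sku_rows = [
    {"item_id": 1001, "sku_id": 1, "price": 10},
    {"item_id": 1001, "sku_id": 2, "price": 12},
    {"item_id": 1002, "sku_id": 7, "price": 30},
]
main_table = {1001: {"title": "shoes"}}
put_into_main(main_table, nested_aggregate(sku_rows))
```

After this step a single Get on the main table returns the item together with all of its SKU data, so the data processing layer no longer needs a Scan per item.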
- Truncation optimization
Converting many rows into one row can produce a very large row, and such “big rows” can cause Full GC. To avoid this, and based on business characteristics, we support truncation for each one-to-many table during Scan: for a given first PK, only a limited number of records are scanned and assembled into Json. In addition, a whitelist can be configured for individual tables.
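A minimal sketch of truncation during Scan follows. How the whitelist works is not described in detail here, so the sketch assumes it is a per-table override of the default limit; `DEFAULT_LIMIT`, `LIMIT_OVERRIDES`, and the table names are made up.

```python
DEFAULT_LIMIT = 3
LIMIT_OVERRIDES = {"sku_table": 5}  # assumed per-table whitelist configuration

def limit_for(table_name):
    return LIMIT_OVERRIDES.get(table_name, DEFAULT_LIMIT)

def truncate_scan(rows, table_name):
    """Keep at most `limit` rows per first PK; rows are (first_pk, row) in scan order."""
    limit = limit_for(table_name)
    seen = {}
    kept = []
    for pk, row in rows:
        seen[pk] = seen.get(pk, 0) + 1
        if seen[pk] <= limit:
            kept.append((pk, row))
    return kept

rows = [(1, "a"), (1, "b"), (1, "c"), (1, "d"), (2, "x")]
truncated_default = truncate_scan(rows, "other_table")
truncated_whitelisted = truncate_scan(rows, "sku_table")
```

With the default limit the fourth row of first PK 1 is dropped, which bounds the size of the assembled Json; the whitelisted table keeps all of its rows because its limit is higher.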
- Filter window optimization
Given the business characteristics, many one-to-many tables can tolerate a certain delay, and to avoid putting too much pressure on the offline system and the online BuildService the number of updates must not be too large. We therefore added a 30-minute deduplication window. The window is large, and the average deduplication rate reaches X%.
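The deduplication effect of such a window can be illustrated as follows. The article does not say which window type is used, so the sketch assumes a tumbling 30-minute window that keeps only the latest update per key; the event data is invented.

```python
WINDOW_SECONDS = 30 * 60

def dedup_by_window(events):
    """events: (timestamp_seconds, key, payload) tuples in timestamp order.

    Returns one (window_start, key, payload) per key per tumbling window,
    keeping the latest payload seen in that window.
    """
    latest = {}
    for ts, key, payload in events:
        window_start = ts - ts % WINDOW_SECONDS
        latest[(window_start, key)] = payload
    ordered = sorted(latest.items(), key=lambda kv: kv[0])
    return [(window, key, payload) for (window, key), payload in ordered]

events = [
    (0, 1001, "v1"),
    (60, 1001, "v2"),
    (120, 1002, "a"),
    (1700, 1001, "v3"),
    (1800, 1001, "v4"),
]
emitted = dedup_by_window(events)
dedup_rate = 1 - len(emitted) / len(events)
```

In this example five incoming updates collapse into three emitted ones, a deduplication rate of 40%. The trade-off is that an update can wait up to one window length before it is emitted, which is acceptable only because these tables tolerate delay.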
5. Conclusion
After this series of optimizations, main search not only uses far fewer resources than under the old architecture but also achieves high full throughput and low incremental latency, and it handled the traffic surge at midnight on Double 11 in 2019 with ease.
Tuning system performance is extremely complex, delicate, and technically challenging work. It requires familiarity not only with the chosen technical tools (Flink/Blink) but also with the business. Adding windows, truncation, salting to break up big sellers, and similar techniques are possible precisely because the business scenario can tolerate their drawbacks.
Beyond the tuning described in this article, we also did a great deal of tuning on the full and incremental Jobs of the synchronization layer and on MultiGet. For reasons of length, and in keeping with the 80/20 principle, it is not detailed here.
The successful migration of main search also gives the search offline platform the last piece of its puzzle, making it a basic module and a core link of Alibaba Group's search center.
About the author:
Wang Weijun (alias Hongli), senior development engineer in the Alibaba Search and Recommendation Division. Graduated from Nanjing University of Posts and Telecommunications with a master’s degree in 2016. Contributor to Apache Hadoop, Flink, and Eagle. Currently responsible for Runtime-related work on the Alibaba search offline platform.
In addition, Chen Huaxi (Kunlun) gave many suggestions for this article, and some of the illustrations were contributed by Li Guodin (Shi Ji).