This article is based on a talk given at Flink Forward Asia 2021 by BIGO Staff Engineer Zou Yunhe. The main contents are:
- Business background
- Implementation practice and feature improvements
- Application scenarios
- Future plans
FFA 2021 Live Playback & Presentation PDF download
I. Business background
BIGO is a short-video and live-streaming company serving overseas markets. Its main products currently include BigoLive (a global live streaming service), Likee (a short video creation and sharing platform), and IMO (a free communication tool), with 400 million users worldwide. As the business grows, the demands on the data platform's processing capacity keep rising, and the platform's problems have become increasingly prominent. This section introduces the BIGO big data platform and the problems it faces. The data flow of the BIGO big data platform is as follows:
User behavior logs from apps and web pages, together with binlog data from relational databases, are synchronized to the platform's message queues and offline storage system. They are then processed by real-time and offline analysis and used for real-time recommendation, monitoring, ad hoc queries and other scenarios. However, there are several problems:
- OLAP analysis has no unified entry point: Presto and Spark each have their own portal for analysis tasks. Users do not know which engine suits their SQL queries, which leads to a poor experience. In addition, users submit the same query through both portals at once to get results faster, which wastes resources.
- Offline tasks are delayed and produce results too slowly: typical jobs such as ABTest often do not deliver results until the afternoon.
- Each business team develops applications independently for its own scenarios, so real-time tasks are built in silos, without data layering or data lineage.
BIGO has built the OneSQL OLAP analysis platform and real-time data warehouse to address these issues.
- The OneSQL OLAP analysis platform provides a unified entry point for OLAP queries, reducing blind engine selection and improving resource utilization.
- Real-time data warehouse tasks are built with Flink, and data is layered through Kafka/Pulsar.
- Some tasks that are slow in offline computing are migrated to Flink streaming jobs to speed up the output of results.
In addition, the real-time computing platform Bigoflow was built to manage these real-time computing tasks and to establish lineage for real-time tasks.
II. Implementation practice and feature improvements
2.1 Practice and optimization of OneSQL OLAP analysis platform
The OneSQL OLAP analysis platform is an OLAP query analysis engine that integrates Flink, Spark, and Presto. OLAP queries submitted by users are forwarded by the OneSQL backend to the client of the chosen execution engine, which then submits the query for execution on the corresponding cluster. Its overall architecture is shown as follows:
The overall structure of the analysis platform is divided into entry layer, forwarding layer, execution layer and resource management layer from top to bottom. To optimize user experience, reduce execution failures, and improve resource utilization for each cluster, the OneSQL OLAP analysis platform provides the following functions:
- **Unified query portal:** At the entry layer, users submit queries in Hive SQL syntax through a unified Hue query portal.
- **Unified query syntax:** Multiple query engines (Flink, Spark, and Presto) are integrated and adapted to Hive SQL syntax so that each can execute user SQL queries.
- **Intelligent routing:** When choosing an execution engine, the platform selects a suitable one based on the history of SQL query execution (whether the query succeeded on each engine, and how long it took), how busy each cluster is, and whether the engine is compatible with the SQL syntax.
- **Failed retry:** The OneSQL backend monitors the execution of SQL tasks. If a SQL task fails during execution, another engine is selected to retry it.
In this way, the OneSQL OLAP analysis platform gives the BIGO big data platform a unified OLAP analysis entry point, which reduces blind engine selection and makes fuller use of each cluster's resources, leaving fewer resources idle.
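The routing and retry behavior described above can be illustrated with a minimal Python sketch. It is not the OneSQL implementation: the `EngineStats` fields, the cost formula, the failure penalty of 10 and the `submit` callback are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple


@dataclass
class EngineStats:
    """Hypothetical per-engine signals for one SQL query."""
    compatible: bool      # can the engine handle this SQL syntax?
    past_success: bool    # did this kind of query succeed here before?
    past_seconds: float   # historical execution time on this engine
    cluster_load: float   # 0.0 (idle) .. 1.0 (saturated)


def rank_engines(stats: Dict[str, EngineStats]) -> List[str]:
    """Order the compatible engines from most to least preferred."""
    def cost(name: str) -> float:
        s = stats[name]
        # Arbitrary illustrative weights: penalize past failures and busy clusters.
        penalty = 1.0 if s.past_success else 10.0
        return s.past_seconds * penalty * (1.0 + s.cluster_load)

    candidates = [name for name, s in stats.items() if s.compatible]
    return sorted(candidates, key=cost)


def run_with_retry(sql: str,
                   stats: Dict[str, EngineStats],
                   submit: Callable[[str, str], object]) -> Tuple[str, object]:
    """Try engines in ranked order; on failure, retry on the next engine."""
    errors = {}
    for engine in rank_engines(stats):
        try:
            return engine, submit(engine, sql)
        except RuntimeError as exc:
            errors[engine] = str(exc)
    raise RuntimeError(f"all engines failed: {errors}")
```

The caller receives both the result and the name of the engine that produced it, which is the kind of record a router would need to feed back into its execution history.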
2.1.1 Flink OLAP analysis system construction
Flink is one of the OLAP engines on the OneSQL analysis platform. The Flink OLAP system has two parts: the Flink SQL Gateway and the Flink Session cluster. The SQL Gateway is the entry point for SQL submission: queries are submitted through the Gateway to the Flink Session cluster for execution, and the Gateway tracks the execution progress and returns the query result to the client. The SQL query process is as follows:
First, the SQL Gateway checks the SQL submitted by the user to determine whether the result needs to be persisted to a Hive table. If so, a Hive table is created through the HiveCatalog interface to hold the results of the query. The Gateway then parses the SQL, sets the job's parallelism, generates the pipeline and submits it to the Session cluster for execution.
In order to ensure the stability of the Flink OLAP system and the efficient execution of SQL queries, the following enhancements have been made in this system:
- Stability:
  - Reliability of the Flink Session cluster is ensured by ZooKeeper-based HA; the SQL Gateway watches the ZooKeeper node to discover the Session cluster.
  - The amount of data scanned from Hive tables, the number of partitions scanned, and the amount of data returned are all limited, which prevents OOM in the Session cluster's JobManager and TaskManagers.
- Performance:
  - The Flink Session cluster preallocates resources, which reduces the time spent requesting resources after job submission.
  - The Flink JobManager parses splits asynchronously, so tasks execute while splits are still being parsed, which reduces the time tasks spend blocked on split parsing.
  - The number of scanned partitions and the maximum number of splits are capped during job submission, which reduces the time needed to set task parallelism.
- Hive SQL compatibility: Flink's compatibility with Hive SQL syntax has been improved; current compatibility is roughly 80%.
- Monitoring and alerting: The memory and CPU usage of the Flink Session cluster's JobManager and TaskManagers and of the SQL Gateway, as well as task submissions, are monitored, so that problems raise timely alerts and can be handled as soon as they occur.
2.1.2 Results of OneSQL OLAP analysis platform
The OneSQL OLAP analysis platform based on the above implementation has achieved the following benefits:
- The unified query entry point reduces blind engine selection: the user execution error rate dropped by 85.7%, and the SQL execution success rate rose by 3%.
- SQL execution time was shortened by 10%, because the resources of each cluster are used more fully and tasks spend less time queuing.
- With Flink serving as one of the OLAP engines, resource utilization of the real-time computing cluster improved by 15%.
2.2 Real-time data warehouse construction and optimization
To speed up the output of certain business metrics and to manage Flink real-time tasks better, the BIGO big data platform built a real-time computing platform, Bigoflow, and migrated some slow computing tasks onto it as Flink streaming jobs. The message queues Kafka/Pulsar provide the data layering on which the real-time data warehouse is built. On Bigoflow, real-time data warehouse tasks are managed as a platform service: there is a unified entry point for real-time tasks, real-time task metadata is managed on the platform, and lineage for real-time tasks is established.
2.2.1 Construction plan
The BIGO big data platform builds its real-time data warehouse mainly on Flink + ClickHouse. The general scheme is as follows:
Following the traditional data warehouse layering method, data is divided into four layers: ODS, DWD, DWS and ADS:
- **ODS layer:** Raw data such as user behavior logs and business logs, stored in Kafka/Pulsar message queues;
- **DWD layer:** Flink tasks aggregate the ODS data by UserId to form detailed behavior data for each user, which is stored in Kafka/Pulsar;
- **DWS layer:** The Kafka stream table is joined with Hive/MySQL dimension tables, and the multi-dimensional detail data produced by the JOIN is written to ClickHouse tables;
- **ADS layer:** The multi-dimensional detail data in ClickHouse is aggregated along different dimensions and then served to different businesses.
Building the real-time data warehouse according to this scheme ran into several problems:
- After an offline task is converted into a real-time task, its complex calculation logic (multi-stream JOIN, deduplication) leads to oversized job state, OOM (out-of-memory) exceptions, or excessive operator back pressure.
- When the detail stream table is joined with a large dimension table, the dimension table holds too much data; loading it into memory causes OOM and the job fails.
- When Flink writes the multi-dimensional detail data produced by the stream-dimension table join to ClickHouse, Exactly-once cannot be guaranteed. Once a failover occurs, data is written repeatedly.
2.2.2 Problem Solving & Optimization
Optimize job execution logic to reduce state
The logic of offline computing tasks is complex, involving joins and deduplication of multiple Hive tables. The general logic is as follows:
When an offline job is converted to a Flink streaming task, the offline join of multiple Hive tables becomes a join of multiple Kafka topics. Because the joined Kafka topics carry heavy traffic and the join window is long (up to 1 day), a large amount of state accumulates on the Join operator after the job has run for a while (the state approaches 1 TB after an hour). With state this large, the Flink job uses the RocksDB state backend to store it. However, either RocksDB's memory usage grows so high that the container is killed by YARN, or so much state is stored in RocksDB that throughput drops and severe back pressure results.
To solve this problem, we apply UNION ALL to these topics under a common schema to obtain one large data stream. Within this stream, the event_id of each record tells us which topic (event stream) the record came from. Aggregation is then performed on the stream to compute the metrics for each event stream.
In this way, JOIN can be replaced by UNION ALL to avoid the influence of large State caused by JOIN calculation.
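A minimal Python sketch of this idea follows. It is not the BIGO job: the record fields (`uid`, `event_id`, `postid`) and the per-event counting are assumptions chosen to show that, after UNION ALL, each key only needs small aggregate counters rather than buffered rows waiting for a join partner.

```python
from collections import defaultdict
from itertools import chain


def union_all(*topics):
    """Merge several event streams that already share one schema."""
    return chain(*topics)


def aggregate(stream):
    """Count events per uid, split by the event_id that marks the source topic.

    State per uid is a handful of counters; no rows are held back for a join.
    """
    counts = defaultdict(lambda: defaultdict(int))
    for record in stream:
        counts[record["uid"]][record["event_id"]] += 1
    return {uid: dict(per_event) for uid, per_event in counts.items()}
```

For example, merging a "show" topic and a "click" topic and aggregating by `uid` yields both metrics per user in a single pass, which is what the windowed join would otherwise have produced.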
In addition, there are many count distinct calculations in the calculation task, similar to the following:
    select
        count(distinct if(events['a'] = 1, postid, null)) as cnt1,
        count(distinct if(events['b'] = 1, postid, null)) as cnt2,
        ……
        count(distinct if(events['x'] = 1, postid, null)) as cntx
    from table_a
    group by uid
These count distinct calculations sit in the same group by and all deduplicate on the same postid, so their distinct states can share one set of keys. A single MapState can store the state for all of them, as follows:
Because the count distinct functions share the same keys, they can share the keys in the MapState, which saves storage space. The value in the MapState is a byte array; each byte has 8 bits, and each bit is 0 or 1. The nth bit corresponds to the nth count distinct function for that key: 1 means the key should be counted by that count distinct function, and 0 means it should not. When the aggregate result is computed, summing the nth bit across all keys gives the value of the nth count distinct. This further reduces the size of the state.
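The shared-key bitmap can be sketched in a few lines of Python. This is an illustration only: a plain `dict` stands in for Flink's MapState, a Python integer stands in for the byte array, and the class and method names are hypothetical.

```python
from typing import Dict, List, Optional


class SharedDistinctState:
    """State for one group (one uid): postid -> bit flags.

    Bit n is 1 when the postid must be counted by the n-th count distinct.
    """

    def __init__(self, num_counters: int):
        self.num_counters = num_counters
        self.flags: Dict[str, int] = {}  # stand-in for MapState<postid, byte[]>

    def add(self, postid: Optional[str], matched: List[bool]) -> None:
        """Record one input row; matched[n] is the n-th if(...) condition."""
        if postid is None:
            return
        bits = self.flags.get(postid, 0)
        for n, hit in enumerate(matched):
            if hit:
                bits |= 1 << n
        if bits:  # a postid that matched nothing never needs to be stored
            self.flags[postid] = bits

    def result(self) -> List[int]:
        """Sum the n-th bit over all keys to get the n-th count distinct."""
        return [
            sum((bits >> n) & 1 for bits in self.flags.values())
            for n in range(self.num_counters)
        ]
```

Each postid is stored once per group no matter how many count distinct columns reference it, whereas separate distinct states would store it once per column.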
With these optimizations, the offline ABTest task was successfully migrated to a Flink streaming job, and the job's state was kept within 100 GB, so the job runs normally.
Stream-dimension table JOIN optimization
Generating the multi-dimensional detail wide table requires joining the stream with dimension tables. This uses Flink's ability to join against a Hive dimension table: the Hive dimension table data is loaded into an in-memory HashMap in each task, and records from the stream table are then joined with the data in the HashMap by join key. However, for large Hive dimension tables with hundreds of millions of rows, the amount of data loaded into memory is too large and easily causes OOM (out of memory). To solve this, we partition the large Hive dimension table by a hash of the join key, as shown in the following figure:
In this way, the large Hive dimension table is distributed by the hash function across the HashMaps of the Flink job's parallel subtasks, and each HashMap stores only part of the table. As long as the job's parallelism is high enough, the dimension table can be split into enough shards, each stored separately. For dimension tables that are still too large, RocksDB MapState can be used to hold the shard data.
When records from the Kafka stream table are dispatched to subtasks for the join, the same hash function is applied to the same join key, so each record is routed to the subtask holding the matching dimension shard, where the join is performed and the result is output.
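The sharding scheme can be sketched in Python as follows. The sketch makes several assumptions that are not from the talk: `zlib.crc32` is used as the hash function, list indices stand in for parallel subtasks, and the join is an inner join that drops stream records with no matching dimension row.

```python
import zlib


def shard_of(join_key: str, parallelism: int) -> int:
    """Map a join key to a subtask index; the same key always maps to the same shard."""
    # crc32 is stable across processes, unlike Python's built-in hash() for str.
    return zlib.crc32(join_key.encode("utf-8")) % parallelism


def load_dimension_shards(dim_rows, parallelism):
    """Split (key, attributes) dimension rows into one dict per subtask."""
    shards = [dict() for _ in range(parallelism)]
    for key, attrs in dim_rows:
        shards[shard_of(key, parallelism)][key] = attrs
    return shards


def join_stream(stream_rows, shards):
    """Route each (key, payload) record to its shard and join it there."""
    parallelism = len(shards)
    for key, payload in stream_rows:
        local = shards[shard_of(key, parallelism)]
        if key in local:
            yield key, payload, local[key]
```

Because the dimension rows and the stream records go through the same `shard_of` function, a record is only ever looked up in the one shard that can contain its key, so no subtask needs the whole table in memory.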
After this optimization, stream-dimension table joins against several large Hive dimension tables run successfully. The largest of these dimension tables exceeds 1 billion rows.
Exactly-once semantic support for the ClickHouse Sink
The community version of ClickHouse does not support transactions, so there is no way to guarantee Exactly-once semantics when data is written to ClickHouse. Whenever a job failover occurs, data is written to ClickHouse repeatedly.
To address this problem, BIGO ClickHouse implements a two-phase commit transaction mechanism. When writing data to ClickHouse, the write mode can be set to temporary to indicate that the data being written is temporary. When the write completes, an insert ID is returned; a commit is then performed with that insert ID, and the temporary data becomes official.
A ClickHouse Connector was implemented on top of BIGO ClickHouse's two-phase commit transaction mechanism, combined with Flink's checkpoint mechanism. It ensures Exactly-once semantics for the ClickHouse Sink as follows:
- During normal writes, the Connector picks a random ClickHouse shard and writes to it according to whether ClickHouse is configured with one or two replicas, then records the insert ID once the write finishes. Multiple insert IDs accumulate between two checkpoints. When a checkpoint completes, these insert IDs are committed in a batch, turning the temporary data into official data. That is, the data written between two checkpoints is committed together.
- If a failover occurs, Flink restarts and recovers from the last checkpoint. The operator state of the ClickHouse Sink may contain insert IDs that were not committed last time, and the commit is retried for those insert IDs. Data that was written to ClickHouse but whose insert ID was not recorded in the operator state remains temporary and is never visible to queries; after a while it is removed by ClickHouse's expiry cleanup mechanism. This ensures that data is not duplicated after the state is rolled back to the last checkpoint.
With this mechanism, the whole pipeline from Kafka through Flink to ClickHouse achieves end-to-end Exactly-once semantics, with no duplicated or lost data.
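The commit protocol above can be simulated with a short Python sketch. Nothing here is the real connector or the real BIGO ClickHouse API: `FakeClickHouse` is an in-memory stand-in, and the method names (`insert_temporary`, `commit`, `snapshot_state`, `notify_checkpoint_complete`) are hypothetical, loosely modeled on the steps described in the text.

```python
import itertools


class FakeClickHouse:
    """In-memory stand-in for a store with temporary writes and commit-by-ID."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.temporary = {}  # insert_id -> rows, not visible to queries
        self.visible = []    # committed ("official") rows

    def insert_temporary(self, rows):
        insert_id = next(self._ids)
        self.temporary[insert_id] = list(rows)
        return insert_id

    def commit(self, insert_id):
        # Idempotent: committing an unknown or already committed ID is a no-op.
        self.visible.extend(self.temporary.pop(insert_id, []))

    def expire_temporary(self):
        """Stand-in for the expiry cleanup of uncommitted temporary data."""
        self.temporary.clear()


class TwoPhaseSink:
    def __init__(self, store, restored_ids=()):
        self.store = store
        self.pending = []                        # IDs written since the last checkpoint
        self.awaiting_commit = list(restored_ids)
        self._commit_all()                       # on restore, retry unfinished commits

    def write(self, rows):
        self.pending.append(self.store.insert_temporary(rows))

    def snapshot_state(self):
        """Checkpoint: move pending IDs into the state that gets persisted."""
        self.awaiting_commit.extend(self.pending)
        self.pending = []
        return list(self.awaiting_commit)

    def notify_checkpoint_complete(self):
        """Checkpoint finished: batch-commit the recorded insert IDs."""
        self._commit_all()

    def _commit_all(self):
        for insert_id in self.awaiting_commit:
            self.store.commit(insert_id)
        self.awaiting_commit = []
```

In this sketch, a crash between `snapshot_state` and `notify_checkpoint_complete` is recovered by constructing a new sink with the checkpointed IDs, while rows written after the checkpoint stay temporary until they expire and are written again when the source replays them.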
2.2.3 Platform construction
To manage the real-time computing tasks of the BIGO big data platform better, the company built the BIGO real-time computing platform, Bigoflow, which gives users a unified entry point for Flink real-time tasks. The platform provides the following:
- Support for Flink JAR, SQL, Python and other job types, and for different Flink versions, covering most real-time computing workloads within the company;
- One-stop management: job development, submission, operation, history display, monitoring and alerting are integrated, making it easy to check job status and spot problems at any time;
- Lineage: the data sources, data destinations and calculation context of each job are easy to query.
III. Application scenarios
3.1 Application scenarios of the OneSQL OLAP analysis platform
Within the company, the OneSQL OLAP analysis platform is used for ad hoc queries, as follows:
SQL submitted by the user on the Hue page is forwarded by the OneSQL backend to the Flink SQL Gateway and submitted to the Flink Session cluster for execution. The Flink SQL Gateway tracks the execution progress of the query task and returns the query result to the Hue page.
3.2 Application scenarios of the real-time data warehouse
The real-time data warehouse is mainly used for the ABTest service, as follows:
Raw user behavior logs are aggregated by Flink tasks to produce user detail data. This data is then joined with dimension table data in a stream-dimension table join and written to ClickHouse, producing a multi-dimensional detail wide table. After being summarized along different dimensions, it is served to different businesses. By reworking the ABTest service in this way, the generation time of the result indicator of the service is advanced by 8 hours and the resource usage is more than doubled.
IV. Future plans
To further develop the OneSQL OLAP analysis platform and the BIGO real-time data warehouse, the plans for the real-time computing platform are as follows:
- Improve the Flink OLAP analysis platform: extend Hive SQL syntax support and solve the data skew problem that JOINs cause during computation;
- Improve the real-time data warehouse and introduce data lake technology to address the limited range over which task data in the real-time data warehouse can be traced back;
- Build a unified stream and batch data computing platform based on Flink.