Abstract: This article covers the application and practice of Flink at miHoYo, as shared by Zhang Jian, head of real-time computing in miHoYo's Big Data Department. It is divided into four parts:
- Background
- Real-time platform construction
- Real-time data warehouse and data lake exploration
- Future development and prospects
I. Background introduction
Founded in 2011, miHoYo is committed to providing users with beautiful products and content that exceed expectations. The company has launched a number of popular, high-quality products, including “Houkai Gakuen 2”, “Honkai Impact 3rd”, “Tears of Themis”, “Genshin Impact”, the dynamic desktop software “N0va Desktop”, and the community product “HoYoLAB”, and has created diversified content such as animation, comics, music, novels, and merchandise based on its original IPs. Headquartered in Shanghai, China, the company also has a global presence in Singapore, the United States, Canada, Japan, South Korea, and other countries and regions.
Flink has played an important role in the development of big data at miHoYo. Since the real-time computing platform was established, Flink has served as its real-time computing engine through several stages of development, and the platform has been continuously improved. Inside miHoYo, the real-time computing platform is called Mlink; it is primarily based on Flink and is also compatible with Spark Streaming tasks. It has evolved from the initial Flink Jar tasks to being mainly Flink SQL based, continuously lowering the barrier to use and improving task development efficiency, and from basic Flink task development to multi-version task management across regions and cloud vendors, meeting the needs of business growth. Throughout this process, miHoYo has kept a close eye on the development of the community and stayed in close contact with the community and Alibaba Cloud engineers.
Mlink is a computing platform based on Yarn resource management that supports data warehouse, algorithm, recommendation, risk control, and dashboard services. It runs 1,000+ tasks, of which SQL tasks account for about 80%. It uses more than 5,000 Yarn vcores and about 10 TB of memory; the peak throughput of a single task is 5 million QPS, and the daily data volume exceeds 100 billion records.
II. Real-time platform construction
2.1 Problems encountered
In the process of exploring and adopting Flink, we ran into some pain points and also gained some experience in practice. They can be summarized in the following five aspects:
- First, the development cost of Jar tasks is high, which is a steep barrier for engineers unfamiliar with Flink code. Jar tasks are also costly to maintain: some code logic changes involve repackaging, uploading, redeploying, and other steps;
- Second, task management functions are missing. Multi-tenancy, historical version tracing, management of development and online versions, UDF management, and lineage management are all important parts of real-time platform management;
- Third, management of the Flink engine itself, which mainly involves managing multiple Flink versions, task parameter configuration, secondary development of common connectors, and management of multiple resource environments;
- Fourth, task alarm monitoring and management, and task problem diagnosis;
- Fifth, interworking with the offline data warehouse, including Hive Catalog management and dependency management between real-time and offline scheduling.
The above five problems are probably common ones, so companies carry out secondary development based on in-house or open-source systems to meet their task development and management needs. Beyond these five problems, we also need to work across regions and cloud vendors, which mainly brings issues of task release and submission efficiency and of inconsistent resources and environments among cloud vendors.
2.2 Solutions
Real-time platform construction mainly revolves around the above problems. The current real-time platform architecture is as follows:
Figure 1: Multi-cloud multi-environment real-time platform architecture
The front end controls switching between cloud environments. The Backend Service is responsible for user permission management, multi-version task management, lineage management, task O&M, task release and offline, task monitoring, and alarm management. The Executor Service is responsible for task parsing, task submission and execution, taking tasks offline, and interacting with the various resource managers. The Backend Service communicates with the Executor Service over the Thrift protocol, so the Executor Service can be implemented in multiple languages. This architecture design mainly solves the problem of spanning regions and cloud vendors and decouples task management from task execution.
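As a rough illustration of that decoupling, the following is a hypothetical sketch (not Mlink's actual Thrift IDL, which the article does not show) of the kind of RPC surface a Backend Service could call on an Executor Service; a Thrift definition would generate an equivalent client/server pair, leaving each Executor free to be implemented in its own language.

```java
// Hypothetical RPC surface between the Backend Service and an Executor Service.
// Method names and parameters are illustrative; the real interface would be defined in Thrift.
public interface ExecutorRpc {

    /** Parse and submit a task to the resource manager of the target environment; returns an application id. */
    String submitTask(String taskDefinitionJson);

    /** Take a running task offline, optionally triggering a savepoint first. */
    void stopTask(String applicationId, boolean withSavepoint);

    /** Report task status so the backend can drive monitoring and alarms. */
    String taskStatus(String applicationId);
}
```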
Figure 2: Mlink platform development page
Figure 3: Mlink platform operation and maintenance page
Figure 4: Mlink platform synchronization task page
The Mlink real-time computing platform mainly consists of Overview, Development, Resource Management, O&M, Data Exploration, Synchronization Task, User Management, and Executor Management modules. The Development page is mainly where users write tasks and configure parameters, and it includes historical version management. Resource Management mainly covers Jar package tasks and UDF management. O&M includes starting and stopping tasks, task runtime monitoring, and task alarm configuration. For example, Kafka topic data can be previewed by partition, by time, or by offset. The Synchronization Task module mainly makes it easier to manage synchronization tasks, such as one-click CDC-to-Iceberg synchronization and its operation management. Executor Management is responsible for O&M of the executors, including bringing them online and offline and monitoring their health status.
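A minimal sketch of how such a topic preview could work, using the plain kafka-clients consumer API; the broker address, topic, partition, and time window are illustrative rather than Mlink's actual configuration.

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class TopicPreview {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mlink-topic-preview");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("demo_topic", 0);   // preview a single partition
            consumer.assign(Collections.singletonList(tp));

            // Preview by time: translate a point in time to an offset, then seek to it.
            long fromTimestamp = System.currentTimeMillis() - 10 * 60 * 1000L; // 10 minutes ago
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, fromTimestamp));
            OffsetAndTimestamp oat = offsets.get(tp);
            consumer.seek(tp, oat != null ? oat.offset() : 0L);        // fall back to the earliest offset

            // Pull a small batch of records for display on the preview page.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
            records.forEach(r ->
                    System.out.printf("partition=%d offset=%d ts=%d value=%s%n",
                            r.partition(), r.offset(), r.timestamp(), r.value()));
        }
    }
}
```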
2.3 Challenges encountered
During platform construction and iteration, we ran into a number of challenges and accumulated some good practices. There are four main aspects to share.
The first is the development and maintenance of the Executor Service.
The Executor is primarily responsible for parsing and submitting Jar and SQL tasks. Initially, to improve cross-region transmission efficiency, especially for large Jar packages, the backend parsed the task and transmitted only the JobGraph to the Executor, which then submitted it through the resource manager API. This ran into problems caused by inconsistencies in the backend's parsing environment: some tasks, especially those involving Hive and Iceberg tables, need a specific environment to be parsed correctly. In the end, parsing was moved from the backend to the Executor. After running for a long time, the Executor hit metaspace OOM, mainly because it keeps loading the classes that tasks require, so metaspace usage keeps growing. This was addressed by unloading the classloader after a task is parsed and by tuning heap and GC settings.
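A minimal sketch of the classloader-unloading idea, assuming a per-task, closeable URLClassLoader; the entry class name is hypothetical, and the real Executor logic (building the JobGraph, etc.) is only hinted at in comments.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class TaskParser {

    // Minimal sketch: load a task Jar in an isolated, closeable classloader, do the
    // parsing work (e.g. build the JobGraph), then close the loader so its classes
    // can be unloaded and metaspace does not grow without bound.
    public static String parseTask(URL taskJar) throws Exception {
        String entryClassName;
        try (URLClassLoader taskClassLoader =
                     new URLClassLoader(new URL[]{taskJar}, TaskParser.class.getClassLoader())) {
            Class<?> entryClass = taskClassLoader.loadClass("com.example.UserJobEntry"); // illustrative name
            // ... reflectively invoke the user entry point and extract the JobGraph here ...
            entryClassName = entryClass.getName();
        }
        // After the loader is closed and no references to its classes remain, the classes
        // become eligible for unloading at the next GC. Process-level settings such as
        // -XX:MaxMetaspaceSize and a collector that unloads classes concurrently
        // (e.g. G1 with -XX:+ClassUnloadingWithConcurrentMark, enabled by default) help as well.
        return entryClassName;
    }
}
```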
The second aspect is monitoring.
The monitoring solution is InfluxDB plus Grafana. As the number of tasks grew, the number of series stored in InfluxDB exceeded one million, which affected the stability of monitoring and queries and made responses slow. The first measure was to scale out InfluxDB: the executor side distributes task metrics across different InfluxDB instances using consistent hashing, and the metrics reported by Flink tasks themselves were trimmed to some extent. The second measure was on the monitoring side itself. For Kafka consumption monitoring, in addition to the existing consumption-lag (offset) monitoring, we added custom monitoring of Kafka consumption delay time: we collect the consumption timestamp of the slowest parallel subtask, which reflects the maximum Kafka consumption delay and guarantees that all data before that point in time has been consumed.
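A minimal sketch of the consistent-hash routing, assuming metrics are keyed by task id; the InfluxDB addresses and the virtual-node count are illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash ring for routing a task's metrics to one of several InfluxDB instances.
public class MetricsRouter {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    public MetricsRouter(List<String> influxInstances, int virtualNodes) {
        for (String instance : influxInstances) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(instance + "#" + i), instance);   // virtual nodes smooth the distribution
            }
        }
    }

    // All metrics of one task (keyed by its task id) go to the same InfluxDB instance.
    public String instanceFor(String taskId) {
        SortedMap<Long, String> tail = ring.tailMap(hash(taskId));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((d[2] & 0xFF) << 16) | ((d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MetricsRouter router = new MetricsRouter(
                List.of("http://influxdb-1:8086", "http://influxdb-2:8086", "http://influxdb-3:8086"), 100);
        System.out.println(router.instanceFor("mlink-task-42"));
    }
}
```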
Figure 5: Grafana monitoring example
The third is the secondary development of connectors.
For CDC, we iterated on the basis of CDC 1.0 to support dynamic field expansion for MySQL, starting consumption from a time-based position, and collecting schema information such as the captured databases, tables, and binlog positions. CDC 2.0 adds flow control for full reads of database tables and full initialization that does not require binlog to be enabled on MySQL. Because running multiple CDC instances can put pressure on the upstream MySQL, Kafka is adopted as an intermediate transfer layer, with the primary key field of the source table used as the Kafka message key, which preserves the binlog order per key and avoids out-of-order data downstream.
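A minimal sketch of this Kafka relay, assuming Flink SQL (driven from the Java TableEnvironment), the mysql-cdc connector from the Flink CDC connectors project, and the upsert-kafka connector, whose declared PRIMARY KEY is serialized as the Kafka record key; all connection parameters, database, table, and topic names are illustrative.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Relay CDC change streams through Kafka with the table's primary key as the message key,
// so all changes of one row land in the same partition and stay in order.
public class CdcToKafkaRelay {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // MySQL CDC source.
        tEnv.executeSql(
            "CREATE TABLE orders_cdc (" +
            "  id BIGINT," +
            "  user_id BIGINT," +
            "  amount DECIMAL(10, 2)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname'  = 'mysql-host'," +
            "  'port'      = '3306'," +
            "  'username'  = 'flink'," +
            "  'password'  = '******'," +
            "  'database-name' = 'shop'," +
            "  'table-name'    = 'orders'" +
            ")");

        // upsert-kafka sink: the declared PRIMARY KEY is serialized as the Kafka record key.
        tEnv.executeSql(
            "CREATE TABLE orders_kafka (" +
            "  id BIGINT," +
            "  user_id BIGINT," +
            "  amount DECIMAL(10, 2)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'shop.orders'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'key.format'   = 'json'," +
            "  'value.format' = 'json'" +
            ")");

        tEnv.executeSql("INSERT INTO orders_kafka SELECT * FROM orders_cdc");
    }
}
```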
For Iceberg, which serves as the data lake solution, the main modifications were built on top of support for Iceberg V2 tables, that is, upsert tables. An Iceberg management center was established to periodically optimize and clean up tables according to a compaction strategy. On the Flink side, writes mainly ensure the ordering of CDC data into Iceberg V2 tables. To reduce the number of delete files, a BloomFilter was added to the Iceberg write path, which can significantly reduce the size of delete files. The Iceberg management center also supports V2 table compaction and handles Flink commit conflicts.
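A minimal sketch of declaring an Iceberg V2 upsert table from Flink SQL (again via the Java TableEnvironment); the Hive metastore URI, warehouse path, and table names are illustrative, and the BloomFilter change to the write path is an internal modification that is not shown.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergV2TableSetup {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
            "CREATE CATALOG iceberg_hive WITH (" +
            "  'type' = 'iceberg'," +
            "  'catalog-type' = 'hive'," +
            "  'uri' = 'thrift://hive-metastore:9083'," +
            "  'warehouse' = 'hdfs://nameservice/warehouse/iceberg'" +
            ")");

        // format-version=2 enables row-level deletes; write.upsert.enabled makes Flink writes
        // emit equality deletes plus inserts, so the table behaves as an upsert table.
        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS iceberg_hive.ods.orders (" +
            "  id BIGINT," +
            "  user_id BIGINT," +
            "  amount DECIMAL(10, 2)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'format-version' = '2'," +
            "  'write.upsert.enabled' = 'true'" +
            ")");
    }
}
```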
For ClickHouse, the write code was refactored to optimize write performance and to support writing to both local tables and distributed tables.
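A minimal sketch of batched writes to a ClickHouse local table over JDBC, assuming the clickhouse-jdbc driver; the node address, table schema, and batch size are illustrative, and choosing local tables over the distributed table is one way to avoid the extra fan-out on insert.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ClickhouseBatchWriter {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://clickhouse-node-1:8123/analytics"; // endpoint of one shard's local table
        try (Connection conn = DriverManager.getConnection(url, "default", "");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO events_local (ts, user_id, event_type) VALUES (?, ?, ?)")) {
            for (int i = 0; i < 10_000; i++) {
                ps.setLong(1, System.currentTimeMillis() / 1000);
                ps.setLong(2, i);
                ps.setString(3, "click");
                ps.addBatch();
                if (i % 5_000 == 0) {        // flush in large batches; ClickHouse favors big inserts
                    ps.executeBatch();
                }
            }
            ps.executeBatch();               // flush the tail
        }
    }
}
```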
The fourth is data lake ingestion and offline scheduling.
The real-time platform integrates Iceberg and supports Iceberg Hadoop, Hive, OSS, and S3 catalogs. The CDC-to-Iceberg ingestion link has already been put into production use within the department. When data is ingested into the lake or warehouse and the downstream table is used by the offline warehouse, there is a scheduling dependency problem: when should the offline task start? At present, we mainly use the task's delay time and checkpoint time to confirm that the data has landed in the lake. Take CDC or Kafka to Iceberg as an example: we collect the consumption delay of the slowest parallel subtask and the task's checkpoint time. Because a completed checkpoint does not necessarily mean Iceberg's snapshot version has advanced, the Iceberg write path was also modified. For such a synchronization task, if CDC collection is not delayed and the checkpoint has completed, the data up to a given hour is guaranteed to have landed. The real-time platform exposes a task delay query interface, and offline scheduling uses this interface as a dependency node, which ensures the completeness of the landed data when the offline task starts.
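A minimal sketch of that readiness rule, with hypothetical inputs (the delay of the slowest subtask, the time of the last checkpoint whose data was committed to Iceberg, and a tolerated delay); the real platform exposes this through a delay query interface rather than a local method.

```java
import java.time.Duration;
import java.time.Instant;

public class LakeIngestionReadiness {

    // The offline job for hour H may start only if the slowest subtask's consumption delay is
    // within a bound and the last committed checkpoint covers the end of hour H.
    public static boolean readyForHour(Instant hourEnd,
                                       Duration maxSourceDelay,
                                       Instant lastCommittedCheckpointTime,
                                       Duration allowedDelay) {
        boolean sourceCaughtUp = maxSourceDelay.compareTo(allowedDelay) <= 0;
        boolean checkpointCoversHour = !lastCommittedCheckpointTime.isBefore(hourEnd);
        return sourceCaughtUp && checkpointCoversHour;
    }

    public static void main(String[] args) {
        Instant hourEnd = Instant.parse("2022-05-01T10:00:00Z");
        boolean ready = readyForHour(
                hourEnd,
                Duration.ofSeconds(30),                  // slowest subtask delay reported by the task
                Instant.parse("2022-05-01T10:02:10Z"),   // checkpoint committed after the hour boundary
                Duration.ofMinutes(2));                  // tolerated source delay
        System.out.println("offline task for 10:00 may start: " + ready);
    }
}
```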
III. Real-time data warehouse and data lake exploration
Real-time data acquisition currently consists of three main links:
- The first is log data, which is mainly collected by Filebeat and written to Kafka, with ES used to monitor Filebeat;
- The second is API reporting services, whose backends write to Kafka;
- The third is CDC, which collects full and incremental MySQL data and writes it to Kafka or directly to Iceberg. Canal was previously used for incremental collection, but it has now been fully replaced by CDC.
The real-time data warehouse architecture is basically the same as in the rest of the industry, with ODS, DWD, and DWS layers that output to various application systems such as ClickHouse, Doris, MySQL, and Redis. At present, Kafka is mainly used as the intermediate carrier, and Iceberg is being explored as a middle layer as well. Although Iceberg supports streaming reads, there is not yet a good solution for the ordering of data in streaming reads, and we are still exploring it. There are two main lines of exploration:
- The first treats Kafka and Iceberg as a hybrid Source: after reading from the hybrid Source, the Flink task determines the read range and the switch point based on the Kafka offsets recorded in Iceberg snapshots (a sketch follows this list).
- The second is the dynamic table storage implementation introduced by the community in FLIP-188. A Flink built-in table consists of two parts, a LogStore and a FileStore. The LogStore meets the needs of a message system, while the FileStore is a columnar file store. At every point in time, the LogStore and the FileStore contain exactly the same data for the most recently written records (the LogStore has a TTL), just with different physical layouts.
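A minimal sketch of the hybrid-source idea using Flink's HybridSource: read the bounded history first, then switch to Kafka from the position recorded with the last snapshot. A FileSource over text files stands in for the Iceberg snapshot read here, and the switch timestamp, paths, topic, and addresses are illustrative.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HistoryThenKafkaHybrid {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        long switchTimestamp = 1_651_400_000_000L; // position recorded with the latest snapshot

        FileSource<String> historySource = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(),
                        new Path("hdfs://nameservice/warehouse/ods/orders"))
                .build();                                   // bounded: the already-landed history

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("shop.orders")
                .setGroupId("hybrid-demo")
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();                                   // unbounded: the tail after the switch point

        HybridSource<String> hybrid = HybridSource.builder(historySource)
                .addSource(kafkaSource)
                .build();

        env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "history-then-kafka").print();
        env.execute("hybrid-source-sketch");
    }
}
```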
In terms of real-time warehouse exploration, the CDC-to-Iceberg ingestion task has been used in production. Four main problems were solved:
- First, for CDC collection, especially multi-database and multi-table collection, the data is first gathered into Kafka, reducing the impact of multiple CDC tasks on the same database;
- Second, Iceberg supports V2 table writes, including index filtering to reduce delete files, and the Iceberg management center handles compaction and commit conflicts;
- Third, data verification and data-delay checks are supported for sharded databases and tables;
- Fourth, one-click task generation: users only need to fill in the source database information and the target Iceberg database and table names, and Kafka relay is supported to avoid multiple CDC instances collecting from the same database instance.
By solving these four problems, database data can reach the lake within minutes, data verification and delay-based dependencies are available, and downstream offline task scheduling can start conveniently. The Kafka-to-Iceberg leg of this link is sketched below.
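A minimal sketch of that final leg, assuming the relay topic from the earlier upsert-kafka sketch and the Iceberg V2 table from the earlier DDL sketch; all names, addresses, and the checkpoint interval are illustrative.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Consume the changelog relayed to Kafka and write it into the Iceberg V2 upsert table.
public class KafkaToIcebergIngestion {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
            "CREATE TABLE orders_changelog (" +
            "  id BIGINT," +
            "  user_id BIGINT," +
            "  amount DECIMAL(10, 2)," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'shop.orders'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'key.format'   = 'json'," +
            "  'value.format' = 'json'" +
            ")");

        tEnv.executeSql(
            "CREATE CATALOG iceberg_hive WITH (" +
            "  'type' = 'iceberg'," +
            "  'catalog-type' = 'hive'," +
            "  'uri' = 'thrift://hive-metastore:9083'," +
            "  'warehouse' = 'hdfs://nameservice/warehouse/iceberg'" +
            ")");

        // Checkpointing drives Iceberg commits, so ingested data becomes visible per checkpoint.
        tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "1 min");

        tEnv.executeSql("INSERT INTO iceberg_hive.ods.orders SELECT * FROM orders_changelog");
    }
}
```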
Figure 6: Data into the lake link
IV. Future development and prospects
There are four main points:
- First, we hope Flink dynamic table storage can land as soon as possible, achieving a truly real-time data warehouse and the unification of streams and tables;
- Second, dynamic scaling of Flink tasks, with active, fine-grained resource adjustment based on task diagnosis;
- Third, optimization of Flink's reads and writes for batch tasks. Currently, Flink's batch experience is not as good as Spark's; if this gap is closed, a single engine can serve both streaming and batch workloads, and development costs will drop significantly;
- Fourth, better adoption and promotion of Flink together with the data lake.