Abstract: This paper is compiled from a speech delivered by Di Xingxing, head of Autohome’s real-time computing platform, at the Flink Forward Asia 2021 platform construction session. The main contents include:
- Application scenarios
- Budget resource control
- Flink auto-scaling
- Lakehouse integration
- PyFlink practice
- Follow-up planning
I. Application scenarios
Our application scenarios are similar to those of other companies, covering real-time metric statistics, monitoring and alerting, real-time data processing, real-time user behavior, real-time ingestion into the lake, and real-time data distribution:
- Real-time metric statistics includes real-time traffic analysis and the large-screen dashboards for auto shows and the 818 event. It lets us see the effect of major campaigns in real time and adjust operational strategy promptly;
- Monitoring and alerting covers alerts from back-end log analysis of each application, application performance monitoring, and video playback quality monitoring for C-end users. This is also a typical real-time computing scenario: with appropriate alert policies, problems in core systems are detected as soon as they occur and can be located quickly through real-time data analysis;
- Real-time data processing mainly supports the construction of the real-time data warehouse and businesses such as the content center and e-commerce center;
- Real-time user behavior records user profiles and features based on users' various behaviors in the app, which ultimately improves user experience. A typical scenario is intelligent recommendation: we combine the content a user has recently shown interest in to recommend high-quality resources such as articles and videos in real time;
- Real-time ingestion into the lake is the focus of our platform construction this year. The lakehouse architecture we have landed improves on a purely Hive-based architecture in many ways; it has already been applied to a number of subject areas, with significant results;
- Finally, there is the basic real-time data distribution scenario: users can easily distribute data from a business database or Kafka to multiple storage engines to meet different business requirements, for example distributing to ES to support fast retrieval.
In the first phase we used Storm, developing jobs with the Spout and Bolt programming model. Development was entirely Java coding, with a high barrier to entry and a high learning cost.
In the second phase, we introduced Flink in 2018 and 2019 and built AutoStream 1.0. The main goals at this stage were to improve efficiency and lower the development barrier and learning cost by replacing the pure Java development mode with a SQL + UDF development mode. Since early Flink did not support DDL, a large part of our work was to build our own Meta Server, define source and sink components through DDL, improve real-time access to business database data, and integrate the company's commonly used storage engines into the platform, opening up the entire real-time development pipeline.
In the third phase, we upgraded Flink to version 1.9 and the platform to AutoStream 2.0, which supports native DDL and user-uploaded UDFs, simplifying the UDF workflow. Meanwhile, as the number of tasks and platform users grew, our daily on-call time also increased, so we launched a task health scoring mechanism that analyzes a task's health from multiple angles, helps users understand where a task can be optimized, and offers solutions. We also launched online diagnostics: dynamically modifying log levels and viewing thread stacks and flame graphs, which improves the efficiency of problem location for users and reduces the platform's daily on-call cost.
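As a rough illustration of this SQL + UDF development mode on a recent Flink version with native DDL, here is a minimal PyFlink sketch; the connector options, topic, and UDF class name are made-up examples, not the platform's actual definitions:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# A streaming TableEnvironment; on the platform this is managed for the user.
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Source and sink defined with native Flink DDL (topic and servers are examples).
t_env.execute_sql("""
    CREATE TABLE user_log (
        user_id STRING,
        event   STRING,
        ts      TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_log',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'autostream-demo',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")
t_env.execute_sql("""
    CREATE TABLE event_pv (
        event STRING,
        pv    BIGINT
    ) WITH ('connector' = 'print')
""")

# A user-uploaded Java UDF registered through DDL (the class name is hypothetical).
t_env.execute_sql(
    "CREATE FUNCTION normalize_event AS 'com.example.NormalizeEvent' LANGUAGE JAVA")

# The business logic itself stays in SQL.
t_env.execute_sql("""
    INSERT INTO event_pv
    SELECT normalize_event(event) AS event, COUNT(1) AS pv
    FROM user_log
    GROUP BY normalize_event(event)
""")
```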
AutoStream 3.0 represents our main work this year, starting with upgrading Flink to version 1.12. The most immediate benefits of this upgrade are support for lakehouse integration, native deployment on Kubernetes, and PyFlink. On the one hand this improves the utilization of real-time computing resources; on the other hand it further reduces the difficulty of tuning task resources for users.
The image above shows the architecture of AutoStream 2.0. It contains a lot of content and covers the overall functionality and positioning of the platform, but it inevitably has a number of pain points.
First, real-time computing and offline storage resources are mixed. The offline Hadoop cluster set aside some servers for real-time computing and deployed a separate Yarn on them; the disks of these servers still hold offline data, while the CPU and memory mainly support Flink tasks. Flink therefore does not have exclusive use of the servers, and we did not strictly control computing resources, which led to unreasonable resource allocation for many tasks: excessive CPU was typically requested while actual utilization stayed low.
As the company's container construction progressed, this year we began to support co-locating offline and online workloads and allocating resources across their staggered peaks. This means the CPU and memory of the Hadoop cluster can support not only Flink real-time computing but also the deployment of online services, which makes the management and control of Flink computing resources all the more important and urgent.
Second, it is hard for users to tune resources themselves. There is the cost of understanding how adjustments to CPU, memory, and parallelism affect a task; modifying task resources and restarting a task often takes several minutes; and the user has to keep watching for business impact such as latency or memory overflow. In short, the tuning cost for users is high.
Third, the existing Hive-based data warehouse architecture needs to be upgraded. The timeliness of T+1 or H+1 can no longer meet the needs of many business scenarios, so we eventually chose Flink and Iceberg to build a real-time lakehouse architecture.
Finally, the ecosystem supported by real-time computing is incomplete. Our AI team develops mainly in Python, and the SQL + UDF approach was not friendly enough for them, so we integrated PyFlink to address this pain point.
Above is an overview of AutoStream 3.0.
Based on the pain points of version 2.0, in addition to functional and application-level upgrades, our main work covered the following areas:
First, we strengthened budget control, launched automatic scaling, built and landed the real-time lakehouse architecture, and launched PyFlink to support real-time tasks developed in Python. In addition, based on Flink + StarRocks, we further simplified the pipeline for real-time multidimensional analysis.
II. Budget resource control
Why do we need a budget resource control strategy?
In the past, server resources were not divided according to team budgets; it was first come, first served, with no upper limit, task resource utilization was low, and some teams wasted resources badly. At the same time, without external pressure, most users had little awareness of proactively optimizing resources.
The first step we took was a strong budget control mechanism. We connect with the internal asset cloud system to initialize each team's available budget; tasks that exceed the budget will not start. In principle, the proportion of tasks with low resource utilization should be kept within 10%. If a task cannot be optimized further, the team can initiate a budget transfer process on the asset cloud system, that is, borrow resources from another team. If that also fails, the platform can temporarily whitelist the service.
In the platform specification we define what counts as a task with low resource utilization, and explain the causes of low utilization and how to resolve them.
Currently we focus on three core rules to identify under-utilized tasks: CPU usage, memory usage, and idle slots. As early as AutoStream 2.0 we launched the health scoring mechanism for Flink tasks and accumulated rich, fine-grained scoring data, so identifying under-utilized tasks was straightforward.
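As an illustration only, here is a minimal sketch of this kind of rule; the thresholds and metric fields are hypothetical, while the real platform derives them from the health-scoring data:

```python
from dataclasses import dataclass

@dataclass
class TaskMetrics:
    cpu_usage: float    # average used cores / allocated cores, in [0, 1]
    heap_usage: float   # average used heap / allocated heap, in [0, 1]
    total_slots: int
    idle_slots: int

def is_under_utilized(m: TaskMetrics) -> bool:
    """Flag a task as under-utilized based on the three core rules."""
    low_cpu = m.cpu_usage < 0.2        # hypothetical threshold
    low_mem = m.heap_usage < 0.3       # hypothetical threshold
    has_idle_slots = m.idle_slots > 0  # allocated slots doing no work
    return low_cpu or low_mem or has_idle_slots

# Example: a task with 4 slots, 1 of them idle, is flagged for optimization.
print(is_under_utilized(TaskMetrics(cpu_usage=0.5, heap_usage=0.6,
                                    total_slots=4, idle_slots=1)))
```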
By introducing a strong control process we strictly control the amount of computing resources; by formulating specifications we raise users' awareness of proactive resource optimization; and by building automatic scaling we reduce users' tuning cost. As a result, we did not add any servers all year while the real-time computing business kept growing steadily.
III. Flink auto-scaling
Why do we need automatic scaling?
- First, to reduce costs and improve resource utilization;
- Second, to reduce the cost of resource tuning;
- Finally, to reduce the impact of the tuning process itself on task stability.
The image above shows the auto-scaling configuration page. Users can specify when auto-scaling may be triggered, for example only during off-peak hours in the middle of the night, to reduce the impact of scaling on the business. Scaling policies can be specified along the dimensions of parallelism, CPU, and memory. After every scaling action, the task owner is notified via DingTalk and email, and the system records the reason that triggered the scaling as well as the resource configuration after it.
The picture above shows the overall design of the auto-scaling feature. We added a new component to the JobManager, the RescaleCoordinator, which maintains its life cycle through HA and communicates with the Dispatcher. The RescaleCoordinator periodically calls an interface provided by AutoStream, and the AutoStream platform decides whether to scale based on the user-defined scaling policy.
The overall process is as follows: after obtaining leadership, the RescaleCoordinator periodically checks whether scaling is required. If so, it notifies the JobManager through the Dispatcher to start scaling. The JobMaster requests new TaskManagers from the ResourceManager; once all requested TaskManagers are ready, the JobMaster releases the old TaskManagers and reschedules the job onto the new ones. Finally, the result is persisted to ZooKeeper and HDFS.
Flink already uses ZooKeeper and HDFS for HA, so no new components are introduced. In addition, new containers are requested in advance, which saves container allocation time and avoids recovery failures caused by insufficient slots when resources are tight. If parallelism is being scaled, the JobGraph's parallelism must be modified before rescheduling.
For CPU and memory scaling, the first step is to request containers from the ResourceManager and mark them; the new TaskManager containers are requested from the ResourceManager through the SlotPool. In this step a new resource specification must be maintained in the SlotPool, corresponding to the 2 CPU cores and 2 GB of memory shown in the figure above, and a rollback mechanism must be supported: if this scaling attempt fails, the resource settings are rolled back to 1 CPU core and 1 GB of memory.
The second step is to stop the task and delete the ExecutionGraph.
The third step is to release the old TaskManagers, rebuild the ExecutionGraph, and restore the task from the savepoint onto the marked TaskManagers.
The fourth step is to persist the scaled resource settings to ZooKeeper and HDFS. If the JobManager fails at this point, the previous scaling configuration would otherwise be lost, so the configuration is saved in ZooKeeper and HDFS: the data itself is stored in the HDFS-based blob server, and ZooKeeper stores the key pointing to the blob.
Finally, a rough summary of the scaling strategies (a sketch of such a policy follows the list):
- The first is scaling based on parallelism:
- If consumption of Kafka is lagging while CPU utilization is low, the task is likely IO-bound and parallelism can be increased.
- If there are idle slots, scale down to avoid wasting resources.
- CPU scaling is decided by CPU utilization, which is calculated from the number of CPU cores allocated to the TaskManager process and the process's actual CPU usage.
- The memory dimension decides whether to scale up or down based on memory usage and GC behavior.
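Here is a minimal sketch of such a decision policy; all metric names and thresholds are hypothetical, and in the real platform each dimension's policy is configured by the user:

```python
from dataclasses import dataclass
from enum import Enum

class Action(Enum):
    SCALE_UP_PARALLELISM = "scale up parallelism"
    SCALE_DOWN_PARALLELISM = "scale down parallelism"
    SCALE_UP_MEMORY = "scale up memory"
    NONE = "no change"

@dataclass
class JobMetrics:
    kafka_consumer_lag: int  # records behind the latest offset
    cpu_utilization: float   # used cores / allocated cores
    idle_slots: int
    heap_usage: float        # used heap / max heap
    full_gc_per_min: float

def decide(m: JobMetrics) -> Action:
    # Lagging behind Kafka while CPU is mostly idle: likely IO-bound, add parallelism.
    if m.kafka_consumer_lag > 100_000 and m.cpu_utilization < 0.3:
        return Action.SCALE_UP_PARALLELISM
    # Idle slots mean over-provisioned parallelism: scale down.
    if m.idle_slots > 0:
        return Action.SCALE_DOWN_PARALLELISM
    # Memory pressure or frequent full GC: grow TaskManager memory.
    if m.heap_usage > 0.9 or m.full_gc_per_min > 1:
        return Action.SCALE_UP_MEMORY
    return Action.NONE

print(decide(JobMetrics(kafka_consumer_lag=500_000, cpu_utilization=0.15,
                        idle_slots=0, heap_usage=0.6, full_gc_per_min=0.1)))
# -> Action.SCALE_UP_PARALLELISM
```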
IV. Lakehouse integration
The Hive-based data warehouse mainly has the following pain points:
- The first is timeliness. Most of the Hive-based warehouse is currently T+1, and data can be queried in the warehouse at the earliest one hour after it is generated. As the company's overall technical capability improves, more and more scenarios have stricter timeliness requirements, for example quasi-real-time sample data to support model training, and quasi-real-time multidimensional analysis to help identify the root cause of a drop in click-through rate.
- Second, Hive 2.0 does not support UPSERT, so business database data can only be fully synchronized T+1, which makes data correction expensive. Without UPSERT, stream and batch cannot be unified at the storage layer.
- Finally, Hive is schema-on-write: once data is written, the schema cannot be changed.
After some evaluation, we decided to build a lakehouse architecture based on Iceberg to solve the pain points of the Hive-based warehouse.
Iceberg is an open table format that is not bound to any particular storage or compute engine. It provides an incremental snapshot mechanism that makes quasi-real-time writing and reading easy to achieve. Iceberg's v2 format supports ACID semantics, which satisfies the UPSERT requirement and makes stream-batch unification at the storage layer possible, and its schema-on-read design is friendly to schema changes. The major query engines are already integrated with Iceberg, and both streaming and batch are supported on the read and write paths, so it is also very friendly from the perspective of stream-batch unification.
On the right side of the figure is the basic principle of Iceberg's incremental snapshot mechanism. Each commit to the table generates a new snapshot, for example snapshot-0 (s0 in the figure) for the first write and s1 for the second. Each snapshot has a manifest list that points to multiple manifest files, and each manifest file in turn points to multiple data files, that is, the files that store the actual data. The current metadata pointer in the figure points to each Iceberg table's latest metadata file, and therefore its latest snapshot.
The figure above shows how Iceberg is currently integrated internally. At the bottom, the metadata of Hive and Iceberg tables is unified through Hive Metastore and their storage is unified on HDFS; this is the foundation of the lakehouse. The next layer is the table format, which is how Iceberg positions itself: an open table format sitting between the storage engine and the compute engines. Above that are the compute engines: Flink currently handles real-time ingestion into the lake, while Spark and Hive are the main production query engines. At the top are the computing platforms: AutoStream supports real-time ingestion of clickstream and log data, and AutoDTS supports real-time ingestion of relational database data. The offline platform is integrated with Iceberg so that Iceberg tables can be used like Hive tables, avoiding extra usage cost as much as possible.
The following are some typical practices from landing the lakehouse architecture.
The first is real-time data ingestion into the lake.
In the Iceberg scenario, data with the same primary key must be written to the same bucket. Since Flink table DDL does not support Iceberg's bucket definition, the first thing we did was support defining the bucket in the Flink DDL's table properties.
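A hedged sketch of what such a DDL might look like; the Metastore URI is a placeholder, and the property 'write.bucket-definition' is purely a stand-in for our internal extension rather than a standard Flink or Iceberg option:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Iceberg v2 table with a primary key; the bucket property is hypothetical.
t_env.execute_sql("""
    CREATE TABLE ods_user_order (
        order_id BIGINT,
        user_id  BIGINT,
        amount   DECIMAL(10, 2),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = 'hive_prod',
        'catalog-type' = 'hive',
        'uri' = 'thrift://metastore:9083',
        'format-version' = '2',
        'write.bucket-definition' = 'bucket(order_id, 16)'
    )
""")
```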
The second problem is that an Iceberg table itself cannot directly reflect the progress of data writing, which makes it difficult for offline scheduling to trigger downstream tasks accurately. We therefore rely on Flink's watermark mechanism and persist the watermark into the Iceberg table's metadata during ingestion. The table's write progress can then be obtained with a simple script call, which allows downstream scheduling tasks to be triggered precisely.
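For illustration, here is a hedged sketch of such a progress check using Iceberg's snapshots metadata table through Spark SQL; the summary key 'flink.watermark' is an assumption standing in for whatever key the ingestion job actually persists:

```python
from datetime import datetime
from pyspark.sql import SparkSession

# Assumes a Spark session already configured with an Iceberg catalog named hive_prod.
spark = SparkSession.builder.getOrCreate()

def iceberg_write_progress(table: str) -> datetime:
    """Read the watermark persisted in the latest snapshot's summary map."""
    row = spark.sql(f"""
        SELECT summary FROM {table}.snapshots
        ORDER BY committed_at DESC LIMIT 1
    """).first()
    # 'flink.watermark' is a hypothetical summary key written by the ingestion job.
    return datetime.fromtimestamp(int(row.summary["flink.watermark"]) / 1000)

def ready_to_schedule(table: str, partition_end: datetime) -> bool:
    """Trigger the downstream task only once the watermark has passed the partition."""
    return iceberg_write_progress(table) >= partition_end

if __name__ == "__main__":
    hour_end = datetime(2021, 12, 1, 10)  # example partition boundary
    print(ready_to_schedule("hive_prod.ods.user_order", hour_end))
```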
The third problem is connecting the real-time ingestion stage with the offline team account system. When Flink writes data to Iceberg it needs to access HDFS and Hive Metastore, so it must integrate with the existing offline team accounts. An offline HDFS directory can only grant write permission to one user, so before Iceberg was introduced all Flink tasks ran under a fixed Hadoop account, which made unified resource management convenient, including unified management of the checkpoint directories. By modifying the code Iceberg uses to create its Hadoop FileSystem instance, we added an account proxy mechanism so that data can be written to Iceberg under user-defined accounts. We also extended HiveMetaStoreClient with a proxy mechanism to open up access to Hive Metastore.
Next come some usability and stability practices.
To keep lakehouse metadata unified, we insisted on reusing the same HiveMetaStore service as the offline warehouse, and along the way we ran into quite a few stability and data correctness problems.
The first problem was abnormal access to HiveMetaStore. Kerberos is enabled on our Hadoop cluster, and a Hive conf conversion method was misused, overwriting the Hive client's Kerberos configuration and breaking access to HiveMetaStore. We fixed this accordingly and fed the fix back to the community.
Second, we introduced a ZooKeeper-based distributed lock to replace the default HiveMetaStore lock. By default, a HiveMetaStore-based distributed lock controls concurrent commits to a single table, but when a Flink process exits unexpectedly the code never reaches the unlock logic, so the table-level lock stays held and cannot be released. After the Flink task is automatically restarted it cannot acquire the lock again, so subsequent data cannot be written. As the number of ingestion tasks grew, this problem occurred at least once a week, and each time it required manual intervention in HiveMetaStore to release the lock and restore the Flink task; if not handled promptly, data could be delayed for several hours.
Hive offers a solution: set a timeout on the distributed lock so that it is released automatically after the timeout expires. However, we run Hive 2.0.1, and the cost of upgrading or backporting the patch was relatively high, so we modified Iceberg instead and replaced the previous lock mechanism with a ZooKeeper-based distributed lock. Since going live, the problem has been fundamentally resolved.
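Our actual change lives in Iceberg's Java commit path, but the idea can be sketched in a few lines of Python with kazoo; the ZooKeeper hosts, lock path, and timeout are illustrative:

```python
from kazoo.client import KazooClient

def commit_to_iceberg_table() -> None:
    """Placeholder for the actual Iceberg table commit logic."""
    pass

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
zk.start()

# One lock node per table. kazoo's lock recipe uses ephemeral nodes, so if the
# committing process dies unexpectedly the lock disappears with its session and
# the table can never be left permanently locked.
lock = zk.Lock("/iceberg/locks/ods.user_order", identifier="flink-ingest-job")

if lock.acquire(timeout=180):  # fail fast instead of blocking forever
    try:
        commit_to_iceberg_table()
    finally:
        lock.release()
else:
    raise TimeoutError("could not acquire the table commit lock within 180s")
```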
There was also a problem of Iceberg table metadata files being deleted by mistake, which leads to exceptions that files cannot be found and makes the Iceberg table inaccessible. Our fix checks whether the metadata_location attribute has actually changed, so that the metadata.json file is deleted only when the metadata was definitely not saved successfully.
Next come some improvements to small-file compaction for the v2 format.
Iceberg implements upsert semantics in its v2 format by introducing sequence numbers and combining position delete files with equality delete files: a position delete file marks a row at a specific position in a specific data file as deleted, while an equality delete file marks rows as deleted by matching column values such as the primary key.
For example, suppose there are two transactions:
- The first transaction writes the rows with ids 1 and 2; after the commit a data file (datafile1) is generated with sequence number 1.
- The second transaction inserts I(3,300), updates its value from 300 to 301, and deletes the row with id 2.
These three statements produce the following behavior. First, a datafile2 is generated containing the newly written I(3,300) and I(3,301), and I(3,300) is written to the equality delete file. Because the row with id 3 was written within the current transaction, a position delete file is also generated, marking the position of (3,300) in datafile2 as deleted. Because the row with id 2 was not written in the current transaction, I(2,200) is appended to the equality delete file to mark it as deleted.
On the read path, upsert semantics work as follows: a position delete file is joined against data files whose sequence number is not greater than its own, while an equality delete file is joined against data files whose sequence number is smaller than its own. The position delete file removes I(3,300) from datafile2, and the equality delete file removes I(2,200) from datafile1, so only the rows with ids 1 and 3 appear in the final result.
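To make the sequence-number rules concrete, here is a toy simulation of the read path using plain Python data structures; it illustrates only the rules above, not Iceberg's real file formats, and the value 100 for id 1 is assumed:

```python
# Each data file: (sequence_number, [(id, value), ...]); row position = list index.
datafiles = {
    "datafile1": (1, [(1, 100), (2, 200)]),
    "datafile2": (2, [(3, 300), (3, 301)]),
}
# Position delete: (sequence_number, {(file_name, row_position), ...})
position_deletes = (2, {("datafile2", 0)})  # deletes (3, 300) by its position
# Equality delete: (sequence_number, {id, ...}), matching on the primary key.
# The delete of id 3 has no effect because equality deletes only apply to data
# files with a strictly smaller sequence number.
equality_deletes = (2, {2, 3})

def read(files):
    pos_seq, deleted_positions = position_deletes
    eq_seq, deleted_ids = equality_deletes
    result = []
    for name, (file_seq, rows) in files.items():
        for pos, (row_id, value) in enumerate(rows):
            # Position deletes apply to files with sequence number <= their own.
            if pos_seq >= file_seq and (name, pos) in deleted_positions:
                continue
            # Equality deletes apply only to files with a smaller sequence number.
            if eq_seq > file_seq and row_id in deleted_ids:
                continue
            result.append((row_id, value))
    return result

print(read(datafiles))  # -> [(1, 100), (3, 301)]
```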
As the number of delete files grows, query performance degrades. To keep query performance acceptable, we compact small files on the Iceberg table every hour.
However, the small-file compaction designed for the v1 format predates sequence numbers and cannot guarantee data correctness for the v2 format after compaction, so in practice we made some changes for v2 small-file compaction.
The idea of v1 small-file compaction is exactly the same as the read path: the two kinds of delete files are applied to the appropriate data files, and after compaction the references to the old files are replaced with a reference to the newly generated data file. By the earlier definition the sequence number keeps increasing, so the compacted data file gets a larger sequence number, which causes data conflicts for the v2 format in small-file compaction scenarios.
Specifically, if compaction modifies the sequence number, it conflicts with the transactions Flink is writing in real time and invalidates the delete in transaction 3 in the figure above. Transaction 3 deletes the row with primary key 3, but once the compaction in transaction 4 succeeds the data comes back, because the row's sequence number becomes 4 while the delete file's sequence number is 3, and a delete file never deletes data with a larger sequence number.
To resolve this conflict, we modified Iceberg's small-file compaction. The idea is that compaction does not change the data itself; it only optimizes file storage, so the compacted file can reuse the largest sequence number among the merged files.
With this new approach, the data file produced by transaction 4 gets sequence number 2, the largest sequence number among the merged files. Since 2 is still smaller than transaction 3's sequence number 3, the delete file remains semantically correct.
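The rule itself is tiny; a hedged sketch, using the sequence numbers from the example above:

```python
def compacted_sequence_number(merged_file_seqs):
    """Reuse the largest sequence number among the merged data files instead of
    allocating a new, larger one, so concurrently written delete files with a
    higher sequence number still apply to the compacted data."""
    return max(merged_file_seqs)

# Merging the files with sequence numbers 1 and 2 during transaction 4 yields a
# compacted file that keeps sequence number 2, still smaller than the delete
# file written by transaction 3 (sequence number 3), so the delete stays valid.
assert compacted_sequence_number([1, 2]) == 2
```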
Landing the lakehouse architecture has brought us many benefits:
- First, the data timeliness of the traffic, content, and leads subject areas has improved greatly, from day/hour level to within 10 minutes, and the SLA of core warehouse tasks is now met two hours earlier.
- Second, feature engineering has improved: without changing the original architecture, the timeliness of model training has gone from day/hour level to 10 minutes.
- Finally, business database data can be ingested into the lake in quasi-real time simply by configuring it on our AutoStream platform, and the corresponding quasi-real-time analysis and queries can be done through Spark SQL. We are also testing, on a small scale, writing the results of aggregation tasks into the lake in quasi-real time, building quasi-real-time materialized views with Flink + Iceberg, which can greatly improve the efficiency and experience of analysis.
V. PyFlink practice
PyFlink was introduced primarily because we wanted to bring Flink's powerful real-time computing capability to the AI team. Due to the nature of their work, most developers on the AI team use Python, but Python itself has weak support for distribution and multithreading. They needed a framework that is quick to pick up and provides distributed computing capability to simplify their daily program development and maintenance. We also needed to fill the gap in Python ecosystem support on our platform, so integrating PyFlink into the platform was a natural choice.
The diagram above shows the basic architecture of PyFlink: two-way communication between the Python VM and the JVM implements the mapping between the Python API and the Java API, so users can develop Flink tasks by writing Python code.
After comparison, we chose Kubernetes to deploy the PyFlink environment. Besides better resource isolation, Kubernetes makes it easy to integrate the Python environment and other machine learning dependencies.
With PyFlink, we support three main types of dependency management (a minimal sketch follows the list):
- The first is Java JAR dependencies, which rely on AutoStream's own file management service to add the JARs to Flink's runtime environment.
- The second is Python file dependencies, which can also be managed uniformly through the AutoStream platform's file service. When dependencies are configured correctly, the files are automatically downloaded into the image when the task is submitted.
- Finally, there are Python third-party dependencies, which are automatically downloaded when a user submits a task and installed by PyFlink before the Python process starts.
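A minimal sketch of the underlying PyFlink dependency APIs; the file paths are placeholders, and on AutoStream the platform fills them in from its file service:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
config = t_env.get_config().get_configuration()

# Java JAR dependencies shipped with the job (path is a placeholder).
config.set_string("pipeline.jars", "file:///opt/deps/udf-runtime.jar")

# Python file dependencies, e.g. shared utility modules.
t_env.add_python_file("/opt/deps/feature_utils.py")

# Third-party Python dependencies, installed before the Python workers start.
t_env.set_python_requirements("/opt/deps/requirements.txt")
```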
With native Flink deployment on K8s, every modification to the user program would normally require building a new image, which is time-consuming and laborious. To make development easier and deployment more efficient, we optimized the PyFlink integration: relying on the file service provided by the platform, users only need to upload their dependency files to the file service when starting a task. We modified the image's entry script so that, according to the startup parameters, the JobManager and TaskManager processes first download the required dependencies (JAR files, Python files, and so on) into a directory inside the container, so the program can load the corresponding files when it starts.
We also connected with the AutoStream catalog, which allows Python users to directly reuse tables and UDFs already declared on the platform, or to develop and register Python UDFs themselves. Users can quickly complete the development of business requirements through SQL + UDF.
Take a custom task as an example. When developing a PyFlink program, the gateway provided by PyFlink can call our built-in catalog and UDF registration classes to register the catalog and UDFs, avoiding repeated definition and development.
Here is a PyFlink UDF development example. The user can keep the original project structure unchanged and, following the PyFlink UDF development specification, add new Python code that calls the existing processing logic inside the eval method.
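A minimal sketch of such a UDF; the `clean_text` helper stands in for processing logic that already exists in the user's project:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

def clean_text(raw: str) -> str:
    """Stands in for existing processing logic in the user's project."""
    return raw.strip().lower()

class CleanText(ScalarFunction):
    """PyFlink UDF wrapper: eval() simply delegates to the existing code."""

    def eval(self, raw):
        return clean_text(raw) if raw is not None else None

# Wrap the class into a UDF object that can be registered and used in SQL.
clean_text_udf = udf(CleanText(), result_type=DataTypes.STRING())
```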
With the Python UDF in place, in a SQL task the user only needs to specify the language as Python when creating the function and add the files required by the Python UDF in the advanced configuration.
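On the SQL side this corresponds roughly to the DDL below; the module path 'my_udfs.clean_text_udf' and the datagen/print tables are placeholders to keep the sketch self-contained, while on the platform the Python file comes from the advanced configuration:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Ship the module that contains clean_text_udf (path is a placeholder).
t_env.add_python_file("/opt/deps/my_udfs.py")

# Register the Python UDF through SQL DDL with LANGUAGE PYTHON.
t_env.execute_sql("""
    CREATE TEMPORARY FUNCTION clean_text
    AS 'my_udfs.clean_text_udf' LANGUAGE PYTHON
""")

# Toy source and sink so the example runs end to end.
t_env.execute_sql("""
    CREATE TABLE user_log (user_id BIGINT, event STRING)
    WITH ('connector' = 'datagen', 'rows-per-second' = '1')
""")
t_env.execute_sql("""
    CREATE TABLE cleaned_log (user_id BIGINT, event STRING)
    WITH ('connector' = 'print')
""")

t_env.execute_sql("""
    INSERT INTO cleaned_log
    SELECT user_id, clean_text(event) AS event FROM user_log
""")
```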
By integrating PyFlink with AutoStream, we have implemented basic support for the Python ecosystem and addressed the pain point that Python users find it difficult to develop real-time tasks. They can also easily migrate previously deployed stand-alone applications to the AutoStream platform and enjoy Flink's powerful distributed computing capabilities.
VI. Follow-up planning
In the future we will continue to optimize computing resources to make their use more reasonable and further reduce costs. On the one hand, we will make full use of automatic scaling, expand the scaling policies, and mix real-time and offline computing resources, using their staggered peaks to further reduce the server cost of real-time computing. At the same time, we will try finer-grained Yarn resource scheduling, for example allocating less than one core to the JobManager and TaskManager for more fine-grained optimization.
In terms of stream-batch unification, we plan to use Flink's batch processing capability and test it on a small scale in batch and Web scenarios. Meanwhile, based on the data lake architecture, we will continue to explore the possibility of stream-batch unification at the storage layer. We have also been following FLIP-188, which proposes a new way of unifying stream tables and batch tables: a single INSERT statement can write data to both a LogStore and a FileStore, so that downstream consumers can consume the LogStore in real time for real-time pipelines, or use the FileStore's batch data for ad-hoc queries. We hope to do something similar in the future.