Reprinted from the public account NetEase Khao. NetEase Cloud Music started building a real-time computing platform in 2018, and after several years of development it has reached every Cloud Music business line. This article is a practice share by Dayu. Starting from a common day-to-day operations problem, it walks through recent progress and future plans of the Cloud Music real-time computing platform. The main contents are:
- Platform functions
- Batch-stream unification
- Future plans
Since the launch of the NetEase Cloud Music real-time data warehouse platform a year and a half ago, the real-time data warehouse has begun to take shape: we now have 300+ real-time warehouse tables and 1200+ running tasks, of which roughly 1000 are SQL tasks. Total Kafka egress traffic has reached 18 GB/s, and the platform has 200+ users.
The growth in data volume and users also brings more and more challenges to the usability and stability of the data platform, including Kafka stability, cluster stability, operation and maintenance challenges, and a lot of early technical debt. Business growth exposes the weaknesses of the infrastructure, but it has also given us a lot of experience in platform construction, operation, and maintenance.
I. Platform functions
For an overview of the platform’s overall functionality, see “Cloud Music Real-time Data Warehouse Technical Transformation and Some Future Planning”. Here we mainly introduce some of our latest work:
“My task is lagging, and scaling it out doesn’t help. Why is that?”
This is a problem we often run into in daily operations, and it is usually time-consuming to troubleshoot. It can have many causes, and to deal with it we have done some work to strengthen our operational capabilities.
1. Improved I/O metrics
I/O problems are a common cause of the issue above, including message read efficiency, dimension table JOIN efficiency, SINK efficiency, and so on. The performance and stability of third-party storage directly affect the stability of real-time tasks. To locate related problems quickly, we added many I/O-related metrics.
1.1 Kafka consumer-side performance metrics
1.2 Deserialization metrics
These include:
- Deserialization RT
- Deserialization error rate
On the format side, we developed a set of format proxies that can report the relevant metrics and skip bad records without modifying the original format code. Simply adding the format.proxy property specifies the proxy class, which can wrap different underlying formats.
For example, if format.proxy=magina is specified, the performance metrics above are reported; if format.proxy=ds is specified, the DS log format can be parsed, and when the proxy is used to parse the body part of a DS log, there is no need to develop a separate format for DS.
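As a rough sketch of how this looks from the user's side (the table, fields, topic, and the wrapped json format below are hypothetical; format.proxy is the property described above):

```sql
-- A minimal sketch, not the actual platform DDL: table, fields, topic and the
-- wrapped 'json' format are hypothetical. 'format.proxy' = 'magina' designates the
-- proxy described above, so deserialization RT and error-rate metrics are reported
-- and bad records are skipped without touching the original format code.
CREATE TABLE ods_demo_log (
  userid      STRING,
  sceneid     STRING,
  `action`    STRING,
  `timestamp` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_demo_log',
  'properties.bootstrap.servers' = 'kafka-demo:9092',
  'format' = 'json',
  'format.proxy' = 'magina'
);
```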
1.3 Dimension table JOIN metrics
On the dimension table JOIN side, we added:
- Response time of dimension table queries
- Local cache hit ratio
- Percentage of queries that are retried
- Percentage of records successfully joined, etc.
1.4 Data write performance metrics
- RT of data serialization
- Average response time of data writing to external data sources, etc
To implement this whole set of I/O metrics, we added some common wrapping on top of the Flink connector interfaces and refactored the relevant connectors. As long as a connector is implemented against our interface, it does not need to care about detailed metric reporting; the metrics are reported automatically.
2. Kafka partition limits
The number of Kafka partitions is another frequent reason why a job's performance cannot scale. To guarantee exactly-once semantics, read performance, and read stability, Flink actively pulls Kafka messages, which caps the number of tasks reading Kafka at the partition count and therefore greatly limits how far a task can scale. Take the following case as an example:
```sql
SET 'table.exec.state.ttl' = '1h';
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '10s';
SET 'table.exec.mini-batch.size' = '100000';

INSERT INTO music_kudu_online.music_kudu_internal.ads_ab_rtrs_user_metric_hour
SELECT
  from_unixtime(`timestamp`, 'yyyy-MM-dd') AS dt,
  from_unixtime(`timestamp`, 'HH') AS `hour`,
  os, sceneid, parent_exp, `exp`, exp_type, userid,
  count(1) AS pv
FROM iplay_ods.ods_rtrs_ab_log
INNER JOIN abtest_online.abtest.abtest_sence_metric_relation
  FOR SYSTEM_TIME AS OF user_metric.proctime
  ON ods_rtrs_ab_log.sceneid = abtest_sence_metric_relation.sceneid
GROUP BY
  from_unixtime(`timestamp`, 'yyyy-MM-dd'),
  from_unixtime(`timestamp`, 'HH'),
  os, sceneid, parent_exp, `exp`, exp_type, userid
```
This is a real-time full-aggregation task. In vanilla Flink, the DAG of this SQL would look roughly like this:
If ods_rtrs_ab_log has 5 partitions and our SQL task runs with a parallelism of 7, then because of the Kafka partition count and Flink's operator chaining, message reading, the dimension table JOIN, and the MINI BATCH operators are all constrained by the Kafka partitions and cannot scale out. For the dimension table JOIN in particular, which is an I/O operation, the effective task parallelism seriously limits the overall throughput; at that point the only option is to add Kafka partitions to improve performance.
But that operation is heavy and is very likely to disturb other tasks reading the same flow table. To solve this, we modified the Kafka connector so that an extra shuffle step can be enabled through configuration. For example, for the job above we added:
```
'connector.rebalance.keys' = 'sceneid,parent_exp,userid'
```
After messages are read, they are hash-distributed by the fields sceneid, parent_exp, userid, and so on, which greatly improves the scalability of the whole program. In addition, keying by the specified fields significantly improves the dimension table JOIN cache hit ratio and the performance and effectiveness of MINI BATCH.
Besides the configuration above, we can also add a random rebalance operation, a rescale operation, and configure the parsing behaviour to further improve overall performance. Note that an extra shuffle introduces additional thread and network overhead, so the machine load must be watched when enabling it. The extra shuffle improves scalability, but if the machines themselves are not performing well it may have the opposite effect and perform worse under the same resources; it has to be configured according to your application and environment.
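For reference, here is a sketch of where this option could sit in a flow table definition; apart from 'connector.rebalance.keys', which is the option described above, the table name, topic, and other properties are hypothetical:

```sql
-- A sketch only: table name, topic and the other connector properties are hypothetical.
-- With 'connector.rebalance.keys' set, the modified Kafka connector inserts an extra
-- hash shuffle on these fields after the source, so the dimension table JOIN and
-- MINI BATCH operators can run with a parallelism larger than the partition count.
CREATE TABLE ods_rtrs_ab_log_demo (
  sceneid     STRING,
  parent_exp  STRING,
  `exp`       STRING,
  exp_type    STRING,
  userid      STRING,
  os          STRING,
  `timestamp` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_rtrs_ab_log',
  'properties.bootstrap.servers' = 'kafka-demo:9092',
  'format' = 'json',
  'connector.rebalance.keys' = 'sceneid,parent_exp,userid'
);
```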
3. Kafka usage optimization
With the rapid growth of traffic, Kafka stability has also become one of our main challenges, including rack bandwidth, cross-datacenter bandwidth, jitter during Kafka scale-out and scale-in, and Kafka configuration issues; we have hit essentially every problem you can think of. To address them, we did the following work:
3.1 A mirroring service to solve bandwidth problems and protect high-priority tasks
We built a mirroring service on Flink: a separate Kafka cluster is deployed in each machine room module, and the mirroring service keeps the data of the two Kafka clusters in sync.
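Conceptually, such a mirroring job can be expressed as a very small Flink SQL pipeline. The sketch below only illustrates the idea, with hypothetical topic, table, and broker names; it is not the actual mirroring service:

```sql
-- A conceptual sketch, not the actual service: names and addresses are hypothetical.
-- One Kafka table is bound to the cluster in machine room A, one to the cluster in
-- machine room B, and a streaming INSERT keeps B in sync with A.
CREATE TABLE ods_demo_log_room_a (
  body STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_demo_log',
  'properties.bootstrap.servers' = 'kafka-room-a:9092',
  'properties.group.id' = 'mirror_ods_demo_log',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'raw'
);

CREATE TABLE ods_demo_log_room_b (
  body STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_demo_log',
  'properties.bootstrap.servers' = 'kafka-room-b:9092',
  'format' = 'raw'
);

INSERT INTO ods_demo_log_room_b
SELECT body FROM ods_demo_log_room_a;
```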
We use YARN label technology to control which machine room a task runs in by choosing different queues, reducing cross-room bandwidth consumption. To make it easy for users to switch between Kafka clusters, we also modified the Flink flow table side so that one flow table can mount multiple Kafka clusters at the same time, and the cluster can be switched freely with a simple configuration. After a round of task sorting and switching, Kafka bandwidth usage improved greatly:
3.2 Improved Kafka monitoring
In our daily work, we found that many developers did not know much about Kafka itself, and in the early days, for lack of experience, Kafka operations were not very strict, which led to many problems in how Kafka was used. So we integrated the data of the Kafka monitoring service inside Music with our platform's task lineage and built our own Kafka monitoring service.
In addition to the relationships among Kafka, flow tables, and tasks, we also actively monitor the following:
- Reasonableness of the number of partitions of a Kafka topic: we mainly watch for too few (or too many, but mostly too few) partitions, to prevent downstream tasks from falling behind because the partition count is too small;
- Balance of data production across Kafka partitions: to prevent downstream processing performance problems caused by skewed partition data;
- Balance of data consumption across Kafka partitions: to prevent data in some partitions going unconsumed after a partition change when downstream tasks have not enabled partition discovery;
- Alarms on traffic surges and drops: traffic alarms on critical queues to guarantee the quality of real-time data.
Kafka version upgrade: to solve the stability and resource isolation problems of Kafka scale-out itself, our Music public technology team did some secondary development on top of Kafka 2.x and turned the Kafka service into a platform offering that supports smooth topic scale-out and resource isolation.
Similar to YARN's LABEL technology, it supports dedicating machines in different regions to different topics, includes a complete message mirroring service with offset replication, and provides a unified Kafka operation and monitoring platform. This part will be described in detail in a future article.
3.3 Partitioned flow table technology
After the real-time data warehouse went live, we found that the following situations significantly affected the stability of applications and the usability of flow tables:
- Many times we only need 1% of the data in a flow table, but since there is no way to read on demand, we have to spend a lot of resources reading and parsing the other 99% as well, wasting a lot of bandwidth; and since the SQL development model itself cannot parse logs on demand, every message must be parsed in full, consuming even more compute resources.
- When we split large topics into many small topics by experience and business, one table becomes many small tables, and users need a lot of background knowledge to understand which messages these small tables with the same schema contain. Usability is poor, and such a design does not fit the overall design logic of the data warehouse; it also makes unified batch-stream metadata much harder to achieve in the future.
In offline scenarios we have many ways to reduce unnecessary I/O, such as data bucketing, storing data sorted, Parquet predicate pushdown, and partitioned tables. But for real-time tables there seemed to be no good answer in the existing public schemes, so we developed a partitioned flow table scheme whose overall idea is similar to Hive table partitioning:
Using the SupportsFilterPushDown interface provided by the Flink table source, we implemented our own real-time flow table partitioning scheme: one partition corresponds to one topic, and partitions that are not needed are pruned by pushing down the user's query conditions, reducing unnecessary reads. The first version is live: the Cloud Music exposure log has been initially split, and we also tried the AVRO data format to replace the previous JSON format. In practice the optimization effect is obvious:
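For illustration, a query over a partitioned flow table might look like the following. The table and the partition column are hypothetical, but the mechanism is the one described above: the filter on the partition column is pushed down through SupportsFilterPushDown, so only the topics behind the matching partitions are subscribed and parsed.

```sql
-- Hypothetical partitioned flow table: each value of the partition column `page`
-- maps to its own Kafka topic. The WHERE predicate is pushed down into the source,
-- so partitions (topics) other than page = 'playlist' are never read or parsed.
SELECT userid, `action`, `timestamp`
FROM iplay_ods.ods_user_impress_log
WHERE page = 'playlist';
```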
- Almost every switch to the AVRO format saves at least 30% of bandwidth, and for Music logs message parsing performance is roughly double that of the raw log format.
- With the partitioned flow table, we initially migrated 4 exposure log consumption tasks and saved 7 physical machines, an average saving of more than 75% in compute and bandwidth resources.
Although these are extreme cases, from them we can expect that partitioned flow table technology, once fully rolled out, will be a qualitative optimization.
II. Batch-stream unification
A real-time data warehouse has always been a big goal of our Cloud Music data platform team, and behind that goal, batch-stream unification is more than just a "noun", a "concept", a "technology", or a "product". Before officially sharing our work, let me first share a conversation I once had with an algorithm engineer in the elevator:
Algorithm engineer: When will your batch-stream unification go live? We are waiting to use it.
Me: What is your actual demand?
Algorithm engineer: Many of our real-time metrics are developed by ourselves; we cannot directly reuse the ready-made offline warehouse data.
From this conversation we can see that algorithm engineers do not want some batch-stream technology per se; they want real-time, ready-to-use warehouse data to improve their development efficiency. So behind batch-stream unification, what are the demands of the different roles on the business side?
For operations, product managers, bosses, and analysts:
They want accurate, real-time, analyzable report data, and the key point is analyzability. When the result data fluctuates abnormally, we need real-time detailed data available for analytical queries to investigate the cause; and when the boss has a new idea and wants a secondary analysis of an existing report, we need to be able to provide detailed, analyzable data to run the analysis and give an answer.
Take real-time daily active user statistics as an example: the common approach is to deduplicate (or approximately deduplicate) user IDs in Redis or a KV store and then compute the real-time DAU. But when the DAU fluctuates abnormally, the data in Redis cannot be analyzed, so it is hard to explain the fluctuation quickly, let alone on the same day. Such a solution and such results are obviously not good enough.
For data warehouse developers:
- Unified real-time/offline warehouse metadata management, a unified model, and unified storage, reducing the construction and O&M cost of the warehouse and improving its overall usability;
- Unified development code: one set of SQL solves both offline and real-time development, reducing development and O&M costs and eliminating the large discrepancies between real-time and offline results caused by different understandings of the business logic.
For algorithm engineers:
A unified real-time/offline warehouse table unifies the model, lowers the threshold for understanding the business, improves the usability of warehouse data, and makes the warehouse metadata management service more convenient, helping algorithm engineers do secondary feature development and improving model development efficiency. It also provides accurate, real-time, analyzable model effectiveness data to speed up algorithm model iteration.
Overall, the goal of batch-stream unification covers three aspects:
- Unified code: one set of SQL covers both real-time and offline business development;
- Unified warehouse metadata: a single table can be read both offline and in real time, with a unified batch-stream warehouse model;
- Real-time report data: unlike unified warehouse metadata, product report data must support second-level result queries, whereas unified warehouse data is usually only stored in real time and is not as sensitive to OLAP query efficiency.
1. Unified code
Because real-time SQL itself is not particularly mature, much of the logic that is easy to implement in offline scenarios is either not implemented or unstable in real-time scenarios.
The industry is still exploring this area. The main approach at present, like Alibaba's, is to use a single SQL engine that unifies real-time and offline, but that is still being put into practice. Another approach is to build the upper-layer ADS business logic on top of warehouse tables that already have real-time capability, unifying the SQL at the product report development layer; this is also a direction we may try in the future. Besides trying to unify SQL at the report layer, we have done some work and planning around unified code:
- Unified UDFs: we integrated and upgraded the platform framework to the new Flink 1.12 version and unified offline and real-time UDFs;
- Unified metadata management: on the Flink SQL side we integrated the metadata center service and provide catalog.db.table style reads and writes; to unify metadata we also did secondary wrapping of Spark SQL and integrated it with the metadata center, so reads and writes between heterogeneous data sources also use the catalog.db.table form (see the sketch after this list);
- Scenario-oriented, configuration-based batch-stream unification: for some simple business logic scenarios we plan to build scenario-oriented batch-stream unified tooling, such as batch-stream unified indicator tasks and a batch-stream ETL cleansing platform; this is still in planning due to resource constraints.
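As an illustration of the catalog.db.table convention, the same shape of statement is meant to run on both Flink SQL (streaming) and the wrapped Spark SQL (batch). The catalogs below appear earlier in this article, but the specific tables and columns are hypothetical:

```sql
-- A hedged sketch of the unified catalog.db.table style: the catalogs come from the
-- metadata center, so the statement shape is the same whether it is submitted to
-- Flink SQL or to the wrapped Spark SQL. The tables and columns are hypothetical.
INSERT INTO music_kudu_online.music_kudu_internal.dws_user_play_stat
SELECT userid, count(1) AS play_cnt
FROM iplay_ods.ods_user_play_log
GROUP BY userid;
```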
A bigger premise for unifying batch and stream SQL under current technology is the complexity of the logs themselves, which comes down to how standardized and complete the event tracking is. Unlike offline processing, real-time computing cannot afford to push a lot of attribution and association logic into the data processing layer; in offline scenarios, leaving rationality and cost aside, a lot of that work can be done.
In real-time scenarios, however, performance and stability are very sensitive. If a lot of logic is handled on the data side, it brings problems that cannot be implemented at all, high implementation costs, and many stability and data latency issues. If the tracking is not done well, building the whole real-time data warehouse becomes a problem. For this reason, Cloud Music also started the Dawn tracking project in cooperation with several teams, completely rebuilding the event tracking of Cloud Music products to improve its standardization and accuracy and reduce the development cost of the real-time data warehouse.
2. Unified warehouse metadata
There are two main types of solutions in the industry:
- The first is to build a stream-batch mapping layer; this is the scheme Alibaba has made public. It fits products that already have both a real-time warehouse and an old offline warehouse: without changing the existing warehouses, a unified view mapping layer is built on top and the unified experience is delivered through views. The general principle is shown below:
- The second is to build a brand-new metadata system that mounts multiple storages, such as HDFS and Kafka, under one schema: data is written to all of them at write time, and at read time the appropriate storage is chosen according to the access pattern. Arctic, developed by NetEase Sufan's data product development team, takes this approach:
The overall idea is to wrap several kinds of storage such as Iceberg, Kafka, and HBase and use different storage in different scenarios. On top of Iceberg, Arctic has also done a lot of secondary development to solve the problem of updating DWS data, providing Hudi-like CopyOnWrite and MergeOnRead capabilities to solve the stability problems of full aggregation in Flink itself. Cloud Music has already trialed it in some new business scenarios, with dozens of batch-stream tables online. If you want to know more about Arctic, you can contact the NetEase Sufan real-time computing team for details.
3. Real-time report data
Real-time report data mainly depends on the OLAP engine and storage: the storage side must provide both real-time update capability and second-level query capability. In most cases it is not feasible to write final results directly into storage, because reports involve many flexible queries; writing results directly would require Kylin-like real-time cube capability, put too much pressure on development and on Flink's computation, waste a lot of resources and storage, bring stability problems and development workload, and limit secondary analysis of the data. So at this layer we need the OLAP engine to answer queries over at least tens of billions of rows with second-level latency. Our main solution currently uses Kudu and ClickHouse for storage. Taking our old version of ABTest as an example, we adopted the following scheme:
For the real-time results of the latest hours and the current day, Impala reads the Kudu data to join out the latest results; for day-level historical data from more than a day ago, or hour-level data from more than two hours ago, Spark computes the results and stores them into the result table. The two parts are served to users together, guaranteeing both the timeliness of the results and the overall query experience.
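A simplified sketch of the serving-side query: the recent slice is read from the Kudu table written in real time by the Flink job shown earlier, the older slice from a result table backfilled by Spark, and the two are unioned for the user. The history table name and the date placeholder are hypothetical.

```sql
-- A simplified sketch: 'ads_ab_metric_hour_history' and the ${today} placeholder
-- (filled in by the query layer) are hypothetical. Recent data comes from the Kudu
-- table maintained in real time; older data comes from the table backfilled by Spark.
SELECT dt, `hour`, sceneid, parent_exp, `exp`, sum(pv) AS pv
FROM music_kudu_internal.ads_ab_rtrs_user_metric_hour   -- Kudu, queried via Impala
WHERE dt = '${today}'
GROUP BY dt, `hour`, sceneid, parent_exp, `exp`

UNION ALL

SELECT dt, `hour`, sceneid, parent_exp, `exp`, pv
FROM music_kudu_internal.ads_ab_metric_hour_history     -- precomputed by Spark
WHERE dt < '${today}';
```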
III. Future plans
Improvement of operation and maintenance tools
Real-time SQL development lowers the difficulty and threshold of real-time data statistics, but because real-time SQL itself is still immature and somewhat of a black box, and because many developers approach it with offline SQL or MySQL-style experience, it puts great pressure on platform operations. So building operation and maintenance tooling and improving real-time task metrics is one of our main directions going forward.
Improving partitioned flow table technology
Partitioned flow tables can bring a qualitative change to the resource usage, Kafka pressure, and warehouse construction of the Cloud Music real-time platform. So far we have only completed a first version; next we will continue to improve dynamic partition awareness, partition modification, schema modification, operation and maintenance monitoring, and roll-out.
Building scenario-oriented batch-stream unification
For example, batch-stream unified indicator tasks, batch-stream unified ETL tools, and unified log cleansing rules, laying the foundation for a batch-stream unified data warehouse.
Exploration of batch-stream unified storage
- Investigate current industry solutions and, combined with Music's business scenarios, provide a complete solution that lowers the development threshold for real-time reports and improves their development efficiency;
- Build the batch-stream unified logic layer, etc.
Finally, attached is the real-time computing solution architecture of the NetEase Sufan team: a high-performance, one-stop real-time big data processing solution based on Apache Flink, widely applicable to streaming data processing scenarios.