Meituan-Dianping's Real-Time Data Warehouse Practice Based on Flink
Introduction
In recent years, demand for real-time data services has kept growing. This article summarizes the performance characteristics and application scenarios of common real-time data components, and describes how Meituan built a real-time data warehouse on the Flink engine to provide efficient and robust real-time data services. In an earlier post on the Meituan tech blog, "Flink vs. Storm", we compared the performance of the two engines; this article focuses on our experience using Flink in actual data production.
Initial architecture of the real-time platform
In the early stage of building the real-time data system, demand for real-time data was small and no complete data system had yet taken shape. We adopted an end-to-end, per-requirement ("smokestack") development mode: Storm jobs deployed on the real-time computing platform consumed the real-time data queues, computed the required metrics, and pushed them directly to the real-time application services.
However, new challenges arose as products and business teams demanded ever more real-time data:
- With more and more data metrics, smokestack development led to serious code-coupling problems.
- Requirements kept multiplying: some called for detailed data, others for OLAP analysis. A single development mode could not satisfy them all.
- We lacked a comprehensive monitoring system that could detect and fix problems before they affected the business.
Building the real-time data warehouse
To solve the above problems, drawing on our experience with offline data production, we chose a layered design for the real-time data warehouse; its layered architecture is shown in the figure below:
The scheme consists of the following four layers:
- ODS layer: binlogs, traffic logs, and the real-time queues of each service.
- Data detail layer: integrates business domains and extracts fact data, and builds real-time dimension data from offline full snapshots plus real-time change data.
- Data summary layer: uses a wide-table model to enrich detailed data with dimensions and to aggregate commonly used metrics.
- App layer: an application layer built for specific requirements that serves data externally through an RPC framework.
The multi-layer design lets us consolidate data-processing logic at each layer. For example, the data detail layer unifies filtering, cleaning, normalization, and desensitization, while the data summary layer computes the common multidimensional aggregates. This improves code reuse and overall production efficiency. At the same time, because the tasks within each layer are similar, a unified technical solution can be applied to optimize them, keeping the warehouse's technical architecture simple.
Technology selection
1. Storage engine research
A real-time data warehouse differs from an offline one in that it does not use a single storage scheme at every layer (e.g., storing everything in Hive or a database). First, intermediate tables are stored as structured data in message queues and high-speed KV stores: real-time computing engines consume data from the message queues, while data in the KV store, such as dimension data, supports fast lookup joins. Second, at the application layer, data is written directly into storage chosen according to how the data will be used, avoiding the processing delay that a separate synchronization step introduces in the offline warehouse's application layer. To serve different kinds of real-time data and design a sensible storage scheme for each layer, we investigated several storage systems widely used at Meituan.
Scheme | Advantages | Disadvantages |
---|---|---|
MySQL | 1. Full transaction support; data can be updated. 2. Supports SQL; low development cost. | 1. Horizontal scaling is expensive, so storage easily becomes a bottleneck. 2. Real-time data is updated and queried very frequently (1000+ QPS for a single online real-time application), making MySQL too costly to operate. |
Elasticsearch | 1. High throughput: a single node can sustain 2500+ QPS, and the cluster scales out quickly. 2. Term queries respond very fast: query latency stays below 20 ms at 2000+ QPS on a single node. | 1. No native SQL support; the query DSL has a learning curve. 2. Performance degrades significantly for aggregation queries. |
Druid | 1. Supports large data volumes: ingesting real-time data from Kafka, a single job can sustain 60K+ QPS. 2. Pre-aggregation at ingestion time rolls up data, reducing storage and improving processing efficiency. 3. Many open-source OLAP analysis frontends, such as Superset, integrate with it. | 1. Pre-aggregation makes detail queries impossible. 2. Join operations are not supported. 3. Storage is append-only; data can only be modified by replacing Segments. |
Cellar | 1. Supports large data volumes; its memory-plus-distributed-storage architecture makes it very cost-effective. 2. Good throughput: in our tests it handled 30K+ QPS of read/write requests with about 1 ms average latency, and up to 100K+ QPS through its asynchronous read/write interface. | 1. The interface supports only KV, Map, List, and atomic increment/decrement operations. 2. A single key cannot exceed 1 KB, and performance degrades significantly when a value exceeds 100 KB. |
Based on the different business scenarios, the storage schemes used at each layer of the real-time data warehouse are as follows:
- Data detail layer: lookup frequency for dimension data can reach 100K+ TPS in some scenarios, so we chose Cellar (Meituan's internal KV storage system) and wrapped it in a dimension service that supplies dimension data to the real-time warehouse.
- Data summary layer: for general summary metrics that must be joined with historical data, we store the data in Cellar, the same scheme as for dimension data, and perform the joins through a service.
- Data application layer: the application layer's design is the most complex. After comparing several storage solutions, we set a read/write frequency of 1000 QPS as the dividing line. For real-time applications whose average read/write frequency exceeds 1000 QPS but whose queries are not complex, such as merchants' real-time business data, Cellar provides the real-time data service. For applications that need complex queries and detail lists, and whose query frequency is lower, such as some internal operations data, Elasticsearch is the better store. Druid builds indexes by ingesting messages in real time and, through pre-aggregation, quickly provides real-time OLAP analysis. For real-time retrofits of some historical data products, MySQL can also be used, which eases product iteration.
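The selection rules above can be sketched as a simple routing function. This is a minimal illustration: the 1000 QPS threshold and the store names follow the text, while the function name and parameters are hypothetical.

```python
def choose_store(avg_qps: int, complex_queries: bool, needs_detail: bool,
                 olap_analysis: bool, legacy_mysql_product: bool = False) -> str:
    """Pick an application-layer store per the selection rules described above."""
    if legacy_mysql_product:
        return "MySQL"            # real-time retrofit of an existing product
    if olap_analysis:
        return "Druid"            # pre-aggregated real-time OLAP
    if complex_queries or needs_detail:
        return "Elasticsearch"    # complex queries / detail lists
    if avg_qps >= 1000:
        return "Cellar"           # high-frequency, simple KV-style reads
    return "MySQL"

# e.g. merchants' real-time business data: high QPS, simple queries
assert choose_store(2500, False, False, False) == "Cellar"
```

In practice the decision also weighs data volume and team familiarity, but the QPS/complexity split captures the main trade-off.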
2. Research on computing engines
In the early stage of building the real-time platform, we used the Storm engine for real-time data processing. Storm performs well in terms of flexibility and raw performance, but its API is too low-level: common data operations such as table joins and aggregations have to be implemented by hand, which not only creates a lot of extra development work and external dependencies such as caches, but also often yields mediocre performance in practice. Storm's data object, the Tuple, offers very limited functionality and usually has to be converted to Java objects for processing; such a code-defined data model can typically be maintained only through documentation, which adds maintenance burden and makes adding or changing fields cumbersome. In short, building a real-time warehouse on Storm is difficult. We needed a new real-time processing solution that could:
- Provide high-level APIs that support common data operations such as joins and aggregations, ideally with SQL support.
- Offer state management and built-in persistence, reducing dependence on external storage.
- Integrate easily with metadata services, avoiding the need to manage data structures in code.
- Deliver processing performance at least on par with Storm.
We conducted a technical survey of major real-time computing engines. The characteristics of various engines are summarized as follows:
Project/Engine | Storm | Flink | Spark Streaming |
---|---|---|---|
API | Flexible low-level API, plus the Trident API with transaction guarantees | Streaming API, plus the Table API and Flink SQL, which are better suited to data development | Structured Streaming API; Spark SQL can also be used and is well suited to data development |
Fault tolerance | ACK mechanism | Distributed snapshots of state (checkpoints/savepoints) | RDD-based checkpoints |
State management | Trident state management | Keyed State and Operator State, with multiple persistence backends | APIs such as updateStateByKey for stateful updates, with multiple persistence options |
Processing model | One-record-at-a-time stream processing | One-record-at-a-time stream processing | Micro-batch processing |
Latency | Milliseconds | Milliseconds | Seconds |
Delivery semantics | At Least Once / Exactly Once | Exactly Once / At Least Once | At Least Once |
According to the research results, the APIs, fault-tolerance mechanisms, and state-persistence mechanisms of both Flink and Spark Streaming could solve some of the problems we were encountering with Storm. But Flink is closer to Storm in data latency and would have minimal impact on existing applications, and in internal company tests Flink's throughput was roughly ten times Storm's. After weighing everything, we chose Flink as the engine for developing the real-time data warehouse.
Even more notable are Flink's Table abstraction and SQL support. Structured data can be processed with Storm too, but only through a message-oriented API that never fully delivers the convenience of manipulating structured data at the code level. Flink supports most of the commonly used SQL statements, which basically covers our development scenarios. A Flink Table is managed through a TableSchema, supports rich data types, data structures, and data sources, and can easily be combined with an existing metadata-management or configuration-management system. The chart below illustrates the differences between Storm and Flink during development.
In Storm, processing logic must be hard-wired into Bolt code. With Flink, development can be done in SQL: the code is more readable, the logic is implemented by the open-source framework with reliability and efficiency guaranteed, and scenario-specific optimization only requires adjusting the Flink SQL optimizer, without touching the logic code. This lets us focus on data development rather than on implementing plumbing. When we need to align the definitions of offline and real-time data, we only need to slightly modify the offline SQL script, which greatly improves development efficiency. The figure also compares the data models used by Flink and Storm: Storm defines data structures through Java classes, whereas a Flink Table can be defined from metadata, which combines well with metadata and data-governance systems and further improves development efficiency.
Flink use experience
While building the real-time data warehouse with the Flink Table API, we accumulated experience with several common warehouse operations performed through Flink, such as dimension expansion of data metrics, data joins by topic, and data aggregation.
1. Dimension expansion
For dimension expansion of data metrics, we obtain dimension information through the dimension service. The typical response latency of the Cellar-based dimension service is under 1 ms, but to further raise Flink's throughput we perform all dimension-data lookups through asynchronous interfaces, so that RPC calls do not throttle the data stream. For high-volume streams, such as traffic logs at around 100K records/sec, the association UDF has a built-in cache that evicts entries by hit ratio and age, and the stream is partitioned by the association key; together these significantly reduce the number of requests to external services and effectively lower processing latency and the load on the external system.
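A minimal sketch of such a dimension cache in Python (illustrative only: the eviction policy here is a simple TTL + LRU approximation of the hit-ratio/age policy described above, and `fetch_dim` stands in for the real dimension-service call):

```python
import time
from collections import OrderedDict

class DimCache:
    """Small TTL + LRU cache in front of a dimension-service lookup."""
    def __init__(self, fetch_dim, max_size=10000, ttl_seconds=60.0):
        self.fetch_dim = fetch_dim          # callable: key -> dimension record
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._data = OrderedDict()          # key -> (value, fetched_at)

    def get(self, key):
        now = time.monotonic()
        entry = self._data.get(key)
        if entry is not None and now - entry[1] < self.ttl:
            self._data.move_to_end(key)     # refresh LRU position on a hit
            return entry[0]
        value = self.fetch_dim(key)         # miss or stale: call the service
        self._data[key] = (value, now)
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict least-recently-used entry
        return value

calls = []
cache = DimCache(lambda k: calls.append(k) or {"poi_id": k, "city": "beijing"})
cache.get(42)
cache.get(42)                               # second lookup is served from cache
assert calls == [42]
```

Partitioning the stream by the association key (as the UDF does) raises the hit ratio of such a cache, because all records for the same key land on the same task instance.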
2. Data association
Merging data by topic is essentially joining multiple data sources, i.e., a Join operation. Flink's Table is built on the concept of an unbounded stream, so a Join cannot relate two complete tables the way offline processing does. Instead we join data within a time window, which amounts to taking a slice of each stream over a period of time and joining those slices, similar to limiting an offline join to certain partitions. Note that a Flink table join must include at least one equality condition, because the values on the two sides of the equality are used for partitioning.

Because Flink caches all the data in the window for the join, the amount of cached data is proportional to the window size. Flink's join queries are therefore best suited to scenarios where business rules bound the time range of the associated data, for example joining an order with the browsing logs from the 30 minutes before the purchase. An overly large window not only consumes more memory but also produces larger checkpoints, which reduces throughput or causes checkpoint timeouts. In production, RocksDB with incremental checkpointing can be used to reduce the impact of checkpoints on throughput.

For scenarios that require a very long association window, for example when the associated data may be several days old, we can treat the historical data as a fixed "dimension": store it the same way as dimension data ("cache + offline" data) and join it through the dimension-service interface. Finally, note that Flink chains multi-table joins in sequence, so the joins that produce small result sets should be performed first.
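The windowed-join idea can be illustrated in plain Python (this is not Flink code; it simply buffers both streams and joins events whose timestamps fall within the window, the way the time-bounded join above restricts the data range; all field names are hypothetical):

```python
from collections import defaultdict

def windowed_join(orders, views, window_seconds):
    """Join each order with the views of the same user whose timestamp lies
    within `window_seconds` before the order (equality key: user_id)."""
    views_by_user = defaultdict(list)       # the equality condition drives partitioning
    for v in views:
        views_by_user[v["user_id"]].append(v)
    joined = []
    for o in orders:
        for v in views_by_user[o["user_id"]]:
            if 0 <= o["ts"] - v["ts"] <= window_seconds:
                joined.append((o["order_id"], v["page"]))
    return joined

orders = [{"order_id": 1, "user_id": "u1", "ts": 1000}]
views = [{"user_id": "u1", "page": "menu", "ts": 400},   # outside a 300 s window
         {"user_id": "u1", "page": "shop", "ts": 900}]   # inside a 300 s window
assert windowed_join(orders, views, window_seconds=300) == [(1, "shop")]
```

The memory cost of the buffered `views` grows with the window size, which is exactly why a large window in Flink inflates state and checkpoints.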
3. Aggregation operation
Flink supports the common aggregate operations: sum, min/max, mean, and so on. The fly in the ointment is its Distinct support. In Flink 1.6 and earlier, the approach was to group by the deduplication field and then aggregate; when several fields must be deduplicated and re-aggregated, computing each separately and then joining the results is inefficient. For this purpose we developed a custom UDAF implementing exact deduplication with MapView, approximate deduplication with a Bloom filter, and ultra-low-memory deduplication with HyperLogLog, to cover the various real-time deduplication scenarios. Note, however, that with a custom UDAF, serializing and deserializing large keys in RocksDBStateBackend is very time-consuming; FsStateBackend can be used instead. Another caveat: when computing analytic functions such as Rank, Flink must cache all data under each grouping window before sorting, which consumes a lot of memory. In such scenarios it is advisable to first try converting the logic to a TopN query and check whether that satisfies the requirement.
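To show why HyperLogLog keeps memory so low, here is a tiny self-contained Python sketch of the algorithm (illustrative only; it omits the bias corrections real implementations apply for very small or very large cardinalities, and is not the UDAF described above):

```python
import hashlib

class HyperLogLog:
    """Minimal HyperLogLog distinct counter: 2**p small registers instead of
    storing every seen value."""
    def __init__(self, p=12):
        self.p = p
        self.m = 1 << p                  # number of registers
        self.registers = [0] * self.m

    def add(self, item):
        h = int.from_bytes(hashlib.sha1(str(item).encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)         # first p bits pick the register
        rest = h & ((1 << (64 - self.p)) - 1)
        # rank = position of the leftmost 1-bit in the remaining bits
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        est = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        return int(est)

hll = HyperLogLog()
for i in range(100000):
    hll.add(i)
# with p=12 the standard error is about 1.6%, so the estimate lands close
assert abs(hll.count() - 100000) / 100000 < 0.1
```

The whole state is 4096 small integers regardless of cardinality, which is the "ultra-low-memory" property exploited by the HyperLogLog deduplication scheme.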
The following figure shows a complete process for producing a real-time data table using the Flink engine:
Results of the real-time data warehouse
By replacing the original processes with the real-time data warehouse, we abstracted the steps of data production into the warehouse's layers. This unifies the data sources for all real-time data applications and ensures consistency of application metrics and dimensions. In several cases where metric definitions changed, we modified the warehouse's detail and summary layers and switched every application over without changing any application code. During development we strictly enforced standards for data layering, subject-domain division, content organization, and naming, which makes the data-development pipeline clearer and reduces code coupling. With Flink SQL the code is also concise: the size of a single job dropped from an average of 300+ lines of Java to a few dozen lines of SQL, and development time fell accordingly; one person developing several real-time data metrics in a day is not uncommon.
In addition, we can tune performance and configuration according to the characteristics of the work at each layer of the warehouse. For example, the ODS layer mainly parses and filters data, with no RPC calls or aggregations, so we optimized parsing by skipping unnecessary JSON fields and using a more efficient JSON library, and configured only 1 GB of memory per CPU. The summary layer mainly performs aggregations and joins, so performance can be improved and cost reduced by optimizing aggregation algorithms and combining in-memory and external storage, and more memory is allocated to avoid out-of-memory errors. With these optimizations, although the real-time warehouse's production pipeline is longer than the original process, data latency has not increased noticeably, while the computing resources used by real-time data applications have dropped significantly.
Looking forward
Our goal is to build the real-time warehouse into a data system that matches the accuracy and consistency of the offline warehouse, providing timely and reliable data services to merchants, business staff, and Meituan users, and serving as the unified outlet for in-store catering real-time data for the group's other business units. Going forward, we will focus more on data reliability and real-time metric management: establishing thorough data monitoring, data lineage, and cross-checking mechanisms to detect and alert on abnormal or delayed data in time. We will also streamline the development process and lower the learning cost of real-time data development, so that more people who need real-time data can solve problems themselves.
References
Performance Comparison of the Stream Computing Frameworks Flink and Storm
About the author
Wei Lun is the real-time data lead of Meituan's In-Store Catering Technology Department. He joined Meituan in 2017 and has long worked on data platform development, real-time data computing, and data architecture. He has accumulated experience using Flink for real-time data production and for improving production efficiency, and actively shares Flink's practical experience in real-time data processing.
Recruitment information
If you are interested in data engineering and in unlocking the value of data as a service, send your resume to [email protected]. We have many unexplored but interesting areas in real-time data warehousing, real-time data governance, real-time data product development frameworks, and data-driven innovation products for sales and merchants.