A real-time data warehouse aims for end-to-end low latency, SQL standardization, rapid response to change, and unified data. The best practice distilled by the Meituan Waimai (Takeout) Data Intelligence team is to pair a common real-time production platform with a common interactive real-time analysis engine, covering both real-time and quasi-real-time business scenarios. With a reasonable division of labor, the two complement each other and form a pipeline that is easy to develop, easy to maintain, and efficient, balancing development efficiency against production cost and meeting diverse business needs with a better input-output ratio.

01 Real-time Scenarios

Meituan Waimai has many real-time data scenarios, mainly in the following areas:

  • Operations: real-time business changes, real-time marketing effectiveness, the day's business situation, and intraday time-slice trend analysis.
  • Production: for example, whether systems are reliable and stable, with real-time monitoring of system health.
  • C-side users: for example, search and recommendation ranking require real-time behavioral features to be produced so that more relevant content can be recommended to users.
  • Risk control: real-time risk identification, anti-fraud, and abnormal-transaction detection are all scenarios where real-time data is widely used.

02 Real-time technology and Architecture

1. Selection of real-time computing technology

There are many open-source real-time technologies on the market, including Storm, Spark Streaming, and Flink; engineers should choose among them according to their company's specific business.

Meituan Waimai builds on Meituan's overall basic data infrastructure. In terms of technical maturity, the company mainly used Storm in past years; at the time, Storm was hard to replace in terms of performance stability, reliability, and scalability. However, as Flink has matured, it has surpassed Storm in both performance and framework design. The trend is that Flink will gradually replace Storm, much as Spark replaced MapReduce. Of course, moving from Storm to Flink is a process; we still have some legacy jobs running on Storm and are working through the transition.

The comparison between Storm and Flink can be seen in the table above.

2. Real-time architecture

1) Lambda architecture

Lambda is a classic architecture. In the past there were few real-time scenarios; most work was offline. When real-time scenarios were added, the technology ecosystems differed because offline and real-time have different timeliness requirements. The Lambda architecture essentially adds a separate real-time production link and integrates the two at the application layer: dual production paths, independent of each other. It has become a widely adopted approach in business applications.

Dual-path production brings problems: processing logic is doubled, development and operations are doubled, and resources are split into two separate pipelines. These problems led to the evolution of another architecture, Kappa.

2) Kappa architecture

In terms of architecture design, Kappa is relatively simple, with unified production: a single set of logic handles both offline and real-time processing. However, it has major limitations in practical scenarios; there are few cases in the industry where Kappa is used directly in production, and those scenarios are relatively narrow. Meituan Waimai encounters these problems as well, and our own thinking is elaborated in the following sections.

03 Business Pain Points

First of all, we encountered some problems and challenges in the takeout business. In the early days, requirements were typically fulfilled case by case in order to meet business needs. The business demands high timeliness, and from that perspective there was no opportunity to precipitate an intermediate layer. In this situation, business logic is usually embedded directly into the job, which is the simple and effective approach one thinks of first. This development mode is common in the early stage of a business.

As shown in the figure above, after obtaining the data source, we perform data cleaning, dimension expansion, and business-logic processing in Storm or Flink, and finally output results directly to the business. Breaking this process down, the same data source is referenced repeatedly, and cleaning, filtering, dimension expansion, and similar operations are all repeated; the only difference is the business code. With a small number of use cases this mode is acceptable, but as business volume grows, jobs fall into a "whoever develops it maintains it" pattern, everyone applies for their own resources, resource costs expand rapidly, and resources cannot be used intensively and effectively. It therefore becomes necessary to think about how to build real-time data holistically.

04 Data Features and Application Scenarios

So how do we build a real-time data warehouse? First we need to break down the data and the scenarios, and identify the common characteristics of those scenarios. For takeout, data falls into two categories: logs and business data.

  • Log data: the volume is very large, semi-structured, and deeply nested. A key characteristic of log data is that once the log stream is formed it does not change. Logs from the whole platform are collected through tracking points, then uniformly gathered and distributed; it is like a tree with a very large root, and pushing data toward front-end applications is a process of forking from the root out to the branches (from 1 to n). If every business fetched data directly from the root, the path would look shortest, but the load would be too heavy and data retrieval would be inefficient. Log data is generally used for production monitoring and user behavior analysis; its timeliness requirements are high, with time windows usually of 5 or 10 minutes, or simply up to the current state.
  • Business data: mainly transaction data from business systems. Business systems are generally self-contained and distribute changes downstream in the form of Binlogs. They are transactional and mostly modeled in normal form; the subject entity is very clear, but there are many tables, and several tables must be joined to express the complete business, so this is an N-to-1 integration process.

Real-time processing of business data, however, mainly faces the following difficulties:

  • Multiple business states: a business process changes from start to finish, e.g. order -> pay -> ship. The business database is updated in place, so the Binlog records many changes, but business analysis mostly cares about the final state, which creates the problem of retroactive (rollback) recalculation. For example, an order placed at 10:00 is cancelled at 13:00, yet the cancellation is expected to be deducted from the 10:00 figures (a hedged query sketch follows this list).
  • Business integration: business analysis generally cannot be expressed from a single subject; many tables must be joined to obtain the desired information, and merging and aligning data in real-time streams usually requires large caches and is complex.
  • Analysis is batch, processing is streaming: a single record cannot be analyzed on its own, so the object of analysis must be a batch, while the data is processed record by record.
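To make the rollback semantics concrete, here is a minimal query sketch, assuming the latest state of every order is kept in an analytical store; the table and column names (dwd_order_latest, create_time, order_status, order_amount) are hypothetical:

```sql
-- Hypothetical table holding the latest state of each order (one row per order_id).
-- Counting by the hour the order was CREATED means an order cancelled at 13:00
-- simply disappears from the 10:00 bucket the next time the query runs.
SELECT
    DATE_FORMAT(create_time, '%Y-%m-%d %H:00:00') AS order_hour,
    COUNT(*)                                      AS valid_orders,
    SUM(order_amount)                             AS valid_amount
FROM dwd_order_latest
WHERE order_status <> 'CANCELLED'
GROUP BY DATE_FORMAT(create_time, '%Y-%m-%d %H:00:00');
```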

Log and business scenarios generally coexist and are intertwined, and neither Lambda nor Kappa alone fits all cases, so it makes more sense to choose the architecture and practice per scenario.

05 Real-time data warehouse architecture design

1. Real-time architecture: exploring the combination of streaming and batch

Based on the problems above, we developed our own approach: combining streaming and batch to address different business scenarios.

As shown in the figure above, the ETL process from log collection to message queue to data stream is unified as the construction of the underlying data streams. On top of that, log-type real-time features and real-time dashboard applications go through real-time streaming computation, while Binlog-type business analysis goes through real-time OLAP batch processing.

What are the pain points of analyzing business data in a streaming way? For normal-form business data, both Storm and Flink need a large amount of external storage to align business data across streams, which also consumes a lot of computing resources. Because of storage limits, a window-limiting strategy must be applied, and some data may ultimately be discarded. After computation, results are generally stored in Redis for query support, and KV storage has many limitations for analytical query scenarios.

How is real-time OLAP implemented? Is there a real-time computing engine with its own storage that, as real-time data arrives, can compute flexibly within a certain range, carry a reasonable volume of data, and still respond to analytical queries? With MPP engines evolving rapidly and their performance improving quickly, a new possibility opens up for this scenario. Here we use the Doris engine.

This idea has also been practiced elsewhere in the industry and has become an important direction of exploration, for example Alibaba's real-time OLAP solution based on ADB.

2. Real-time data warehouse architecture design

Looking at the real-time data warehouse architecture as a whole, the first considerations are how to manage all the real-time data, how to integrate resources effectively, and how to construct the data.

Methodologically, real-time and offline are very similar. In the early days of the offline data warehouse, work was also done case by case; only when the data reached a certain scale did we consider how to govern it. Layering is a very effective method of data governance, so for managing the real-time data warehouse the first consideration is also the layered processing logic, as follows:

  • Data sources: at the source level, offline and real-time use the same data, mainly divided into log data and business data; the logs include user logs, DB logs, and server logs.
  • Real-time detail layer: in the detail layer, to solve the problem of repeated construction, a unified build is carried out using the offline data warehouse modeling approach, and the detail data is constructed uniformly and managed by subject area. The detail layer exists to provide directly usable data to downstream consumers, so unified base-layer processing such as cleaning, filtering, and dimension expansion must be done here.
  • Summary layer: results can be computed directly with simple Flink or Storm operators to form a pool of summary indicators. All indicators are processed in the summary layer, managed and built by everyone according to unified specifications, producing reusable summary results (a sketch of such a computation follows this list).
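As a minimal sketch of a summary-layer indicator, assuming a Flink SQL job that reads the unified detail layer from Kafka; the topic, table, and field names are hypothetical and the connector options depend on the deployment:

```sql
-- Assumed detail-layer source: cleaned, dimension-expanded order events from Kafka.
CREATE TABLE dwd_order_detail (
    order_id     STRING,
    city_id      BIGINT,
    order_amount DECIMAL(10, 2),
    event_time   TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_order_detail',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Summary-layer indicator: 5-minute order count and amount per city.
SELECT
    city_id,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    COUNT(DISTINCT order_id)                      AS order_cnt,
    SUM(order_amount)                             AS order_amt
FROM dwd_order_detail
GROUP BY city_id, TUMBLE(event_time, INTERVAL '5' MINUTE);
```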

To sum up, when building the real-time data warehouse, the layered framework should be set up first and the specifications defined afterwards: how far each layer processes the data and in what way. Once the specifications are defined, standardized processing in production becomes easy. To guarantee timeliness, the design cannot have too many layers. Scenarios with high real-time requirements basically follow the data flow on the left of the figure above; scenarios that need batch processing can go from the real-time detail layer into the real-time OLAP engine and rely on the engine's own fast computation and query capabilities, following the data flow on the right of the figure.

06 Construction of real-time platform

Once the framework is determined, we consider how to build the platform. Real-time platform construction is built entirely around real-time data warehouse management.

The first step is functional abstraction: abstract functions into components so that standardized production and systematic guarantees become possible. Base-layer processing such as cleaning, filtering, stream merging, dimension expansion, transformation, encryption, and screening can all be abstracted into components, and the base layer can then assemble directly usable data streams in a modular way. This raises a problem: to satisfy diverse user needs, some redundant processing may exist. In terms of storage, real-time data does not keep history and therefore does not consume much storage, so this redundancy is acceptable; it improves production efficiency and is an application of trading space for time.

Through base-layer processing, all data is precipitated into the real-time detail layer and simultaneously written into the base layer of the OLAP engine, and the real-time summary layer is then computed on top of it. Multi-dimensional summary indicators are produced with Storm, Flink, or Doris to form a unified summary layer, with unified storage and distribution (a hedged sketch of the dual write appears below).
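A hedged sketch of this dual write, assuming Flink SQL statement sets (available in recent Flink versions); the source table ods_order_binlog and both sink tables are hypothetical and assumed to be declared elsewhere:

```sql
-- One cleaned detail stream feeds two sinks in a single job:
-- a Kafka topic for downstream streaming consumers and a Doris table as the OLAP base layer.
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO kafka_dwd_order_detail   -- detail layer for streaming consumers
    SELECT order_id, city_id, order_status, order_amount, event_time
    FROM ods_order_binlog
    WHERE order_id IS NOT NULL;          -- basic cleaning / filtering

    INSERT INTO doris_dwd_order_detail   -- the same detail written into the OLAP engine
    SELECT order_id, city_id, order_status, order_amount, event_time
    FROM ods_order_binlog
    WHERE order_id IS NOT NULL;
END;
```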

Once these functions are in place, system capabilities such as metadata management, indicator management, data security, SLA, and data quality are gradually built out.

1. Real-time base layer function

Building the real-time base layer has to solve several problems. The first is repeated reading of the same stream: a Binlog arrives as a whole-database package, while a user may only need one of its tables, and if everyone subscribes to the raw stream it becomes a point of contention. The solution is to deconstruct the stream by business, restore it into a basic data-stream layer of per-table streams in normal form according to business needs, and carry out integrated subject-area construction following data warehouse modeling methods, as sketched below.
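A minimal sketch of this deconstruction, assuming the whole-database Binlog is delivered as JSON messages tagged with the source table name, and a Flink version that provides JSON_VALUE; all topic, table, and field names are hypothetical:

```sql
-- Assumed raw Binlog stream: one Kafka topic carrying changes for a whole database,
-- each message tagged with the table it came from.
CREATE TABLE ods_db_binlog (
    table_name STRING,
    op_type    STRING,        -- INSERT / UPDATE / DELETE
    row_data   STRING,        -- JSON payload of the changed row
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_db_binlog',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Restore a per-table basic stream so each business subscribes only to what it needs.
CREATE VIEW ods_order_binlog AS
SELECT
    JSON_VALUE(row_data, '$.order_id')                               AS order_id,
    CAST(JSON_VALUE(row_data, '$.city_id') AS BIGINT)                AS city_id,
    JSON_VALUE(row_data, '$.order_status')                           AS order_status,
    CAST(JSON_VALUE(row_data, '$.order_amount') AS DECIMAL(10, 2))   AS order_amount,
    event_time
FROM ods_db_binlog
WHERE table_name = 'orders';
```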

Secondly, components should be encapsulated: base-layer functions such as cleaning, filtering, and dimension expansion are exposed through a very simple expression entry point, so users only write their own logic. Data transformation is flexible, for example mapping one value to another; for this kind of custom logic we also provide custom components, with which users can develop custom scripts in Java or Python for data processing.

2. Real-time feature production function

Feature production logic can be expressed in SQL syntax; the lower layer adapts the logic and passes it transparently to the computing engine, shielding users from any dependence on a specific engine. Just as in offline scenarios, where large companies now rarely develop in raw code except for special cases, the logic can be expressed in SQL.

At the functional level, the idea of indicator management is brought in: atomic indicators, derived indicators, standard calculation calibers, dimension selection, window settings, and other operations can all be configured, so that production logic is analyzed and packaged in a unified way.

There is another problem: many SQL statements are written against the same source, and each submission starts a separate data stream, which wastes resources. Our solution is to produce indicators dynamically from the same stream, so that indicators can be added without stopping the service, as illustrated below.
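In Flink SQL terms, several indicators sharing one stream can be sketched with a statement set; truly dynamic addition of indicators without restarting the job requires additional platform support beyond plain Flink SQL. The sink tables and fields below are hypothetical:

```sql
-- Multiple indicators produced from the same detail stream in one job.
EXECUTE STATEMENT SET
BEGIN
    -- Indicator 1: 5-minute order count per city.
    INSERT INTO ads_city_order_cnt_5min
    SELECT city_id,
           TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
           COUNT(DISTINCT order_id)                      AS order_cnt
    FROM dwd_order_detail
    GROUP BY city_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

    -- Indicator 2: 5-minute GMV per city, sharing the same source scan.
    INSERT INTO ads_city_gmv_5min
    SELECT city_id,
           TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
           SUM(order_amount)                             AS gmv
    FROM dwd_order_detail
    GROUP BY city_id, TUMBLE(event_time, INTERVAL '5' MINUTE);
END;
```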

Therefore, in building the real-time platform, more thought goes into how to use resources effectively and in which links resources can be used more economically; these are largely engineering considerations.

3. SLA construction

The SLA mainly addresses two problems: the end-to-end SLA and per-job processing efficiency. We adopt a tracking-point-plus-reporting approach. Because real-time traffic is large, the tracking points must be as simple as possible, recording only what is needed to express the business. Each job reports to the SLA monitoring platform through a uniform interface; the required information is reported at each job site, and end-to-end SLA statistics are aggregated from these reports.

In real-time production, per-job SLAs are also essential: the pipeline is long and you cannot control every link, but you can control the efficiency of your own jobs.

4. Real-time OLAP solution

The problems

  • Restoring business state from Binlog is complex: the business changes frequently and the state at a particular point in time is needed, so events must be ordered and the data must be retained, which consumes a lot of memory and CPU.
  • Binlog business association is complex: in streaming computation, joining streams to express the business logic is very difficult.

The solution

With an OLAP engine providing the computing power, the logic no longer needs to be carried out in the stream; we only need to solve the problem of getting data in reliably and in real time.

We use Doris as the high-performance OLAP engine. Since derived calculations are needed on top of the results produced from business data, Doris can quickly restore the business state using its Unique model or Aggregate model, and summary-layer aggregation can be carried out at the same time, again designed for reuse. The application layer can be either physical or logical.
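A minimal Doris DDL sketch of such a business-restoration table, assuming the Unique Key model; the table name and columns are hypothetical:

```sql
-- Each Binlog change is written as a full row; Doris keeps only the newest version
-- per order_id, so the business "final state" is restored by storage rather than
-- by joins and large state in the stream.
CREATE TABLE dwd_order_latest (
    order_id     BIGINT,
    create_time  DATETIME,
    order_status VARCHAR(32),
    city_id      BIGINT,
    order_amount DECIMAL(10, 2),
    update_time  DATETIME
)
UNIQUE KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 16
PROPERTIES (
    "replication_num" = "3"
);
```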

This mode focuses on solving rollback calculations for the business: when a business state changes, a value at some point in history needs to change as well. Doing this with stream computation is very costly, while the OLAP mode handles it well.

07 Real-time Application Cases

Finally, a case study. Suppose a merchant wants to give users discounts based on how many orders they have placed historically; the merchant needs to see the historical order count, which requires both the historical T+1 data and today's real-time data. This is a typical Lambda scenario. We can design a partitioned table in Doris with a historical partition and a today partition: the historical partition is produced offline, and today's indicators are computed in real time and written into today's partition, so a simple aggregation at query time merges the two, as sketched below.
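A hedged sketch of this design, assuming a Doris Aggregate-model table partitioned by date; the partition boundaries, table, and column names are hypothetical:

```sql
-- Historical partitions are loaded by the offline (T+1) pipeline;
-- today's partition is written continuously by the real-time job.
CREATE TABLE ads_user_order_stat (
    dt        DATE,
    user_id   BIGINT,
    order_cnt BIGINT SUM DEFAULT "0"
)
AGGREGATE KEY(dt, user_id)
PARTITION BY RANGE (dt) (
    PARTITION p_history VALUES LESS THAN ("2020-04-01"),
    PARTITION p_today   VALUES LESS THAN ("2020-04-02")
)
DISTRIBUTED BY HASH(user_id) BUCKETS 16
PROPERTIES (
    "replication_num" = "3"
);

-- Query time: a simple aggregation merges history and today.
SELECT user_id, SUM(order_cnt) AS total_orders
FROM ads_user_order_stat
WHERE user_id = 12345
GROUP BY user_id;
```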

The scenario looks simple, but the difficulty is that many simple problems become complicated once merchant volume grows. Going forward, we will distill more business scenarios from more business input and abstract them into unified production schemes and functions, so that diverse business needs can be supported with minimal real-time computing resources. That is what we aim to achieve.

Read more technical articles from the Meituan technical team



This article was produced by the Meituan technical team and its copyright belongs to Meituan. You are welcome to reprint or use its content for non-commercial purposes such as sharing and communication; please credit "Content reprinted from the Meituan technical team". The article may not be reproduced or used commercially without permission. For any commercial use, please email [email protected] to request authorization.