The purpose of real-time data warehouse construction

Solving the problems of the traditional data warehouse

The real-time data warehouse is an easily confused concept; the term itself sounds almost self-contradictory. Traditionally, we believe one very important function of a data warehouse is to record history: a warehouse is typically expected to hold data from the first day of the business up to the present.

Real-time processing, by contrast, emphasizes the current state of the data. So when these two seemingly opposite approaches are combined, the result is not meant to solve a broader class of problems; rather, the purpose of building a real-time data warehouse is to solve the problems that the traditional data warehouse cannot solve because of the low timeliness of its data.

Because of this positioning, we set two principles for ourselves:

  • If the traditional data warehouse can solve a problem, the real-time data warehouse does not solve it. For example, historical statistics for last month will not be built on the real-time data warehouse.
  • If a problem is not well suited to a data warehouse in the first place, the real-time data warehouse does not solve it either. For example, purely operational business needs, or requirements with extremely strict latency demands, are not recommended to be handled by the real-time data warehouse.

Of course, in order for the whole system to really be a data warehouse, we also set some requirements for ourselves, and they are the same as for building an offline data warehouse: the real-time data warehouse should be subject-oriented, integrated, and relatively stable.

The difference is that the offline data warehouse stores the full accumulated history, while the real-time data warehouse only keeps data from the most recent batch cut-off up to the present. This is a mouthful, but in practice it is fairly easy to do.

The usual solution is to keep about three days of data, because three days of data can provide two stable, complete days, which guarantees a complete data service during the window in which yesterday's data is still being processed by the offline batch job.

Application scenarios of the real-time data warehouse

Figure 1: Application scenarios of the real-time data warehouse

  • Real-time OLAP analysis

OLAP analysis is exactly the class of problem that a data warehouse is well suited to solve. By extending the warehouse into a real-time one, we improve its timeliness, and existing OLAP analysis tools may even gain the ability to analyze real-time data without much change at the analysis level.

  • Real-time dashboards

Such scenarios are easy to understand, for example the real-time large screen on Tmall's Double 11 that scrolls through changes in core metrics. At Meituan there are not only promotional campaigns but also major in-store businesses, and store owners also care about their daily sales across the various business lines.

  • Real-time feature

Real-time features are aggregated indicators used to tag certain characteristics of merchants or users. For example, a user who purchases repeatedly may be judged a high-quality user, and a merchant with high sales may be considered to have higher popularity. Similar stores or merchants can then be prioritized for real-time, precise operations.

  • Real-time Service Monitoring

Meituan-Dianping also monitors some core business indicators. For example, an online incident may cause certain business indicators to drop; monitoring lets us detect such problems as early as possible and reduce the losses.

How to build a real-time data warehouse

Real-time data warehouse concept mapping

The table below, which maps offline data warehouse development onto real-time data warehouse development, helps you quickly and clearly understand the concepts of the real-time data warehouse.

Figure 2: Mapping between offline and real-time data warehouse concepts

  • Programming level

The most common pattern in offline development is Hive SQL plus some extended UDFs. Mapped onto the real-time data warehouse, we use Flink SQL, likewise with UDF development.
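
As a rough illustration of this mapping, here is a minimal Flink SQL sketch of registering and calling a UDF, in the same spirit as a Hive SQL UDF offline. The function class and table names are hypothetical.

```sql
-- Hypothetical example: registering and using a UDF in Flink SQL,
-- mirroring the Hive SQL + UDF pattern used offline.
CREATE FUNCTION parse_city AS 'com.example.udf.ParseCityFunction';

-- Assumes a source table ods_order has already been declared
-- (for example on top of a Kafka topic, see the DDL sketch later).
SELECT
  order_id,
  parse_city(poi_address) AS city_name,  -- UDF call, same style as in Hive
  order_amount
FROM ods_order;
```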

  • Job execution level

At the execution level, offline processing usually runs as MapReduce or Spark jobs; the real-time counterpart is a continuously running Flink streaming program.

  • Data warehouse object level

The offline data warehouse works with Hive tables; in the real-time data warehouse we use the stream table abstraction.

  • Physical storage

In the offline data warehouse we use HDFS for storage in most cases; in the real-time case we are more likely to use a message queue such as Kafka for data storage.
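
To make the storage mapping concrete, below is a minimal sketch (not Meituan's actual DDL) of declaring a stream table on top of a Kafka topic with the open-source Flink Kafka SQL connector (Flink 1.11+); the topic, broker address, and fields are illustrative.

```sql
-- A "stream table": the same logical table that would be a Hive/HDFS table
-- offline is declared on top of a Kafka topic in the real-time warehouse.
CREATE TABLE ods_order (
  order_id     BIGINT,
  poi_id       BIGINT,
  order_amount DECIMAL(10, 2),
  update_time  TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_order',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'rt-dw-ods',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
```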

The overall architecture of the real-time data warehouse

We have previously shared why Flink was chosen for the real-time data warehouse, focusing on the reasons and ideas behind the selection of technical components; for details, see "Meituan-Dianping Real-time Data Warehouse Construction Practice Based on Flink". This article instead focuses on the data itself. Below is the data architecture diagram of our current real-time data warehouse.

"Meituan-Dianping Real-time Data Warehouse Construction Practice Based on Flink": tech.meituan.com/2018/10/18/…

Figure 3: Real-time data warehouse architecture

As the diagram shows, the data architecture of the real-time data warehouse has many similarities with that of the offline one. For example, both are layered: the ODS layer, the detail layer, the summary layer, and even the application layer may share the same naming pattern. However, there are also many differences between real-time and offline.

The first major difference from the offline data warehouse is that the real-time data warehouse has fewer layers.

From our experience building offline data warehouses, the summary layer there is far from this simple: there are usually concepts such as a lightly aggregated summary layer, and the summary layer itself may contain several sub-layers. The application layer is another difference: in offline construction, the application layer is actually built inside the warehouse, and once it is built a synchronization task pushes the data to the application system's database.

In the real-time data warehouse, the so-called APP-layer application tables actually already live in the application system's database. Although the APP layer is drawn in the figure above, it is not really a set of tables inside the warehouse; the data is essentially stored on the application side already.

Why fewer layers overall? Because when data is processed in real time, every layer that is created adds delay to the data.

Why should as few summary layers as possible be built? Because when computing summary statistics, we often introduce an artificial delay in order to tolerate late data and ensure accuracy.

For example, statistics for the window ending at 10:00 may only be computed at 10:00:05 or 10:00:10, to make sure that all data arriving before 10:00 has been received. Too many summary layers therefore compound this artificial delay.
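
As an illustration of this artificial delay, the sketch below (hypothetical table and field names, standard Flink SQL watermark syntax) holds the watermark back by ten seconds, so a window ending at 10:00 only fires once events up to roughly 10:00:10 have been seen; every additional aggregation layer adds another delay of this kind.

```sql
-- Detail-layer table whose watermark lags event time by 10 seconds,
-- trading 10 seconds of latency for tolerance of late data.
CREATE TABLE dw_order_detail (
  order_id     BIGINT,
  poi_id       BIGINT,
  order_amount DECIMAL(10, 2),
  event_time   TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'dw_order_detail',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'format' = 'json'
);
```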

We therefore recommend reducing the number of layers as much as possible, especially summary layers, preferably to no more than two. The detail layer can afford a few more levels, since concepts such as system-level detail design will exist there.

The second big difference is the storage of the data source.

When building an offline data warehouse, the entire warehouse may be built on Hive tables and run entirely on Hadoop. When building a real-time data warehouse, however, even the same table may be stored in different ways.

For example, detail data and summary data are mostly stored in Kafka, while dimension data is more likely to live in KV storage systems such as Tair or HBase (summary data may also be stored in Kafka). Beyond the overall architecture, we also share the key points of building each layer.

■ Construction of ODS layer

Data sources should be as unified as possible, and partitions should be used to ensure local order of data

Figure 4: ODS layer

The first construction point is the ODS layer. Strictly speaking, building the ODS layer is not necessarily tied to the data warehouse: as long as you develop with Flink, you need real-time data sources. Today the main real-time data source is the message queue, such as Kafka, and the data sources we currently work with are mainly binlogs, traffic logs, and system logs.

There are two main points here. The first is how to choose among so many data sources. Based on our data warehouse experience:

**First, the data sources should be as unified as possible.** This unification has two implications:

  • The first is that real-time data sources should be internally consistent. For example, when you access a certain type of data from a system, you can take it either from the binlog or from the system log, but not both: if, without understanding the data production process, part of the data is taken from the binlog and part from the system log, data disorder is likely.
  • The second unification, perhaps even more important, is consistency between real-time and offline. Although we are building a real-time data warehouse, it is still a data warehouse in essence: as a team, the calculation logic of its indicators and its data sources should be completely consistent with the offline warehouse, so that data consumers are not misled. If the same data can be obtained from both sides, we recommend using the same data sources as the offline colleagues. The company is also doing some work to guarantee that the data sources used offline and in real time are consistent.

The second point is data disorder. When collecting data, a big problem is that, because of partitioning, two changes to the same record may be consumed out of order: the state after a change may be consumed before the state that preceded it. We solved this problem with a data component inside Meituan.

The main idea for guaranteeing order is to use Kafka partitioning so that data is locally ordered within each partition. For the concrete approach, see "Meituan-Dianping Real-time Data Warehouse Construction Practice Based on Flink". The solution was built by Meituan's data synchronization team and provides rich strategies to guarantee that changes to the same record are consumed in the order they were produced, solving the out-of-order problem at the source.
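
Since Meituan's internal synchronization component is not public, the sketch below only illustrates the underlying idea with the open-source Flink Kafka connector: records are keyed by their primary key, so all changes to the same record hash to the same partition and stay locally ordered. Names are illustrative, and the key options require Flink 1.12+.

```sql
-- Sink table whose Kafka records are keyed by order_id; Kafka's default
-- partitioner hashes the key, so every change of one order lands in the
-- same partition and is read back in production order.
CREATE TABLE ods_order_binlog_sink (
  order_id     BIGINT,
  order_status INT,
  update_time  TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_order_binlog',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.fields' = 'order_id',
  'key.format' = 'json',
  'value.format' = 'json'
);
```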

■ Construction of DW layer

Solve the noise, incompleteness, and inconsistent formats in the raw data to form standardized, unified data sources. Stay consistent with the offline warehouse wherever possible.

The construction idea for the detail layer is basically the same as for the offline data warehouse: solve the noise, incompleteness, and non-uniform formats that may exist in the ODS-layer data, so that it becomes a unified, standard-compliant data source inside the warehouse. Our advice is that, if possible, data should enter the warehouse through the same process as on the offline side.

This is especially true when the data sources are relatively uniform but the ingestion logic changes often; in that case we may use a set of configuration-based warehousing rules. The offline colleagues may already have an ingestion system in which rules are configured describing which tables enter the warehouse and which fields are ingested. If the real-time side reuses the same configuration for ingestion, we can ensure that the DW layers of the offline and real-time warehouses stay consistent over the long term.

The main work of DW-layer construction consists of the following four parts.

Figure 5: DW layer

The only point marked in red is model normalization. Model normalization is a platitude: every team probably writes its own normalization spec before building the data warehouse, but in practice not every team ends up implementing it.

In real-time data warehouse construction, model normalization deserves special attention, because real-time jobs run in a 7×24 scheduled state, so the operations cost of modifying a field can be very high. In an offline warehouse, a table can be changed and, as long as downstream jobs are adjusted within the day, nothing bad happens. The real-time data warehouse is different: as soon as the upstream table structure changes, downstream jobs must be able to parse the new upstream data correctly.

Moreover, a system like Kafka is not structured storage and has no concept of metadata, so you cannot simply rename tables or change field types afterwards the way the earlier specification allowed; governing after the fact can be very costly. We therefore suggest enforcing model normalization from the very beginning of construction, to avoid investing very large governance costs later.

  • Duplicate data handling

In addition to the data itself, we attach extra information to each record to address some common problems in real-time data production.

Figure 6: Key generation

  • Unique keys and primary keys

We add a unique key and a primary key to each record, and the two come as a pair: the unique key identifies one unique piece of data, while the primary key identifies a row. A row may change many times, and its primary key stays the same, but each change is a distinct update of that row, so each change gets its own unique key. The unique key mainly solves the problem of duplicate data: data is produced outside our warehouse, so it is hard to guarantee that data arriving from outside is never duplicated.

Some upstream teams, when handing data over, will even tell you explicitly that the data may be duplicated. Generating a unique key means the DW layer has an identifier with which to address the duplicate computation that upstream duplicates would otherwise cause. Generating a primary key lets us partition by primary key in Kafka, following the same partition-ordering principle described for the ODS layer; after partitioning by primary key, the changes of any single record are guaranteed to be consumed in order.
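
A minimal sketch of this key generation, with hypothetical table and field names, using Flink SQL built-ins (MD5, CONCAT_WS); the sink is assumed to be a Kafka table keyed by the primary key, as in the ODS example above.

```sql
-- Attach a primary key (row identity) and a unique key (identity of this
-- particular change) to every record written into the detail layer.
INSERT INTO dw_order_detail_keyed
SELECT
  order_id AS primary_key,                          -- identifies the row
  MD5(CONCAT_WS('|',
      CAST(order_id AS STRING),
      CAST(update_time AS STRING))) AS unique_key,  -- identifies this change
  order_status,
  order_amount,
  update_time
FROM ods_order_binlog;
```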

  • Versions and batches

Version and batch are another pair. The fields can of course be named anything; what matters is the logic behind them.

First, version. A version corresponds to a table structure, that is, the schema of the data, since downstream scripts are developed against the table's current schema when processing real-time data. When the table structure changes, there are two possible situations. In the first, fields are added or removed but not used; downstream jobs do not even notice, and nothing needs to be done. In the other, a changed field is actually required. The problem with a Kafka table is that it then effectively carries two different table structures, so we need a version marker that tells the consumer which schema a given record follows; that is why a concept like version is needed.

Second, batch. Batch covers a less common scenario: sometimes data needs to be rerun, which is not the same as a restart. On a restart, the job picks up a change and continues from the last consumed position; on a rerun, the consumption position itself is moved back.

For example, suppose today's figures were computed incorrectly and the boss urgently asks me to fix them. After modifying the program, I need to rerun it from, say, this morning. Because the whole pipeline runs 7×24, the original job cannot simply be stopped; only after the rerun job has caught up with the new data can the original job be stopped, and the rerun data can then be used to update the result layer.

During that window, two sets of data coexist for a short time, and they are distinguished by batch: every job consumes only the data of its specified batch, so when a rerun happens, only the jobs consuming the rerun batch read the rerun data. Once the rerun has caught up, the jobs of the original batch can be stopped, which solves the data rerun problem.
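
The sketch below illustrates, with hypothetical table names and literal values, how version and batch can be attached to each record and how a downstream job pins the batch it consumes.

```sql
-- The rerun job writes records tagged with a new batch_id (and the schema
-- version the records follow).
INSERT INTO dw_order_detail_versioned
SELECT
  order_id,
  order_amount,
  update_time,
  'v2'       AS schema_version,  -- which table structure this record follows
  'batch_02' AS batch_id         -- the rerun writes into a new batch
FROM ods_order_binlog;

-- Downstream jobs pin the batch they consume, so old-batch and rerun-batch
-- data can coexist without interfering with each other.
SELECT order_id, order_amount
FROM dw_order_detail_versioned
WHERE batch_id = 'batch_02';
```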

■ Dimensional data construction

Next is dimension data, which is also part of our detail layer. Dimension data falls into two categories and is handled with different schemes.

  • Dimensions with a low frequency of change

The first category is data with a low frequency of change, possibly data that essentially never changes: for example, geographic dimension information, holiday information, and fixed code mappings.

Figure: Slowly changing dimensions

Such data can be taken directly from the corresponding dimension tables stored in the offline warehouse and loaded into a cache by a synchronization job. Another kind of dimension data is created very quickly, with new entries appearing all the time, but once created it hardly ever changes.

For example, when a new store opens on Meituan, fixed attributes such as the name of the city it is in may not change for a long time, so taking the latest value is enough; we access such current values directly through public services inside the company. Finally, we wrap all of this behind a dimension service that hides the query details from users; the specific dimension information is associated through this service.

  • Dimensions with a high frequency of change

The second category is data that changes much more frequently, for example the price of a commodity; such values tend to change often and quickly over time. Our solution for this kind of data is a bit more complicated. First, we monitor the changes of the frequently changing dimension data, such as price: treating price as a dimension, we listen for price-change messages and build a zipper table of price changes.

Figure: Rapidly changing dimensions

Once the dimension zipper table is established, when a record arrives we know exactly which dimension value applied to it at that point in time, which avoids wrong dimension associations caused by rapidly changing dimensions.
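
One way to express this association in more recent Flink versions (1.12+) is an event-time temporal join against a versioned table built from the price-change stream; the sketch below is illustrative rather than Meituan's internal implementation, and all names are hypothetical.

```sql
-- Fact stream with an event-time attribute.
CREATE TABLE order_stream (
  order_id   BIGINT,
  item_id    BIGINT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_order',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'format' = 'json'
);

-- Versioned dimension table fed by price-change messages (the "zipper").
CREATE TABLE dim_item_price (
  item_id     BIGINT,
  price       DECIMAL(10, 2),
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dim_item_price',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Each order picks up the price version that was valid at its event time.
SELECT o.order_id, o.item_id, p.price
FROM order_stream AS o
JOIN dim_item_price FOR SYSTEM_TIME AS OF o.event_time AS p
  ON o.item_id = p.item_id;
```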

Another kind of dimension, such as new versus old customer, is what we call a derived dimension: it is not a property of the dimension itself but is computed from whether the user has placed an order before, so it is really a dimension calculated from order data.

For dimensions like this we build computation models for derived dimensions in the DW layer, and the output of these models is itself a zipper table that records, for example, how a user's new/old-customer status changes from day to day, or how a high-quality-user flag evolves. Since building the zipper table itself requires dimension association, ordering is guaranteed by the key-based partitioning described earlier, so the association can still be treated as joining against an unchanging dimension.

Building zipper tables this way is relatively cumbersome, so in practice we recommend leveraging the capabilities of external components. We use HBase: it supports keeping multiple versions of a value and records the timestamp of each update, and the timestamps can even be used as an index when retrieving data.

So we store the data in HBase and configure MIN_VERSIONS to make sure data is not expired away entirely. As mentioned above, the general principle of the real-time data warehouse is not to redo what the offline warehouse already handles, which in practice means keeping roughly three days of data; you can therefore configure a TTL so that old dimension versions in HBase are evicted promptly. Dimensions from many days ago are no longer joined against anyway, and this keeps the dimension data from growing indefinitely and blowing up storage.

■ Dimension data usage

After processing the dimension data, how is the dimension data used?

Figure: Dimension data usage

The first and simplest solution is UDTF association: write a UDTF that queries the dimension service described above, and associate it using the keyword LATERAL TABLE; both inner and outer association are supported, as sketched below.
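
A minimal sketch of the UDTF association; the UDTF class that queries the dimension service is hypothetical, while LATERAL TABLE and LEFT JOIN ... ON TRUE are standard Flink SQL syntax.

```sql
-- Register a table function that looks up POI dimensions from the
-- (hypothetical) dimension service.
CREATE FUNCTION dim_poi AS 'com.example.udtf.PoiDimLookup';

-- Inner association: rows without a matching dimension are dropped.
SELECT o.order_id, o.poi_id, d.city_name, d.poi_name
FROM dw_order_detail AS o,
LATERAL TABLE(dim_poi(o.poi_id)) AS d(city_name, poi_name);

-- Outer association: keep orders even when the dimension is missing.
SELECT o.order_id, o.poi_id, d.city_name, d.poi_name
FROM dw_order_detail AS o
LEFT JOIN LATERAL TABLE(dim_poi(o.poi_id)) AS d(city_name, poi_name) ON TRUE;
```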

Another solution is to parse the SQL, recognize the joined dimension tables and their fields, rewrite the original query into a flatMap of the original table against the dimension table, and finally register the result of the whole operation as a new table to complete the association.

However, this approach needs a lot of supporting infrastructure: you must be able to parse SQL, recognize the dimension tables, keep track of all the dimension table information, and finally perform the SQL rewriting, so it suits mature SQL development frameworks built on top of Flink SQL. If you are simply writing lightly wrapped code, the UDTF-style association is recommended: it is very simple and the effect is the same.

■ Summary layer construction

When building a summary layer for a real-time warehouse, there are many similarities with the offline solution.

Figure: Data warehouse summary layer

The first point is the unified calculation of common indicators, such as PV, UV and transaction volume, in the summary layer. Computing them separately in every script not only wastes compute but may also produce inconsistent results; the indicator caliber must be unified in one fixed model. Flink SQL already supports many of the needed calculations, including COUNT DISTINCT and so on.

Note that COUNT DISTINCT by default stores all the values to be de-duplicated in one piece of state, so when there are many distinct values it can eat a lot of memory and crash the job. In that case, approximate algorithms can be considered, such as BloomFilter-based approximate de-duplication or HyperLogLog ultra-low-memory de-duplication, which greatly reduce memory usage.
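
A sketch of a unified summary-layer calculation with exact de-duplication; table and field names are hypothetical (the detail table is assumed to carry user_id), and for very large key spaces the COUNT(DISTINCT ...) below would be replaced by a BloomFilter- or HyperLogLog-based UDAF.

```sql
-- Common indicators computed once in the summary layer, per merchant.
SELECT
  poi_id,
  COUNT(*)                AS pv,            -- page/event count
  COUNT(DISTINCT user_id) AS uv,            -- exact de-duplication, state-heavy
  SUM(order_amount)       AS trade_amount
FROM dw_order_detail
GROUP BY poi_id;
```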

The second point is a strength of Flink: it has many built-in time windows. Flink SQL provides tumbling windows, sliding windows, and session windows that are hard to express offline, so we can build more fine-grained models and even use small time windows that are rarely used in offline development.

For example, a window over the last 10 minutes of data makes it easy to build applications around time-trend charts. Note, however, that once time windows are used, the state TTL parameter should be configured to reduce memory usage and improve efficiency; and if the TTL cannot accommodate the window, the computation may produce wrong results. A sketch follows.
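
A sketch of a 10-minute tumbling window together with a state TTL setting; the SET statement follows the Flink SQL client syntax (version-dependent), and the table and field names are hypothetical, assuming event_time carries a watermark as in the earlier DDL sketch.

```sql
-- State TTL must comfortably cover the window size, otherwise results may
-- be computed on partially expired state.
SET 'table.exec.state.ttl' = '1 h';

SELECT
  poi_id,
  TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start,
  TUMBLE_END(event_time, INTERVAL '10' MINUTE)   AS window_end,
  COUNT(*)          AS pv,
  SUM(order_amount) AS trade_amount
FROM dw_order_detail
GROUP BY poi_id, TUMBLE(event_time, INTERVAL '10' MINUTE);
```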

Third, the summary layer performs multi-dimensional aggregation by subject. Since the real-time data warehouse is itself subject-oriented and each subject cares about different dimensions, we aggregate the data under each subject according to the dimensions it cares about, finally producing the summary indicators mentioned earlier. One thing to note: the aggregated result is a retract stream, while the default Kafka sink only supports append mode, so a conversion is needed.

If you want to write such data to Kafka, the usual conversion is to drop the retract ("false") records from the stream, keep the add ("true") records, turn the result into an append stream, and then write it to Kafka.
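
That filter-the-retract-stream conversion is usually done in the Table API / DataStream API rather than in pure SQL. As an alternative, newer Flink versions (1.12+) ship an upsert-kafka connector that accepts an updating result directly; the sketch below uses it with hypothetical names.

```sql
-- Kafka sink that accepts an updating (retract/upsert) result keyed by poi_id.
CREATE TABLE dws_poi_summary_kafka (
  poi_id       BIGINT,
  uv           BIGINT,
  trade_amount DECIMAL(18, 2),
  PRIMARY KEY (poi_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dws_poi_summary',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO dws_poi_summary_kafka
SELECT poi_id, COUNT(DISTINCT user_id) AS uv, SUM(order_amount) AS trade_amount
FROM dw_order_detail
GROUP BY poi_id;
```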

Fourth, an important piece of work in the summary layer is the processing of derived dimensions. If HBase is used for this, its multi-version mechanism makes it easier to build the zipper table of a derived dimension and to obtain the exact dimension value for a piece of real-time data.
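
If the derived-dimension zipper table is kept in HBase, it can also be exposed to Flink SQL through the open-source HBase connector; a minimal sketch with hypothetical names follows (the VERSIONS and TTL settings themselves are configured on the HBase table, not in this DDL).

```sql
-- Derived-dimension table backed by HBase; the row key is the user_id and
-- the column family holds the derived attributes.
CREATE TABLE dim_user_new_old (
  rowkey STRING,
  cf ROW<is_new_customer BOOLEAN, update_time BIGINT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'dim_user_new_old',
  'zookeeper.quorum' = 'zookeeper-host:2181'
);
```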

Data warehouse quality assurance

If you have built such a warehouse by following the steps above, you will find that keeping it running normally, and running with high quality, is actually quite troublesome, much more complicated than an offline job. So after the warehouse is built, we need to build a number of peripheral systems to improve our production efficiency.

Below are the toolchain systems we currently use; their functional structure is shown in the figure below.

Figure: Real-time data warehouse toolchain

First, the toolchain includes a real-time computing platform whose main functions are unified job submission, resource allocation, and monitoring and alerting. Whether or not you are building a data warehouse, you probably need such a tool; it is the basic tooling for developing Flink.

For us, there are two main tools related to the data warehouse:

  • The system management module, which is shared between real-time and offline. Its knowledge base management part mainly records information about the tables and fields in the model, and some work-order solutions are also maintained there. Flink management is mainly used to manage the Flink-related system components developed by our company.
  • The key one is the development tool we use to develop the real-time data warehouse ETL. Its main functions are as follows:
  • SQL and UDF management: manage SQL scripts and UDFs, and configure UDFs.
  • View and monitor task logs.
  • Scheduling management: mainly manages task reruns and retransmissions.
  • Data asset management, managing real-time and offline metadata, and task-dependent information.

In fact, throughout the tool chain, each tool has its own specific usage scenarios, two of which are highlighted below.

Metadata and lineage management

■ Metadata management

When developing with Flink SQL, the table metadata has to be redeclared for every job. Since Kafka and many caching components, such as Tair and Redis, do not support metadata management, it is important to build a metadata management system as early as possible.

■ Lineage management

Lineage is in fact very important for the real-time data warehouse. As mentioned above, during the operation and maintenance of real-time jobs, once you modify your own job you must ensure that downstream jobs can still parse the new data correctly. Relying on human memory, for example remembering who uses my sales table, or relying on verbal notification, is very inefficient, so a lineage management mechanism must be established: know exactly who consumes the tables you produce and which upstream tables you depend on, so that the impact of a change is known in advance and the stability of the whole real-time data warehouse is guaranteed.

Figure: Metadata management system

The simplest way to implement a metadata and lineage management system covers the following three aspects:

  • Generate the Catalog from the metadata service

First, the metadata information in the metadata system is loaded into the program and used to generate a Flink Catalog, so that the job knows which tables it can consume and use.

  • Parse DDL statements to create or update tables

When a job runs a series of operations and finally outputs a table, the DDL code of the job's output part is parsed to create new metadata, which is written to the metadata system.

  • Job information and health status are written to metadata

Metadata about the job itself and its running status is also synchronized to the metadata system, which helps us establish lineage.

The resulting information can be stored in a database, or in files if your system is simpler. The point is to establish such a system as early as possible; otherwise subsequent development, operations and maintenance will be very painful.

Data quality verification

Write real-time data to Hive and use the offline data to continuously verify the accuracy of the real-time data.

When a data warehouse is first built, especially right after it is established, you will be very suspicious of whether its data is accurate. The way we verified this before was to write programs that queried the warehouse and checked whether the data was correct, but in the subsequent construction process we found that comparing by hand every day was far too tiring.

We therefore adopted a solution: write the intermediate-layer tables to Hive, use the rich offline data quality verification tools to compare the offline and real-time versions of the same model, and raise monitoring alarms according to configured thresholds. Although this cannot catch real-time data problems immediately, it tells you how accurate a real-time model is before it goes live; the job can then be improved iteratively to raise data accuracy. The same approach also cross-checks the accuracy of the offline data. A sketch of such a comparison query follows.
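
A sketch of such an offline comparison, written as a Hive-style query with hypothetical table names, a hypothetical partition value, and a 1% alert threshold.

```sql
-- Compare the same summary model produced offline and in real time for one
-- day's partition, and surface rows whose difference exceeds the threshold.
SELECT
  o.poi_id,
  o.trade_amount AS offline_amount,
  r.trade_amount AS realtime_amount,
  ABS(o.trade_amount - r.trade_amount) / o.trade_amount AS diff_ratio
FROM offline_dws_poi_summary o
JOIN realtime_dws_poi_summary_hive r
  ON o.poi_id = r.poi_id AND o.dt = r.dt
WHERE o.dt = '2020-01-01'   -- the partition being checked
  AND ABS(o.trade_amount - r.trade_amount) / o.trade_amount > 0.01;
```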

The above is Meituan-Dianping's experience applying Flink to build a real-time data warehouse. We hope it helps!