First, real-time data warehouse construction background

1. The need for real-time is growing

At present, both product requirements and internal decision-making at major companies have an increasingly urgent need for real-time data, which requires real-time data warehouse capabilities to enable them. The data timeliness of a traditional offline data warehouse is T+1, with day-level scheduling, which cannot support the data requirements of real-time scenarios. Even if the scheduling frequency is raised to hourly, this only covers some scenarios with low timeliness requirements; scenarios with high timeliness requirements still cannot be supported gracefully. So the problem of using data in real time must be addressed effectively.

2. Real-time technology is maturing

Real-time computing frameworks have gone through three generations of development, from Storm to Spark Streaming to Flink, and they are becoming more and more mature. On the one hand, real-time tasks can now be developed by writing SQL, which allows the architectural design ideas of the offline data warehouse to be inherited well at the technical level. On the other hand, the capabilities that online data development platforms provide for real-time task development, debugging, operation, and maintenance are also maturing, and development costs are gradually falling, which makes real-time data warehouse construction practical.

Second, real-time data warehouse construction purpose

1. Solve the problems of the traditional data warehouse

Judging from the current state of data warehouse construction, real-time data warehouse is an easily confused concept. By traditional understanding, an important function of a data warehouse is to record history: it is typically expected to hold data from the first day of operation up to the present. Real-time stream processing, by contrast, is a technology that emphasizes the current state of the data. Combining the construction experience of leading companies with our own situation, we position the purpose of building a real-time data warehouse in the company as follows: use data warehouse construction theory and real-time technology to solve the problems that the offline data warehouse cannot solve because of its low data timeliness.

The main reasons for building real-time data warehouse at this stage are as follows:

  • The company's business has an increasingly urgent need for real-time data to assist decision-making;
  • Existing real-time data construction is not standardized: data availability is poor, a warehouse system cannot be formed, and a large amount of resources is wasted;
  • Data platform tools are becoming mature enough to support overall real-time development, reducing development costs.

2. Application scenarios of real-time data warehouse

  • Real-time OLAP analysis;
  • Real-time data Kanban;
  • Real-time service monitoring;
  • Real-time data interface services.

Third, real-time data warehouse construction schemes

Next, we analyze several good cases of current real-time data warehouse construction, hoping they can bring you some inspiration.

1. Didi Hitch real-time data warehouse case

The real-time data warehouse built by the Didi data team basically meets the various real-time business requirements of the Hitch business. It preliminarily sets up the Hitch real-time data warehouse, completes the overall data layering (covering both detailed data and summary data), unifies the DWD layer, reduces big data resource consumption, improves reusability, and enables rich data services to be exported.

The specific structure of the warehouse is shown in the figure below:

From the data architecture diagram, the Hitch real-time data warehouse has many similarities with the corresponding offline data warehouse, for example the hierarchical structure: the ODS layer, the detail layer, the summary layer, and even the application layer may follow the same naming schema. However, a careful comparison reveals many differences between the two:

  1. Compared with the offline warehouse, the real-time warehouse has fewer layers:
  • From current offline data warehouse construction experience, the detail layer of the offline warehouse is very rich, detail data usually also includes a lightly aggregated layer, and application-layer data sits inside the offline warehouse itself. In the real-time warehouse, however, application-layer data has already landed in the storage medium of the application system, so that layer can be separated from the warehouse tables;
  • Advantage of building fewer application layers: when processing data in real time, every additional layer inevitably adds delay.
  • Advantage of building fewer summary layers: in summary statistics, an artificial delay may be introduced so that some data lateness can be tolerated and accuracy ensured. For example, when aggregating a cross-midnight order event, the job may wait until 00:00:05 or 00:00:10 to make sure all data before 00:00 has been received. Too many aggregation layers therefore compound the artificial delay of the data.
  2. Compared with the offline data warehouse, the data sources of the real-time data warehouse are stored differently:
  • The entire offline data warehouse at Didi is currently built on Hive tables. In the real-time data warehouse, however, the same table may be stored in different ways: detailed data and summary data are stored in Kafka, while dimension information such as cities and channels needs to be stored in HBase, MySQL, or other KV databases.

Next, following the Hitch real-time data warehouse architecture diagram, the construction of each layer is described in detail:


1. ODS source layer construction

According to the specific scenarios of Hitch, the current data sources mainly include binlog logs related to orders, public logs related to bubbling and security, and buried-point logs related to traffic. Some of these data have already been collected and written into Kafka or DDMQ data channels, while the rest need to be collected by internal self-developed synchronization tools. All of them are finally written into the Kafka storage medium according to the ODS layer construction specifications of the Hitch data warehouse.

Naming conventions: There are two main types of ODS layer real-time data sources.

  • One is the DDMQ or Kafka Topic automatically produced during offline collection. This type of data is named cn-binlog-{database name}-{database name}, e.g. cn-binlog-ihap_fangyuan-ihap_fangyuan
  • The other follows the ODS layer naming convention: realtime_ods_binlog_{source system library/table name} or ods_log_{log name}, e.g. realtime_ods_binlog_ihap_fangyuan

2. DWD detail layer construction

Driven by the business processes of Hitch and based on the characteristics of each specific business process, the finest-grained detail-layer fact tables are constructed. Combined with the offline data usage habits of Hitch analysts, some important dimension attribute fields of the detail fact tables are appropriately made redundant to complete the wide-table processing. Then, based on the current real-time data needs of Hitch business parties, modules such as transaction, finance, experience, security, and traffic are constructed. The data in this layer comes from the ODS layer, and the ETL work is completed through the Stream SQL provided by the big data architecture team. For binlog logs, the processing is mainly simple data cleaning, handling of data drift and out-of-order data, and possibly Stream Joins across multiple ODS tables. For traffic logs, it mainly performs general ETL and data filtering for Hitch scenarios, structuring unstructured data and splitting the data streams. In addition to storing data in the Kafka message queue, this layer typically also writes to the Druid database in real time, both for querying detailed data and as a processing source for simple summary data.
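As a rough illustration of this kind of DWD cleaning job, the sketch below uses open-source Flink SQL syntax (Didi's internal Stream SQL may differ); the topic, table, and field names are hypothetical.

-- Hypothetical ODS source: order binlog records landed in Kafka.
CREATE TABLE realtime_ods_binlog_order (
    order_id     BIGINT,
    passenger_id BIGINT,
    city_id      INT,
    order_status INT,
    event_time   TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'realtime_ods_binlog_order',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Hypothetical DWD sink: the cleaned, finest-grained order fact stream.
CREATE TABLE realtime_dwd_trip_trd_order_base (
    order_id     BIGINT,
    passenger_id BIGINT,
    city_id      INT,
    order_status INT,
    event_time   TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'realtime_dwd_trip_trd_order_base',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- Simple cleaning: drop dirty records; drift/out-of-order handling and Stream Joins
-- would be added here in a real job.
INSERT INTO realtime_dwd_trip_trd_order_base
SELECT order_id, passenger_id, city_id, order_status, event_time
FROM realtime_ods_binlog_order
WHERE order_id IS NOT NULL;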

Table names at the DWD layer use lowercase letters separated by underscores (_), with a total length of no more than 40 characters, and must comply with the following rule: realtime_dwd_{business/pub}_{data domain abbreviation}_[{business process abbreviation}]_[{custom table naming label abbreviation}]

  • {business/pub}: refer to the business name
  • {data domain abbreviation}: refer to the data domain partition section
  • {custom table naming label abbreviation}: an entity name abstracted during data warehouse transformation and integration; the name should accurately express the business meaning of the entity
  • Sample: realtime_dwd_trip_trd_order_base

3. The DIM layer

  • The common dimension layer, based on the idea of dimensional modeling, establishes consistent dimensions across the whole business process and reduces the risk of inconsistent calculation calibers and algorithms;
  • The data of the DIM layer comes from two parts: one part is produced by Flink programs processing ODS-layer data in real time, the other is produced by offline tasks;
  • Three storage engines are mainly used for DIM dimension data: MySQL, HBase, and Fusion (Didi's self-developed KV store). MySQL can be used when the dimension table data volume is small; Fusion can be used when single records are small and query QPS is high, reducing machine memory usage; HBase can be used for scenarios with a large data volume that are not sensitive to changes in dimension table data. (A sketch of how detail data is associated with such a dimension table follows the naming convention below.)

Naming conventions: the names of tables at the DIM layer are lowercase letters separated by underscores (_), with a total length of no more than 30 characters, and must comply with the following rule: dim_{business/pub}_{dimension definition}[_{custom naming tag}]

  • {business/pub}: refer to the business name
  • {dimension definition}: refer to the dimension naming
  • {custom naming tag abbreviation}: an entity name abstracted during data warehouse transformation and integration; the name should accurately express the business meaning of the entity
  • Sample: dim_trip_dri_base
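As referenced above, the sketch below shows, in hedged form, how the DWD order stream could be enriched against an HBase-backed dimension table using a Flink SQL lookup join. The connector options, table names, and fields are illustrative assumptions, and the probe table is assumed to also declare a processing-time attribute (proc_time AS PROCTIME()).

-- Hypothetical city dimension table stored in HBase (column family cf).
CREATE TABLE dim_trip_city_base (
    rowkey STRING,
    cf ROW<city_name STRING, region_name STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'dim_trip_city_base',
    'zookeeper.quorum' = 'zk:2181'
);

-- Lookup (temporal) join: each order record fetches its city dimension at processing time.
SELECT o.order_id,
       o.city_id,
       d.cf.city_name,
       d.cf.region_name
FROM realtime_dwd_trip_trd_order_base AS o
JOIN dim_trip_city_base FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON CAST(o.city_id AS STRING) = d.rowkey;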

4. Construction of DWM summary layer

In the construction of the summary layer of the Hitch real-time data warehouse, there are many similarities with the Hitch offline data warehouse, but the specific technical implementation is quite different.

First: for the processing of some common indicators, such as PV, UV, and order business process indicators, we perform unified calculation in the summary layer to ensure that indicator calibers are fixed in a unified model. For some individual indicators, a unique time field should be chosen from the perspective of indicator reusability, and that field should be aligned with other indicators in the time dimension as far as possible. For example, the number of abnormal orders needs to be aligned in event time with the indicators of the transaction domain.

Second: the construction of the Hitch summary layer requires multidimensional summarization by theme. Because the real-time data warehouse itself is subject-oriented, and different themes care about different dimensions, data needs to be summarized under each theme according to the dimensions that theme cares about, finally producing the summary indicators the business needs. In the specific implementation, for PV-type indicators, Stream SQL is used to compute a 1-minute summary as the minimum summary unit, and indicators are then accumulated along the time dimension on that basis; for UV-type indicators, the Druid database is used directly as the summary container, and exact or approximate deduplication is implemented according to the timeliness and accuracy requirements of the business side.
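A hedged sketch of the "1-minute summary as the minimum summary unit" idea is shown below: per-minute order PV by city, computed with a tumbling event-time window in Flink SQL. It assumes the DWD stream defined earlier, with event_time declared as an event-time attribute; names are hypothetical.

-- Minimum summary unit: 1-minute tumbling window, accumulated further downstream.
SELECT
    city_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    COUNT(*) AS order_pv
FROM realtime_dwd_trip_trd_order_base
GROUP BY city_id, TUMBLE(event_time, INTERVAL '1' MINUTE);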

Third: the construction of the summary layer also involves the processing of derived dimensions. In the summary indicators related to Hitch coupons, we used the HBase version mechanism to build a zipper table of derived dimensions, and obtained the accurate dimension values of the real-time data at that moment by joining the event stream with the HBase dimension table.

Naming conventions: DWM table names use lowercase letters separated by underscores, with a maximum length of 40 characters, and follow this rule: realtime_dwm_{business/pub}_{data domain abbreviation}_{data primary granularity abbreviation}_[{custom table naming label abbreviation}]_{statistical time period range abbreviation}

  • {business/pub}: refer to the business name
  • {data domain abbreviation}: refer to the data domain partition section
  • {data primary granularity abbreviation}: abbreviation of the primary granularity or data domain of the data, also the primary dimension in the composite primary key
  • {custom table naming label abbreviation}: an entity name abstracted during data warehouse transformation and integration; the name should accurately express the business meaning of the entity
  • {statistical time period range abbreviation}: 1d: daily increment; td: day-to-date total (full); 1h: hourly increment; th: hour-to-date total (full); 1min: minute increment; tmin: minute-to-date total (full)
  • Sample: realtime_dwm_trip_trd_pas_bus_accum_1min

5. The APP application layer

The main work of this layer is to write real-time summary data into the databases of application systems, including the Druid database used for large-screen display and real-time OLAP (which can ingest both application data and detailed data to complete summary indicator calculation), the HBase database used for the real-time data interface service, and the MySQL or Redis databases used for real-time data products.

Naming convention: given the particularity of the real-time data warehouse APP layer, no hard naming requirements are imposed.

2. Kuaishou real-time data warehouse case

1) Objectives and difficulties

  1. Objectives

First of all, since we are building a data warehouse, we expect every real-time indicator to have a corresponding offline indicator, and we require the overall difference between real-time and offline indicators to stay within 1%; this is the minimum standard.

The second is data delay. The SLA standard is that the data delay of all core report scenarios must not exceed 5 minutes during an activity; these 5 minutes include the time after a job fails plus the recovery time.

Finally, stability. For some scenarios, such as a job restart, the curve should stay normal, and the restart should not lead to obvious anomalies in the indicator output.

  2. Difficulties

The first difficulty is the large amount of data. The total daily ingested traffic data is on the order of trillions of records. In events such as the Spring Festival Gala, the peak QPS can reach 100 million per second.

The second difficulty is the complexity of component dependencies. Maybe some of the links depend on Kafka, some on Flink, some on KV storage, RPC interface, OLAP engine, etc. We need to think about how to distribute the links so that all of these components can work properly.

The third difficulty is the complexity of the link. Currently we have 200+ core business jobs, 50+ core data sources, and more than 1000 jobs overall.

2) Real-time data warehouse – layered model

Based on the above three difficulties, let’s look at the data warehouse architecture:

As shown above:

The lowest level has three different data sources: client logs, server logs, and Binlog logs. The public base layer is divided into two levels: the DWD layer, which holds detailed data, and the DWS layer, which holds public aggregated data. DIM is the dimension layer we often talk about. On top of this there is a topic-based pre-layering borrowed from the offline warehouse, which may include traffic, users, devices, video production and consumption, risk control, social, and so on. The core work of the DWD layer is standardized cleaning; the DWS layer joins dimension data to the DWD layer and produces aggregation layers of general granularity. Next is the application layer, which includes large-screen data, multidimensional analysis models, and business thematic data. The scenarios sit at the top. The whole process can be divided into three steps:

The first step is bringing business data in. The second step is data assetization, which means cleaning the data heavily to form regular, orderly data. The third step is the businessization of data, which can be understood as data feeding back into the business at the real-time level and empowering the construction of business data value.

3) Real-time data warehouse – safeguard measures

Based on the layered model above, take a look at the overall safeguard measures:

The guarantee level is divided into three different parts, namely, quality guarantee, time-effectiveness guarantee and stability guarantee.

Let's first look at the quality assurance part in blue. In terms of quality assurance, in the data source stage we monitor data sources for out-of-order data, based on our collection SDK, and calibrate data sources against offline data for consistency. The computing process has three stages: development, launch, and service. In the development stage, a standardized model is provided, and benchmarks and offline comparison verification based on it ensure consistent quality. The launch stage focuses more on service monitoring and indicator monitoring. In the service stage, if anything abnormal occurs, we first pull up the Flink state; if some scenes still do not meet expectations, we repair the data offline as a whole.

The second is timeliness. For data sources, we also monitor data source latency. In the development phase there are really two things: first, stress testing. A regular task takes the peak traffic of the last 7 or 14 days to check whether it suffers task delay. After passing the stress test, there is an evaluation of task launch and restart performance, which is equivalent to what restart performance looks like after CP (checkpoint) recovery.

The last one is stability assurance, where a lot can be done for large events, such as switching drills and graded assurance. We limit traffic based on the previous stress test results to ensure that jobs remain stable when the limit is exceeded and there are not many instabilities or CP failures. We then have two different standards: cold-standby dual data centers and hot-standby dual data centers. Cold standby means that when one data center fails, the jobs are pulled up in the other data center; hot standby means the same logic is deployed once in each of the two data centers. These are our overall safeguard measures.

4) Kuaishou scenario problems and solutions

1. PV/UV standardization

1.1 scenario

The first issue is PV/UV standardization. Here are three screenshots:

The first picture is the warm-up scene of the Spring Festival Gala, which is a kind of play. The second and third pictures are the screenshots of the activities of giving red envelopes and the live broadcast room on the day of the Spring Festival Gala.

In the course of the activity, we found that 60-70% of the requirements were for computing the information in the page, such as:

  • How many people come to the page, or how many people click through to the page;
  • How many people came to the event;
  • How many clicks a widget on a page gets and how many exposures it generates.

1.2 plan

To abstract the scenario, this is the following SQL:

In simple terms, you filter from a table, aggregate by dimension level, and then generate some Count or Sum operation.
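The original SQL is only shown as a figure; a generic reconstruction of the abstracted query is sketched below, with placeholder table and column names.

-- Filter one page/activity, aggregate by dimensions, then count and sum.
SELECT dim1, dim2,
       COUNT(DISTINCT did) AS uv,
       SUM(click_cnt)      AS click_sum
FROM page_event
WHERE page_id = 'spring_festival_warmup'
GROUP BY dim1, dim2;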

Based on this scenario, our initial solution is shown on the right side of the figure above.

We used the Early Fire mechanism of Flink SQL: fetch data from the Source and then bucket it by DID. For example, the purple part at the beginning is bucketed this way; bucketing first prevents hot spots on a certain DID. After bucketing comes the Local Window Agg, which adds up data of the same type within each bucket. After the Local Window Agg, the Global Window Agg merges the buckets according to the dimensions; merging buckets is equivalent to computing the final result by dimension. The Early Fire mechanism is equivalent to opening a day-level window in the Local Window Agg and emitting output once every minute.

We ran into some problems along the way, as shown in the lower left corner of the figure above.

There is no problem when the code runs normally, but if the overall data is delayed or historical data is being backfilled, for example with Early Fire once a minute, the amount of data during the backfill will be very large. The backfill may then reach 14:00 and directly read data belonging to 14:02, so the point at 14:01 is lost. What happens when it is lost?

In this scenario, the curve at the top is the result of Early Fire during historical backfill. The x-axis is minutes and the y-axis is the page UV up to the current time. We can see that some points are horizontal, meaning there is no data, followed by a sharp increase, then horizontal again, then another sharp increase. The expected result is actually the smooth curve at the bottom of the figure.

To solve this problem, we used the Cumulate Window solution, which is also provided in Flink 1.13 and works in the same way.

The data opens a large day-level window, and small minute-level windows open under the large window. Data falls into the minute-level window according to its own Row Time.

Watermark advancing past the event_time of a window triggers its emission. In this way, the backfill problem is solved because the data itself falls into its real window, and Watermark only triggers output after the window ends. This method also solves the out-of-order problem to a certain extent: out-of-order data is not discarded, and the latest accumulated value is recorded. Finally, semantic consistency: because it is based on event time, the results are highly consistent with the offline calculation when the disorder is not severe. The above is the standardized solution for PV/UV.
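A minimal sketch of this Cumulate Window approach, using the windowing TVF syntax available since Flink 1.13: a day-level window that emits an updated result every minute, with each record assigned by its own event time. The table and field names (page_event, did, event_time) are hypothetical, and event_time is assumed to be declared as an event-time attribute with a watermark.

SELECT window_start, window_end,
       COUNT(DISTINCT did) AS page_uv
FROM TABLE(
    CUMULATE(TABLE page_event, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '1' DAY))
GROUP BY window_start, window_end;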

2. DAU calculation

2.1 Background

Here’s how to calculate DAU:

We have extensive monitoring of active devices, new devices, and returning devices across the whole market.

Active devices are devices that came that day; new devices are devices that came that day and never came before; returning devices are devices that came that day but had not come for N days. In the calculation process, we may need 5 to 8 different topics to compute these indicators.

So let’s look at what the logic should be for an offline process.

First we count the active devices, merge them together, and then do a day-level deduplication for a dimension. Then we join the dimension table, which contains the first and last access times of each device, that is, the first and the most recent time the device was seen up to yesterday.

Once we have this information, we can perform the logical calculation and find that new devices and returning devices are in fact sub-tags of active devices. New devices apply one piece of logic, and returning devices apply a 30-day logic. Based on such a solution, can we simply write one SQL statement to solve this problem?

That’s actually what we did at first, but we ran into some problems:

The first problem: there are 6 to 8 data sources, and the caliber of our overall market metrics is often fine-tuned. With a single job, every fine-tuning requires a change, and the stability of that single job is very poor. The second problem: the data volume is in the trillions, which leads to two situations: first, a single job of this magnitude has very poor stability; second, KV storage is used for the real-time dimension table association. The third problem: we have a high latency requirement of less than one minute. Batch processing must be avoided throughout the link, and if a single task has performance problems, we also need to guarantee high performance and scalability.

2.2 Technical Solutions

In view of the above problems, let’s introduce how we did:

As shown in the example in the figure above, the first step is minute-level deduplication of data sources A, B, and C according to dimension and DID. After deduplication, three minute-level deduplicated data sources are obtained; they are then unioned together, and the same logic is applied to the union.

This is equivalent to reducing the entry volume of our data sources from trillions to billions. After the minute-level deduplication, a day-level deduplication is carried out, further reducing the generated data volume.

With data at the billions level, it becomes feasible to do dimension association through servitization, that is, to associate the user portrait through an RPC interface on the deduplicated stream, and finally write the result to the target Topic. This target Topic is imported into the OLAP engine to provide a number of different services, including mobile services, large-screen services, and metrics kanban services.
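A rough illustration of the minute-level deduplication step above, using the Flink SQL deduplication pattern (ROW_NUMBER over a partition, keeping the first row); source_a and its fields are hypothetical.

-- Keep the first record per (dimension, DID, minute); the outputs of sources A/B/C
-- deduplicated this way would then be unioned and processed with the same logic.
SELECT dim, did, minute_ts
FROM (
    SELECT dim, did, minute_ts,
           ROW_NUMBER() OVER (PARTITION BY dim, did, minute_ts
                              ORDER BY event_time ASC) AS rn
    FROM (
        SELECT dim, did, event_time,
               FLOOR(event_time TO MINUTE) AS minute_ts
        FROM source_a
    ) t1
) t2
WHERE rn = 1;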

This scheme has three advantages, namely stability, timeliness and accuracy.

The first is stability. Loose coupling means that when the logic of data source A or data source B needs to change, each can be changed separately. Task scaling is also easier: because all the logic is split at a very fine granularity, a problem in one place, for example a traffic problem, does not affect the later parts, so scaling out is relatively simple, in addition to the servitization and state control. The second is timeliness: we achieve millisecond delay with rich dimensions, with 20+ dimensions overall for multidimensional aggregation. Finally, accuracy: we support data verification, real-time monitoring, unified model exit, and so on. At this point we run into another problem: disorder. For the three different jobs above, every restart causes a delay of at least two minutes, which makes the unioned downstream data source out of order.

2.3 Delay calculation scheme

What do we do when we have a disorder like this?

We have three solutions in total:

The first solution uses "DID + dimension + minute" as the deduplication key, and the Value records whether the key has been seen. For example, if the same DID arrives at 04:01, a result is output; likewise, arrivals at 04:02 and 04:04 also produce output. But if it arrives again at 04:01 it is discarded, while if it arrives at 04:00 a result is still output.

This solution had some problems: because we keep state per minute, the state for 20 minutes of tolerance is twice as large as the state for 10 minutes, and the state size becomes somewhat uncontrollable, so we switched to solution 2.

In the second solution, our approach assumes there is no out-of-order data in the source. The key stores "DID + dimension" and the Value is a timestamp, updated as shown in the figure above. When a piece of data arrives at 04:01, a result is output. When another piece with the same DID arrives at 04:02, the timestamp is updated and a result is still output; the same logic applies at 04:04, when the timestamp is updated to 04:04. But if a piece of data then arrives with a time earlier than the stored 04:04, it is discarded. This approach greatly reduces the required state, but it has zero tolerance for disorder. Since we could not solve that, we came up with solution 3.

Solution 3 is based on the timestamp of solution 2 and adds something like a ring buffer, allowing disorder within the buffer.

For example, at 04:01 a piece of data arrives and a result is output; at 04:02 a piece of data arrives, updates the timestamp to 04:02, and records that the same device was also seen at 04:01. If another piece of data arrives at 04:04, the buffer is shifted according to the corresponding time difference. Through such logic, a certain amount of disorder can be tolerated.

Comparing these three solutions:

In solution 1, tolerating 16 minutes of disorder, the state size of a single job is about 480 GB. Although this guarantees accuracy, job recovery and stability are completely uncontrollable, so we abandoned this solution.

Solution 2 has a state size of about 30 GB and tolerates zero disorder, but the data is not accurate. Since we have very high accuracy requirements, we abandoned this solution as well.

Compared with solution 1, the state of solution 3 changes in structure but does not grow much, and overall it achieves the same effect as solution 1. Solution 3 tolerates 16 minutes of disorder, and if we update a job normally, 10 minutes is enough for a restart, so solution 3 was finally selected.

3. Operation scenario

3.1 Background

The operation scenario can be divided into four parts:

The first one is data large screen support, including the analysis of single broadcast room data and the analysis of the large market data, need to achieve minute delay, update requirements are relatively high;

The second is the support of live broadcast kanban. The data of live broadcast kanban will be analyzed in specific dimensions and supported by specific groups, which have high requirements for dimension richness.

The third is the data strategy list, which is mainly used to predict popular works and popular styles. The list requires hourly data with relatively low update requirements.

The fourth is the real-time indicator display at the C-end, with a large amount of queries but a fixed query mode.

Let’s examine some of the different scenarios that arise from these four different states.

The first three are basically the same, differing only in query mode: some are specific business scenarios and some are general business scenarios.

For the third and fourth kinds, the update requirements are relatively low and the throughput requirements are relatively high, and the curve within the process does not require consistency. The fourth query mode is mostly single-entity queries, for example querying what indicators a piece of content has, and it has high QPS requirements.

3.2 Technical Solutions

For the above four different scenarios, how did we do it?

Looking at the basic detail layer (on the left), the data source has two links. One is the consumption stream, such as live consumption information and views/likes/comments; it goes through basic cleaning and then dimension management. The upstream dimension information comes from Kafka, which writes some content dimensions into KV storage, including some user dimensions.

After these dimensions are associated, the DWD fact layer is finally written to Kafka. For performance improvements, we have implemented a level 2 cache.

In the upper part of the figure, we read the data of DWD layer and make basic summary. The core is window dimension aggregation to generate four kinds of data with different granularity, namely, market multidimensional summary topic, live broadcast multidimensional summary topic, author multidimensional summary topic and user multidimensional summary topic, all of which are data of general dimension.

In the lower part of the figure, based on these general dimension data, we processed the data of personalized dimension, namely ADS layer. After obtaining these data, dimension expansion will be carried out, including content expansion and operation dimension expansion, and then aggregation will be carried out, such as e-commerce real-time topic, institutional service real-time topic and big V live real-time topic.

Splitting into two such links has an advantage: one place handles the general dimensions and the other handles the personalized dimensions. The general dimensions have higher guarantee requirements, while the personalized dimensions involve a lot of personalized logic. If the two are coupled together, tasks break down frequently and it becomes unclear which task has which responsibility, so a stable layer cannot be built.

On the right, we ended up with three different engines. In simple terms, Redis query is used in the C-terminal scenario, OLAP query is used in the large screen, business kanban scenario.

3. Tencent Watch real-time data warehouse case

Why did Tencent's Watch business build a real-time data warehouse? Because the volume of raw reported data is very large, with daily reports peaking in the trillions, and the reporting format is messy: it lacks content dimension information and user portrait information, so the downstream cannot use it directly. The real-time data warehouse we provide, based on the business scenarios of Tencent Watch's information feed, performs content dimension association, user portrait association, and aggregation at various granularities, so that the downstream can use real-time data very conveniently.

1) Scheme selection

Let’s take a look at the solution selection of our multidimensional real-time data analysis system. In the selection, we compared the leading solutions in the industry and selected the solution that best fits our business scenarios.

  • The first part is the selection of real-time data warehouse. We choose Lambda architecture which is relatively mature in the industry. Its advantages are high flexibility, high fault tolerance, high maturity and low migration cost. The disadvantage is that there are two sets of codes for real-time and offline data, so there may be a problem that one caliber is modified and the other is not. We do data reconciliation work every day, and we will send alarms if there is anything abnormal.

  • The second piece is the real-time computing engine selection. Flink was designed for streaming from the start, Spark Streaming is strictly micro-batch, and Storm is rarely used anymore. Flink features exactly-once accuracy, lightweight Checkpoint fault tolerance, low latency, high throughput, and high ease of use, so we chose Flink as the real-time computing engine.

  • The third block is the real-time storage engine. Our requirements are to have dimensional indexes, support high concurrency, pre-aggregation, and high performance real-time multidimensional OLAP queries. As you can see, Hbase, Tdsql, and ES cannot meet the requirements. Druid has a drawback. It divides the Segment by time sequence, so it cannot store the same content in the same Segment. So we chose ClickHouse, the MPP database engine that has been popular for the last two years.

2) Design objectives and difficulties

Our multidimensional real-time data analysis system is divided into three modules

  1. Real-time computing engine
  2. Real-time storage engine
  3. The App layer

The difficulty lies in the first two modules: real-time computing engine and real-time storage engine.

  1. How to ingest massive data at tens of millions of records per second in real time and perform dimension table association with extremely low latency is difficult.
  2. How the real-time storage engine supports high-concurrency writes, highly available distributed deployment, and high-performance indexed queries is difficult.

For the concrete realization of these modules, let's look at the architecture design of our system.

3) Architectural design

The front end uses the open source component Ant Design, using the Nginx server, deploying static pages, and reverse proxy browser requests to the background server.

The background service is written based on the RPC background service framework developed by Tencent, and some secondary caching will be carried out.

The real-time data warehouse is divided into access layer, real-time computing layer and real-time data warehouse storage layer.

  • The access layer mainly splits the original message queue of tens of millions of records per second into micro-queues of different behavior data. After splitting, a queue carries, for example, only millions of records per second.

  • The real-time computing layer is mainly responsible for converting multi-row behavior data into single rows and for the real-time association of user portrait data and content dimension data.

  • The real-time data warehouse storage layer is mainly designed to provide real-time message queues that fit the Watch business and are easy for the downstream to use. We provisionally provide two message queues as two layers of the real-time warehouse. The first, the DWM layer, is aggregated at content ID-user ID granularity: one record contains the content ID, user ID, B-side content data, C-side user data, and user portrait data. The other is the DWS layer, aggregated at content ID granularity: one record contains the content ID, B-side data, and C-side data. As you can see, the traffic of the content ID-user ID granularity queue is further reduced to the 100,000/s level and the content ID granularity to the 10,000/s level, with a clearer format and richer dimension information.

Real-time storage is divided into real-time write layer, OLAP storage layer and background interface layer.

  • The real-time write layer is mainly responsible for hash-routing the data and writing it.
  • OLAP storage layer uses MPP storage engine to design index and materialized view in line with business and store massive data efficiently.
  • Background interface layer provides efficient multidimensional real-time query interface.

4) Real-time computing

Now for the most complex parts of the system: real-time computing and real-time storage.

Firstly, the real-time computing part is introduced: it is divided into real-time correlation and real-time data warehouse.

1. Real-time high-performance dimension table association

The difficulty of real-time dimension table association is that a real-time data flow of millions of records per second cannot simply be joined against HBase directly: associating one minute's worth of data with HBase takes hours, resulting in serious data delay.

We propose several solutions:

  • First, in the Flink real-time computation, a 1-minute window aggregation converts the multi-row behavior data in the window into a one-row, multi-column data format (a sketch follows after this list). After this step, the association that originally took an hour drops to a dozen or so minutes, but that is still not enough.

  • Second, a layer of Redis cache is placed in front of the HBase content access; accessing a batch of 1000 records through Redis is roughly 1000 times faster than accessing HBase. To prevent expired data from wasting the cache, the cache expiration time is set to 24 hours, and HBase Proxy changes are monitored to ensure cache consistency. This reduces access time from tens of minutes to seconds.

  • Third, many non-standard content IDs are reported during the reporting process. These content IDs are not stored in HBase, which can cause cache-penetration problems. So in the real-time computation we filter out these content IDs directly to prevent cache penetration and save some time.

  • Fourth, because cache expiration times were set, a cache avalanche could be introduced. To prevent avalanches, we performed peak shaving and valley filling in the real-time computation and staggered the cache expiration times.

It can be seen that, before and after optimization, the amount of data was reduced from the ten-billion level to the billion level, and the time consumed dropped from hours to tens of seconds, a reduction of 99%.
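A hedged sketch of the first optimization above: a 1-minute window aggregation that turns multi-row behavior data into a one-row, multi-column format per content ID and user ID, so that each key only needs one dimension lookup. Written in Flink SQL for illustration; the table, action values, and fields are assumptions.

SELECT
    content_id,
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE)        AS window_start,
    SUM(CASE WHEN action = 'expose'  THEN 1 ELSE 0 END)  AS expose_cnt,
    SUM(CASE WHEN action = 'click'   THEN 1 ELSE 0 END)  AS click_cnt,
    SUM(CASE WHEN action = 'like'    THEN 1 ELSE 0 END)  AS like_cnt,
    SUM(CASE WHEN action = 'comment' THEN 1 ELSE 0 END)  AS comment_cnt
FROM user_behavior
GROUP BY content_id, user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);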

2. Downstream services

The difficulty of the real-time data warehouse is that it is a relatively new field and the businesses of different companies differ greatly; designing a convenient, easy-to-use real-time data warehouse that fits the Watch business scenario is hard.

Let's look at what the real-time data warehouse contains. The real-time data warehouse is composed of several message queues. Different message queues store real-time data of different aggregation granularities, including content ID, user ID, C-side behavior data, B-side content dimension data, and user portrait data.

How did we build the real-time data warehouse? It is simply the output of the real-time computing engine described above, stored in message queues, which can be provided to the downstream for multi-user reuse.

We can look at the difference between developing a real-time application before and after building the real-time data warehouse. Without the warehouse, we had to consume the original queue at the millions-per-second level, do complex data cleansing, then associate user portraits and content dimensions, to finally get real-time data in the required format; the development and expansion costs were high, and every new application had to go through the same process again. With the data warehouse, to develop a real-time application at content ID granularity you can directly subscribe to the DWS-layer message queue at the 10,000/s TPS level. The development cost is much lower, the resource consumption is much smaller, and the scalability is much better.

For a practical example, developing a real-time data large screen for our system previously required all of the above operations to get the data. Now all you need to do is consume the DWS-layer message queue and write one piece of Flink SQL, which consumes only 2 CPU cores and 1 GB of memory.
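A minimal sketch of that "consume the DWS queue and write one Flink SQL" path for a real-time large screen; the dws_content_stat table is assumed to be declared over the DWS-layer Kafka topic, and all names are illustrative.

-- Per-minute per-content metrics for a large-screen display.
SELECT
    content_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    SUM(click_cnt)  AS click_cnt,
    SUM(expose_cnt) AS expose_cnt
FROM dws_content_stat
GROUP BY content_id, TUMBLE(event_time, INTERVAL '1' MINUTE);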

Taking 50 consumers as an example, establishing the real-time data warehouse reduces the resource consumption of developing a downstream real-time application by 98%, including computing resources, storage resources, labor costs, and developers' learning and onboarding costs. And the more consumers there are, the more is saved. Taking Redis storage as an example, it can save millions of yuan a month.

5) Real-time storage

After real-time computing, let’s move on to real-time storage.

This part is introduced in three parts

  • The first is distributed – highly available
  • The second is massive data – writing
  • The third is high performance – query

1. Distributed – Highly available

We followed ClickHouse's official advice for a highly available solution based on ZK. Data is written to one replica of a shard, and then the write is recorded in ZK; ZK tells the other replicas of the same shard, and those replicas come to pull the data, ensuring data consistency.

Message queues were not chosen for data synchronization because ZK is more lightweight. And when writing, any replica can be written; the other replicas obtain consistent data through ZK. In addition, even if another node fails to obtain the data the first time, it will retry once it finds its data inconsistent with what is recorded in ZK, ensuring consistency.

2. Massive data – Write

The first problem with data writing was that ZK's QPS became too high if a lot of data was written directly to ClickHouse, so the solution is to write in Batches instead. How large should the Batch be? If the Batch is too small, the pressure on ZK is not relieved, but the Batch should not be too large either, otherwise the upstream memory pressure becomes too great. Through experiments, we finally selected a Batch size of hundreds of thousands of records.

The second problem is that as the data volume grows, a single QQ Watch video topic can write tens of billions of records per day. The default solution was to write through a distributed table, which causes disk bottlenecks on a single machine, especially because ClickHouse uses MergeTree, whose working principle is similar to the LSM-tree of HBase and RocksDB: the merge process suffers from write amplification, which aggravates the disk pressure. The peak is tens of millions of records per minute, and writing them takes tens of seconds; if a Merge is in progress at the same time, write requests are blocked and queries become very slow. We made two optimizations: one is to RAID the disks to improve disk IO; the other is to split the data before writing and write directly to different shards, so that the disk pressure on each machine becomes 1/N.

The third problem is that although we partition writes by shard, a common distributed-system problem is introduced: the local Top is not the global Top. For example, if data with the same content ID lands on different shards, then when computing the global Top 100 content IDs, a content ID that is Top 100 on shard 1 may not be Top 100 on other shards, so some of its data is lost during aggregation and the final result is affected. Our optimization was to add a routing layer before writing, routing all records with the same content ID to the same shard, which solved the problem.
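The routing described above is done in the write layer; one way to express the same idea inside ClickHouse itself is to give the Distributed table a sharding key on content_id, as in the hedged sketch below (cluster, database, and table names are assumptions).

-- All rows with the same content_id hash to the same shard of content_stat_local.
CREATE TABLE content_stat_dist AS content_stat_local
ENGINE = Distributed(main_cluster, default, content_stat_local, cityHash64(content_id));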

With writing covered, the next step is to introduce Clickhouse’s high-performance storage and queries.

3. High performance – Storage-query

A key point of ClickHouse's high-performance queries is the sparse index. The sparse index design requires care: a good design speeds up queries, while a poor design hurts query efficiency. Based on our business scenario, most of our queries are related to time and content ID, for example how a certain piece of content performed among different groups of people in the last N minutes, so I built the sparse index on date, minute-granularity time, and content ID. For such a query, the sparse index can reduce file scans by 99%.
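A hedged ClickHouse sketch of this sparse-index design: a MergeTree table ordered by date, minute-granularity time, and content ID (a ReplicatedMergeTree would be used in the replicated setup described earlier). Column names and types are illustrative; the gender column stands in for a user-portrait dimension used in the next optimization.

CREATE TABLE content_stat_local
(
    dt          Date,
    minute_time DateTime,
    content_id  String,
    gender      String,
    click_cnt   UInt64,
    expose_cnt  UInt64
)
ENGINE = MergeTree()
PARTITION BY dt
ORDER BY (dt, minute_time, content_id);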

Another problem is that we now have a lot of data and many dimensions. Taking QQ Watch video content as an example, there are tens of billions of records flowing in per day, and some dimensions have hundreds of categories. If all dimensions were pre-aggregated at once, the data volume would expand exponentially, queries would become slower, and a lot of memory would be used. Our optimization is to build a corresponding pre-aggregated materialized view for each dimension, trading space for time, which shortens the query time.
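A hedged sketch of one such per-dimension pre-aggregation (here, the hypothetical gender dimension from the table sketch above), using a SummingMergeTree materialized view so that rows with the same key are summed on merge.

CREATE MATERIALIZED VIEW content_stat_by_gender_mv
ENGINE = SummingMergeTree()
PARTITION BY dt
ORDER BY (dt, minute_time, content_id, gender)
AS
SELECT dt, minute_time, content_id, gender,
       sum(click_cnt)  AS click_cnt,
       sum(expose_cnt) AS expose_cnt
FROM content_stat_local
GROUP BY dt, minute_time, content_id, gender;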

Another problem with distributed table queries: if you query information about a single content ID, the distributed table sends the query to all shards and then aggregates the returned results. In fact, because of the routing, a content ID exists on only one shard, and the remaining shards run for nothing. For this kind of query, our optimization is that the backend applies the same routing rules first and queries the target shard directly, reducing the load by (N-1)/N and greatly shortening the query time. And since we provide OLAP queries where the data only needs eventual consistency, read/write separation between primary and secondary replicas can improve performance further.

We also cache data for 1 minute in the backend; for a query with the same conditions, the backend returns the cached result directly.

4. Expansion

Here we introduce our capacity expansion plan, after investigating some common approaches in the industry.

For example, in HBase the original data is stored in HDFS, so capacity expansion only expands the Region Servers and does not involve migrating the original data. Each shard in ClickHouse, however, is local; it is a lower-level storage engine and cannot be expanded as easily as HBase.

Redis uses hash slots, an approach similar to consistent hashing and a classic distributed caching scheme. During a rehash, a Redis slot has temporary ASK-redirection read unavailability, but migration is generally convenient: migrate from h[0] to h[1], then delete h[0]. ClickHouse, however, mostly serves OLAP batch queries rather than point queries, and because of its columnar storage, which does not support deletion well, a consistent-hashing scheme is not a good fit.

The current expansion plan is to consume an additional copy of the data and write it to the new ClickHouse cluster, run the two clusters in parallel for a period of time (since the real-time data is kept for 3 days), and after that let the backend service access the new cluster directly.

4. Youzan real-time data warehouse case

1) Layered design

We are all familiar with the hierarchical design of the traditional offline data warehouse. To organize and manage data in a standard way, more layers are usually divided, and in some complex logic-processing scenarios temporary layers are introduced to land intermediate results and facilitate downstream processing. Considering the timeliness of the real-time data warehouse, the hierarchical design should be simplified as much as possible to reduce the possibility of errors in intermediate processing. Nevertheless, the real-time data warehouse is generally designed with reference to the layering ideas of the offline data warehouse.

The hierarchical structure of real-time data warehouse is shown in the figure below:

– ODS (Real-time Data Access Layer)

The ODS layer, the real-time data access layer, collects the real-time data of various business systems through data collection tools, structures unstructured data, and saves the original data with almost no filtering. The data of this layer comes mainly from three parts: the first part is the NSQ messages produced by the business side, the second part is the Binlog of the business databases, and the third part is buried-point logs and application logs. The real-time data of these three parts is finally written uniformly into Kafka.

ODS layer table naming convention: department name.application name.warehouse level_subject field prefix_database name/message name

For example, access the Binlog of the service library

The real-time database table is named deptName.appName.ods_subjectName_tablename

For example, access NSQ messages of the service party

The real-time database table is named deptName.appName.ods_subjectName_MSgName

– DWS (Real-time detail middle Layer)

The DWS layer, the real-time detail intermediate layer, is driven by business processes and builds the finest-grained detail-layer fact tables based on each specific business process event. For example, the transaction process includes order events, payment events, delivery events, and so on, and we build detail layers based on these independent events. In this layer, the fact detail data is likewise divided according to the subject domains of the offline data warehouse and organized by dimensional modeling, with appropriate redundancy for some important dimension fields. Based on Youzan's real-time demand scenarios, the data of subject domains such as transaction, marketing, customers, stores, and commodities is mainly constructed. The data of this layer comes from the ODS layer, ETL processing is carried out through Flink SQL, and the main work includes standardized naming, data cleansing, dimension completion, and multi-stream association; the results are finally written uniformly into the Kafka storage medium.

DWS layer table naming convention: department name.application name.warehouse level_subject field prefix_warehouse table name

For example, the middle layer of real-time event A

The real-time data warehouse table is named deptName.appName.dws_subjectName_tableName_eventNameA

For example, the middle layer of real-time event B

The real-time data warehouse table is named deptName.appName.dws_subjectName_tableName_eventNameB

– DIM (Real-time dimension layer)

The DIM layer, the real-time dimension layer, stores dimension data and is mainly used for dimension completion when the real-time detail intermediate layer is widened. Currently the data of this layer is mainly stored in HBase; later, more suitable storage media will be provided based on QPS and data volume.

DIM layer table naming convention: application name_warehouse level_subject field prefix_table name

For example, HBase storage, real-time dimension table

The real-time data warehouse table is named appname_DIM_tablename

– DWA (Real-time summary layer)

The DWA layer, the real-time summary layer, performs multidimensional summarization on DWS-layer data and provides it to downstream business parties. In practical applications, different business parties use dimensional summaries in different ways and adopt different technical solutions according to their requirements. The first way is real-time summarization with Flink SQL, storing the resulting indicators in HBase, MySQL, and other databases; this is the solution we adopted earlier. Its advantage is flexible business logic, and its disadvantage is that the aggregation granularity is fixed and not easy to extend. The second way uses a real-time OLAP tool for summarization, which is what we commonly use now. Its advantage is that the aggregation granularity is easy to extend, and its disadvantage is that the business logic needs to be preprocessed in the intermediate layer.
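A hedged sketch of the first approach: a Flink SQL aggregation whose result indicators are upserted into MySQL through the JDBC connector. The DWS source table (dws_trade_pay_detail) is assumed to be declared over the corresponding Kafka topic, and all table and field names are illustrative.

-- Hypothetical MySQL sink for shop-level daily payment indicators.
CREATE TABLE dwa_trade_shop_daily (
    shop_id   BIGINT,
    stat_date STRING,
    pay_cnt   BIGINT,
    pay_amt   DECIMAL(18, 2),
    PRIMARY KEY (shop_id, stat_date) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/realtime',
    'table-name' = 'dwa_trade_shop_daily'
);

-- Continuous aggregation; the primary key turns this into an upsert into MySQL.
INSERT INTO dwa_trade_shop_daily
SELECT shop_id,
       DATE_FORMAT(pay_time, 'yyyy-MM-dd') AS stat_date,
       COUNT(*)        AS pay_cnt,
       SUM(pay_amount) AS pay_amt
FROM dws_trade_pay_detail
GROUP BY shop_id, DATE_FORMAT(pay_time, 'yyyy-MM-dd');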

DWA layer table naming convention: application name_warehouse level_subject field prefix_aggregation granularity_data range

For example, HBase storage: real-time summary table of the current day in a domain of a specific granularity

The real-time data warehouse table is named appName_dwa_subjectName_aggName_daily

– APP (Real-time application layer)

The APP layer, the real-time application layer, holds data that has already been written into the storage of application systems: for example, written to Druid as real-time datasets for BI kanban; written to HBase and MySQL to provide a unified data service interface; or written to ClickHouse to provide real-time OLAP services. Because this layer is very close to the business, the real-time data warehouse does not impose unified naming requirements here.

2) real-time ETL

There are many components involved in the ETL processing process of real-time data warehouse. Next, the components needed for the construction of real-time data warehouse and the application scenarios of each component are reviewed. As shown below:

The specific real-time ETL processing process is shown in the figure below:

1. Dimension completion

Creating a UDF that calls a Dubbo interface is the most convenient way to complete dimensions in a real-time stream, but if the request volume is too large, the Dubbo interface comes under too much pressure. In practical application scenarios, the first choice for dimension completion is to join a dimension table, but the join has a certain probability of missing. To compensate for such misses, a Dubbo interface call can be used as a fallback to complete the dimension. The pseudocode is as follows:

create function call_dubbo as 'XXXXXXX';
create function get_json_object as 'XXXXXXX';

case
    when cast(b.column as bigint) is not null
        then cast(b.column as bigint)
    else cast(
            coalesce(
                cast(get_json_object(
                         call_dubbo( 'clusterUrl'
                                   , 'serviceName'
                                   , 'methodName'
                                   , cast(concat('[', cast(a.column as varchar), ']') as varchar)
                                   , 'key')
                       , 'rootId')
                     as bigint)
              , a.column)
         as bigint)
end

2. Idempotent processing

A real-time task will inevitably encounter abnormal situations during execution. When the task restarts, some messages may be re-sent and re-consumed, causing downstream real-time statistics to be inaccurate. To avoid this effectively, the real-time message stream can be made idempotent: when a message is consumed, its Key is written into a KV store; if the message is re-sent because of an abnormal task restart, the KV store is used to judge whether the message has already been consumed, and if so, the message is not sent downstream. The pseudocode is as follows:

create function idempotenc as 'XXXXXXX';

insert into table
select
    order_no
from
    (
        select
            a.orderNo                                       as order_no
          , idempotenc('XXXXXXX', coalesce(a.orderNo, '')) as rid
        from
            table1 a
    ) t
where
    t.rid = 0;
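This is not the author's UDF-plus-KV approach, but as a related sketch under the same goal, Flink SQL's built-in deduplication pattern can also drop re-sent messages by keeping only the first row per key. Table and field names are hypothetical, sink DDL is omitted, and proc_time is assumed to be a processing-time attribute on table1.

-- Keep only the first occurrence of each order_no.
SELECT order_no, pay_amount
FROM (
    SELECT
        order_no,
        pay_amount,
        ROW_NUMBER() OVER (PARTITION BY order_no ORDER BY proc_time ASC) AS rn
    FROM table1
) t
WHERE t.rn = 1;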

3. Data verification

Since the data of a real-time data warehouse is an unbounded stream, it is harder to verify than the fixed, bounded data of an offline data warehouse. Based on different scenarios, we provide two verification methods: sampling verification and full verification. See Figure 3.3.

  • Sampling verification scheme

This scheme is mainly used for data accuracy testing. Real-time summary results are calculated from the real-time detail middle layer stored in Kafka, but Kafka itself does not support retrieval by specific conditions and cannot be queried with ad-hoc statements; in addition, the messages are unbounded and the statistical results keep changing, so it is hard to find a reference for comparison. In view of this, we persist the messages by landing them in TiDB, and use TiDB's retrieval capability to query and summarize the landed messages. We then write test cases with fixed time boundaries and compare the results with business-library data or offline-warehouse data within the same time boundaries. Through this method, data from core stores were sampled to verify the accuracy of the indicators and ensure that all test cases passed.
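A hedged sketch of what such a fixed-time-boundary test case might look like on the TiDB side; the table name, field names and date are hypothetical, and the same boundary would be applied to the business library or offline warehouse for comparison.

-- Aggregate the landed detail messages inside a fixed time boundary;
-- compare the result with the business DB / offline warehouse over the same boundary.
SELECT
    shop_id,
    COUNT(DISTINCT order_no) AS order_cnt,
    SUM(pay_amount)          AS pay_amount
FROM rt_detail_landed
WHERE event_time >= '2021-01-01 00:00:00'
  AND event_time <  '2021-01-02 00:00:00'
GROUP BY shop_id;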

  • Full validation scheme

This scheme is mainly applied to data integrity and consistency verification and is used most in real-time dimension table verification scenarios. The general idea: synchronize the data in the online HBase cluster that stores the real-time dimension table to the offline HBase cluster, and import the data from the offline HBase cluster into Hive. After the time boundary of the real-time dimension table is defined, use the data verification function provided by the data platform to compare the differences between the real-time dimension table and the offline dimension table, and finally ensure that the two tables contain exactly the same data.
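If the platform's built-in comparison tool is not available, the same idea can be sketched in Hive SQL: a full outer join on the key that counts rows present in only one table or whose values differ. Table and field names are hypothetical; a diff_cnt of zero means the two tables match on the compared field.

-- Count keys missing on either side or with differing values
-- (<=> is Hive's null-safe equality operator).
SELECT COUNT(1) AS diff_cnt
FROM dim_realtime r
FULL OUTER JOIN dim_offline o
    ON r.dim_key = o.dim_key
WHERE NOT (r.dim_value <=> o.dim_value);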

4. Data recovery

Once online, a real-time task must provide continuous, accurate and consistent service. Unlike offline tasks scheduled by the day, where there is plenty of time to fix a bug, when a bug occurs in a real-time task you must strictly follow the procedure specified in advance, otherwise data problems are likely to occur. Bugs come in many forms, such as code bugs, abnormal-data bugs and real-time cluster bugs. The following figure shows the process of fixing a real-time task bug and recovering the data.

5. Tencent all-scenario real-time data warehouse construction case

In a data warehouse system there are various big data components, such as Hive, HBase, HDFS and S3, and computing engines such as MapReduce, Spark and Flink. According to different requirements, users build big data storage and processing platforms on which data is processed and analyzed. The resulting data is stored in relational and non-relational databases that support fast queries, such as MySQL and Elasticsearch. The application layer can then use this data for BI report development, user profiling, or interactive queries based on OLAP tools such as Presto.

1) Pain points of the Lambda architecture

During the whole process, we often use an offline scheduling system to run Spark analysis tasks periodically (T+1 or every few hours) to do data input, output or ETL work. In the whole offline data processing flow, data delay is inevitable: whether it is data access or intermediate analysis, the delay is relatively large, at the hour or even day level. For some real-time requirements, we then often build a separate real-time processing pipeline, for example using Flink + Kafka to build a real-time stream processing system.

On the whole, there are many components in the warehouse architecture, which greatly increases the complexity of the whole architecture and the cost of operation and maintenance.

The diagram below shows the Lambda architecture that many companies adopted in the past or are still adopting. A Lambda data warehouse is divided into an offline layer and a real-time layer, corresponding to two independent data processing pipelines, batch and stream; the same data is processed more than once, and the same set of business logic has to be developed and maintained twice. We should all be very familiar with the Lambda architecture, so I will focus on the pain points encountered when using it to build a data warehouse.

For example, in a real-time scenario where some user-related indicators are calculated, when we want to see the current PV and UV we put these data into the real-time layer for calculation, and the indicator values are displayed in real time. But at the same time, to understand user growth trends, we also need to calculate the data of the past day, which requires batch scheduling tasks: for example, a Spark scheduling task is launched on the scheduling system at two or three o'clock in the morning to run all of the previous day's data again.

Obviously, because the two pipelines run at different times but over the same data, the results may be inconsistent. When one or a few pieces of data are updated, the whole offline analysis link has to be run again, so the cost of updating data is very high. Meanwhile, two sets of computing platforms, offline and real-time, have to be maintained, and the development, operation and maintenance costs of the whole stack are very high.

In order to solve the problems caused by the Lambda architecture, the Kappa architecture was born, which should be very familiar to all of you.

2) Pain points of Kappa architecture

Let's take a look at the Kappa architecture, as shown below. It uses message queues in the middle and connects the whole link with Flink. The Kappa architecture solves the high operation-and-maintenance and development costs caused by using different engines in the offline and real-time processing layers of the Lambda architecture, but it has its own pain points.

First of all, when constructing real-time business scenarios, Kappa is used to build near-real-time pipelines. However, if you want to do some simple OLAP analysis or further processing on a middle layer of the warehouse such as the ODS layer, for example writing data to a DWD-layer Kafka topic, Flink is needed again; and importing the data from the DWD-layer Kafka into ClickHouse, Elasticsearch, MySQL or Hive for further analysis obviously adds complexity to the architecture.

Secondly, the Kappa architecture depends strongly on message queues. As we know, the accuracy of the data computed over the whole link strictly depends on the ordering of the upstream data, and the more message queues are chained together, the more likely out-of-order data becomes. ODS-layer data is usually absolutely accurate, but it may become out of order when sent to the next Kafka topic, and DWD-layer data may again become out of order when sent to the DWS layer, which can cause serious data inconsistencies.

Third, because Kafka is a sequential storage system, OLAP analysis optimization strategies cannot be applied to it directly; for example, optimizations such as predicate pushdown are difficult to implement on Kafka's sequential storage.

So, is there an architecture that can meet both real-time and offline computing requirements, reduce operation, maintenance and development costs, and solve the pain points encountered when building the Kappa architecture on message queues? The answer is yes, and more on that later.

3) Summary of pain points

4) Building a real-time data warehouse with Flink + Iceberg

1. Near-real-time data access

As previously introduced, Iceberg supports read-write separation, concurrent reads, incremental reads, small-file merging, and second-to-minute-level latency. Based on these advantages, we try to use Iceberg to build a Flink-based real-time data warehouse architecture.

As shown in the figure below, every commit operation of Iceberg changes the visibility of data, for example turning data from invisible to visible; in this process, near-real-time data ingestion can be realized.

2. Real-time data warehouse - data lake analysis system

Previously, for example, an offline Spark scheduling task would be used to pull and extract the data and then write it into a Hive table, a process with considerable latency. With the Iceberg table format, Flink or Spark Streaming can be used in the middle to complete near-real-time data ingestion.
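A minimal Flink SQL sketch of that ingestion path, assuming the Iceberg Flink runtime is on the classpath, reusing the hypothetical dwd_trip_order_detail Kafka source from the earlier sketch, and using a hypothetical Hadoop-catalog warehouse location.

-- Hypothetical Hadoop catalog pointing at the Iceberg warehouse on HDFS.
CREATE CATALOG iceberg_catalog WITH (
    'type'         = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse'    = 'hdfs://nameservice1/warehouse/iceberg'
);

CREATE DATABASE IF NOT EXISTS iceberg_catalog.ods;

CREATE TABLE IF NOT EXISTS iceberg_catalog.ods.ods_order_log (
    order_no   STRING,
    city_id    BIGINT,
    gmv        DECIMAL(18, 2),
    event_time TIMESTAMP(3)
);

-- Continuously sink the Kafka detail stream into the Iceberg table;
-- each checkpoint commit makes a new batch of data visible, as described above.
INSERT INTO iceberg_catalog.ods.ods_order_log
SELECT order_no, city_id, gmv, order_time AS event_time
FROM dwd_trip_order_detail;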

Based on the above functions, let's review the Kappa architecture discussed earlier, whose pain points have already been described. Since Iceberg can serve as an excellent table format and supports both a streaming reader and a streaming sink, could we consider replacing Kafka with Iceberg?

Iceberg's underlying storage is cheap storage such as HDFS or S3, and Iceberg supports file formats such as Parquet, ORC and Avro. With columnar storage support, basic OLAP optimizations can be applied directly in the middle layer, for example the most basic strategy of predicate pushdown. Combined with the streaming reader function based on Iceberg snapshots, the day-level to hour-level latency of offline tasks can be greatly reduced, transforming the pipeline into a near-real-time data lake analysis system.

In the middle processing layer, Presto can be used for some simple queries. Because Iceberg supports streaming reads, Flink can also be connected directly to the middle layer of the system to perform batch or streaming computing tasks, and the intermediate results can be further calculated and output downstream.
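For the streaming read mentioned here, a hedged sketch using Iceberg's Flink read options via a SQL hint (the option names follow the Iceberg Flink documentation; the hint may require dynamic table options to be enabled, and the tables are the hypothetical ones from the earlier sketches).

-- Incrementally consume newly committed snapshots from the Iceberg table
-- and keep a downstream summary table up to date.
INSERT INTO trip_dwa_order_cityagg_daily
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd') AS stat_date,
    city_id,
    COUNT(DISTINCT order_no)              AS order_cnt,
    SUM(gmv)                              AS gmv
FROM iceberg_catalog.ods.ods_order_log
     /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */
GROUP BY DATE_FORMAT(event_time, 'yyyy-MM-dd'), city_id;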

Advantages and disadvantages of replacing Kafka:

In general, the advantages of Iceberg in replacing Kafka mainly include:

  • Unified stream and batch processing at the storage layer

  • The middle tier supports OLAP analysis

  • Full support for efficient backfill (replaying historical data)

  • Storage cost reduction

Of course, there are also some defects, such as:

  • Data latency goes from real-time to near-real-time

  • Interfacing with other data systems requires additional development work

Second-level analysis – data lake acceleration:

Because Iceberg stores all data files on HDFS, HDFS read and write performance cannot fully meet our needs for second-level analysis scenarios, so we will support Alluxio as a cache underneath Iceberg. With the help of this caching capability, the data lake can be accelerated. This structure is part of our future planning and construction.
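As a rough illustration only (the article describes this as planned work, not a finished design), one possible way to put Alluxio in front of HDFS is to point the Iceberg warehouse at an Alluxio URI, relying on Alluxio's HDFS-compatible file system interface; the host, port and mount path below are hypothetical.

-- Same Hadoop catalog shape as before, but the warehouse path goes through
-- Alluxio, which mounts and caches the underlying HDFS directory.
CREATE CATALOG iceberg_alluxio_catalog WITH (
    'type'         = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse'    = 'alluxio://alluxio-master:19998/warehouse/iceberg'
);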
