This article is based on the talk “Practice of Building Real-time Data Warehouse Scenarios Based on Flink”, shared by Kuaishou data technology expert Li Tianshuo at the Flink Meetup in Beijing on May 22. It covers:
- Kuaishou real-time computing scenarios
- Kuaishou real-time data warehouse architecture and safeguard measures
- Problems and solutions in Kuaishou scenarios
- Future planning
I. Kuaishou Real-time Computing Scenarios
The real-time computing scenarios in Kuaishou's business fall mainly into four parts:
- **Company-level core data:** includes the company-wide operations dashboard, real-time core daily reports, and mobile-side data. In other words, the team maintains a core real-time dashboard for the company's overall metrics and for each business line, such as video-related and live-streaming-related lines.
- **Real-time metrics for major events:** the most important deliverable is the real-time big screen. For Kuaishou's Spring Festival Gala campaign, for example, we build an overall big screen showing the status of the whole event. A large event is divided into N different modules, and we provide different real-time dashboards for the different gameplay of each module;
- **Operations data:** operations data mainly covers two aspects, creators and content. For creators and content, the operations side may launch a big-V campaign, for example, and want to see information such as the real-time status of the live room and the live room's contribution to the overall numbers. Based on this scenario, we build multi-dimensional real-time data for various big screens, as well as other big-screen data.
In addition, this area also includes support for business strategy. For example, we may discover hot content and hot creators in real time, as well as current trending topics, and output strategies based on these hot spots; this is another support capability we need to provide.
Finally, it also includes C-side data display. Kuaishou now has a Creator Center and an Anchor Center, which contain pages such as the anchor's post-broadcast page; part of the real-time data on these post-broadcast pages is also produced by us.
- **Real-time features:** includes search and recommendation features and real-time advertising features.
II. Kuaishou Real-time Data Warehouse Architecture and Safeguard Measures
1. Objectives and difficulties
1.1 Objectives
- First, since we are building a data warehouse, we want every real-time metric to have a corresponding offline metric, and we require the overall difference between the real-time and offline metrics to stay within 1%. This is the minimum standard.
- The second is data latency. The SLA is that data latency in all core reporting scenarios must not exceed 5 minutes during an event; these 5 minutes include the time from a job failing to its full recovery.
- Finally, stability. In scenarios such as a job restart, the output curve should stay normal; a restart should not cause obvious anomalies in the reported metrics.
1.2 Difficulties
- The first difficulty is data volume. Total daily ingested traffic is on the order of trillions of records, and during events such as the Spring Festival Gala the peak QPS can reach 100 million per second.
- The second difficulty is the complexity of component dependencies. Some links depend on Kafka, others on Flink, KV storage, RPC interfaces, OLAP engines, and so on. We need to think about how to lay out the links so that all of these components can work well together.
- The third difficulty is the complexity of the links. We currently have 200+ core business jobs, 50+ core data sources, and more than 1,000 jobs overall.
2. Real-time data warehouse – layered model
Based on the above three difficulties, let’s look at the data warehouse architecture:
As shown above:
- The lowest level has three different data sources, namely client logs, server logs, and Binlog logs.
- The public base layer is divided into two levels: the DWD layer, which holds detailed data, and the DWS layer, which holds common aggregated data. DIM is the dimension layer we usually talk about. We pre-layer by topic following the offline warehouse, covering topics such as traffic, users, devices, video production and consumption, risk control, and social.
- The core work of the DWD layer is standardized cleaning;
- The DWS layer joins dimension data onto the DWD layer and generates aggregation layers at common granularities.
- Next is the application layer, which includes big-screen data, multi-dimensional analysis models, and business-specific thematic data.
- Application scenarios sit at the top.
The whole process can be divided into three steps:
- The first step is business data ingestion, which is equivalent to bringing the business data in;
- The second step is data assetization, which means cleaning the data extensively to form regular, orderly data;
- The third step is data businessization: at the level of real-time data, the data can feed back into the business and help build the value of business data.
3. Real-time data warehouse – Safeguard measures
Based on the layered model above, take a look at the overall safeguard measures:
The safeguards are divided into three parts: quality assurance, timeliness assurance, and stability assurance.
- Let's look first at quality assurance, in blue. At the data-source stage we monitor the data sources for out-of-orderness, based on our collection SDK, and check the consistency of the data sources against offline data. The computation process then goes through three stages: the development stage, the launch stage, and the service stage.
- In the development stage, a standardized model is provided, and based on it there are benchmarks and offline comparison and verification to ensure consistent quality.
- The launch stage focuses more on service monitoring and metric monitoring.
- In the service stage, if anything abnormal happens, we first restore the job from Flink state; if some scenarios still do not meet expectations, we do an overall offline data repair.
- The second is timeliness. For data sources, we also monitor data-source latency. In the development stage there are really two things:
- The first is load testing. A regular task takes the peak traffic of the last 7 or 14 days to check whether there is any task delay.
- After passing the load test, there is an evaluation of launch and restart performance, i.e., what the restart performance looks like after recovering from a checkpoint.
- The last one is stability assurance, where a lot is done for large events, such as failover drills and tiered guarantees. We apply rate limiting based on the earlier load-test results, so that when the limit is exceeded the job still runs stably and does not suffer widespread instability or checkpoint failures. Beyond that we have two different standards: cold-standby dual data centers and hot-standby dual data centers.
- Cold-standby dual data centers: when one data center fails, we pull the job up in the other data center;
- Hot-standby dual data centers: the same logic is deployed once in each of the two data centers.
Those are our overall safeguard measures.
III. Problems and Solutions in Kuaishou Scenarios
1. PV/UV standardization
1.1 Scenario
The first issue is PV/UV standardization. Here are three screenshots:
The first picture shows the warm-up scene of the Spring Festival Gala, which is one kind of gameplay. The second and third are screenshots of the red-envelope activity and of a live room on the day of the Spring Festival Gala.
During the activity, we found that 60-70% of the requirements were about computing information on the pages, such as:
- how many people came to the page, or how many people clicked through to the page;
- how many people came to the activity;
- how many clicks a widget on the page received, and how many exposures it generated.
1.2 Solution
Abstracting the scenario gives the following SQL:
In simple terms: filter from a table, aggregate by dimension, and then perform some Count or Sum operation.
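As a rough illustration (not the production code), such a query can be sketched in Flink SQL via the Java Table API as below. The `page_event` table and its fields are hypothetical, and the datagen connector only stands in for the real Kafka source to keep the sketch self-contained:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PvUvSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical source table; in production this would be a Kafka topic.
        tEnv.executeSql(
                "CREATE TABLE page_event (" +
                "  did        STRING, " +      // device id
                "  page_id    STRING, " +
                "  event_time TIMESTAMP(3), " +
                "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND " +
                ") WITH ('connector' = 'datagen', 'rows-per-second' = '10')");

        // The abstracted pattern: filter, group by dimension, then COUNT / COUNT DISTINCT.
        tEnv.executeSql(
                "SELECT page_id, " +
                "       COUNT(*)            AS pv, " +
                "       COUNT(DISTINCT did) AS uv " +
                "FROM page_event " +
                "WHERE page_id IS NOT NULL " +
                "GROUP BY page_id")
            .print();
    }
}
```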
Based on this scenario, our initial solution is shown on the right side of the figure above.
We used the Early Fire mechanism of Flink SQL: data is read from the source and first bucketed by DID (device ID). For example, the purple part at the beginning is bucketed this way; bucketing first prevents hot spots on any single DID. After bucketing comes a Local Window Agg, which pre-aggregates data of the same type within each bucket. The Local Window Agg is followed by a Global Window Agg that merges the buckets by dimension; merging the buckets means computing the final result per dimension. The Early Fire mechanism is equivalent to opening a day-level window in the Local Window Agg and emitting output once every minute.
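The knobs involved are roughly the following (a sketch, not the production configuration). The mini-batch, two-phase aggregation, and distinct-split options are documented Flink settings; the `table.exec.emit.early-fire.*` keys are experimental and undocumented, so treat their exact names as an assumption to verify against your Flink version:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EarlyFireConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Configuration conf = tEnv.getConfig().getConfiguration();

        // Local/global (two-phase) aggregation and distinct splitting relieve hot keys
        // by pre-aggregating per bucket before the final merge by dimension.
        // (Depending on the Flink version, some of these apply only to non-windowed aggregation.)
        conf.setString("table.exec.mini-batch.enabled", "true");
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        conf.setString("table.exec.mini-batch.size", "5000");
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        conf.setString("table.optimizer.distinct-agg.split.enabled", "true");

        // Experimental early-fire options: emit partial window results every minute
        // instead of waiting for the day-level window to close. Key names are an
        // assumption; they are not part of the documented public configuration.
        conf.setString("table.exec.emit.early-fire.enabled", "true");
        conf.setString("table.exec.emit.early-fire.delay", "60 s");
    }
}
```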
We ran into some problems along the way, as shown in the lower left corner of the figure above.
There is no problem during normal operation. But if the overall data is delayed, or historical data is being backfilled, then with Early Fire once per minute the data volume during the backfill is large, and the backfill may jump from 14:00 straight to reading the data at 14:02, so the point at 14:01 is lost. What happens when it is lost?
In this scenario, the curve at the top is the result of Early Fire during a historical backfill. The x-axis is minutes and the y-axis is the page UV up to the current time. We can see that some points are flat, meaning there is no data, followed by a sharp jump, then flat again, then another sharp jump. The expected result is actually the smooth curve at the bottom of the figure.
To solve this problem, we used the Cumulate Window solution, which is also included in Flink 1.13; its principle is the same.
The data opens a large day-level window, with small minute-level windows under it, and each record falls into a minute-level window according to its own Row Time.
- When the Watermark advances past the window's event time, the window is triggered and emits a result. This solves the backfill problem: the data itself falls into its real window, and the Watermark advances and triggers output after the window ends.
- In addition, this method solves the out-of-order problem to a certain extent: out-of-order data is not discarded, and the latest accumulated result is recorded.
- Finally, semantic consistency: because it is based on event time, the results are highly consistent with those computed offline as long as the out-of-orderness is not severe.
The above is a standardized solution for PV/UV.
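For reference, a minimal sketch of the cumulate-window query in Flink 1.13 SQL, reusing the hypothetical `page_event` table from the earlier sketch:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CumulateWindowSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Same hypothetical source as before: one row per page view with event time.
        tEnv.executeSql(
                "CREATE TABLE page_event (" +
                "  did        STRING, " +
                "  page_id    STRING, " +
                "  event_time TIMESTAMP(3), " +
                "  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND " +
                ") WITH ('connector' = 'datagen', 'rows-per-second' = '10')");

        // CUMULATE opens a day-level window with minute-level steps: each step emits
        // the UV accumulated since the start of the day, triggered by the watermark,
        // so a backfill still produces one point per minute instead of skipping points.
        tEnv.executeSql(
                "SELECT window_start, window_end, page_id, " +
                "       COUNT(DISTINCT did) AS uv " +
                "FROM TABLE(CUMULATE(" +
                "       TABLE page_event, DESCRIPTOR(event_time), " +
                "       INTERVAL '1' MINUTES, INTERVAL '1' DAY)) " +
                "GROUP BY window_start, window_end, page_id")
            .print();
    }
}
```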
2. DAU calculation
2.1 Background
Here’s how to calculate DAU:
We monitor active devices, new devices, and backflow (returning) devices extensively at the overall level.
- Active devices are devices that were active that day;
- New devices are devices that were active that day and had never appeared before;
- Backflow devices are devices that were active that day but had not appeared in the previous N days.
However, we may need 5~8 different topics to calculate these metrics.
So let's first look at the logic of the offline process.
First we count the active devices, combine them, and then do a day-level deduplication on a set of dimensions. Then we join the dimension table, which contains the device's first and last access times, i.e., the first and last time the device was seen, up to yesterday.
Once we have this information, we can do the logical calculation, and we find that new devices and backflow devices are in fact sub-tags of active devices: new devices apply one piece of tagging logic, and backflow devices apply a 30-day logic. Based on such a solution, can we simply write one SQL statement to solve the problem?
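Before looking at the problems with that idea, here is the tagging logic itself as a small Java sketch; the 30-day backflow window follows the text, and the names are illustrative:

```java
import java.time.LocalDate;

public class DeviceTagSketch {

    /**
     * Classify a device that is active today, given its history from the dimension
     * table (first/last seen dates up to yesterday). Names and the exact 30-day
     * window are illustrative of the logic described above, not the production code.
     */
    static String classify(LocalDate today, LocalDate firstSeen, LocalDate lastSeen) {
        if (firstSeen == null) {
            return "new";        // never seen before today
        }
        if (lastSeen != null && lastSeen.isBefore(today.minusDays(30))) {
            return "backflow";   // active today, but absent for more than 30 days
        }
        return "active";         // a regular active device
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2021, 5, 22);
        System.out.println(classify(today, null, null));                        // new
        System.out.println(classify(today, LocalDate.of(2021, 1, 1),
                                    LocalDate.of(2021, 2, 1)));                 // backflow
        System.out.println(classify(today, LocalDate.of(2021, 5, 1),
                                    LocalDate.of(2021, 5, 21)));                // active
    }
}
```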
That’s actually what we did at first, but we ran into some problems:
- The first problem: there are 6 to 8 data sources, and the caliber of our overall dashboard is often fine-tuned. With a single job, every fine-tuning requires a change to that job, and the stability of the single job becomes very poor.
- The second problem: the data volume is in the trillions, which leads to two issues. First, a single job at this scale has very poor stability. Second, the real-time dimension-table joins rely on KV storage, which comes under heavy pressure at this scale.
- The third problem: we have a strict latency requirement of less than one minute. Batch processing has to be avoided throughout the link, and if a single task hits a performance problem, we also need to guarantee high performance and scalability.
2.2 Technical Solutions
Given the above problems, here is how we did it:
As shown in the example in the figure above, the first step is minute-level deduplication of data sources A, B, and C by dimension and DID. After deduplication we obtain three minute-level deduplicated data sources, then Union them together and apply the same logic to the result.
This effectively reduces the input from the trillion level; after the minute-level deduplication, a day-level deduplication is applied, bringing the generated data source down to the billion level.
With data at the billion level, it becomes feasible to join against servitized data: we join the dimensions through the user-profile RPC interface and finally write the result to the target topic. This target topic is imported into the OLAP engine to serve a number of different use cases, including mobile services, big-screen services, and metric dashboard services.
This scheme has three advantages, namely stability, timeliness and accuracy.
- The first is stability. Loose coupling means that when the logic of data source A or data source B needs to change, each can be changed separately. The second aspect is task scaling: because we split all the logic into very fine-grained pieces, a problem somewhere, such as a traffic issue, does not affect the downstream parts, so scaling is relatively simple; in addition, servitization is moved to the end of the pipeline and the state is kept under control.
- The second is timeliness: we achieve millisecond latency with rich dimensions, with 20+ dimensions overall for multi-dimensional aggregation.
- Finally, accuracy: we support data verification, real-time monitoring, a unified model exit, and so on.
At this point we ran into another problem: out-of-order data. For the three different jobs above, each job restart introduces a delay of at least two minutes, which causes the unioned downstream data source to be out of order.
2.3 Delay calculation scheme
What do we do when this kind of out-of-order data appears?
We have three solutions in total:
- The first solution is to deduplicate on “did + dimension + minute”, with the value being whether the device has been seen. For example, if a given DID arrives at 04:01, a result is output; likewise, 04:02 and 04:04 also produce output. But if it arrives again at 04:01 it is discarded, whereas if it arrives at 04:00 a result is still output.
This solution had a problem: because we keep state per minute, the state for 20 minutes is twice as large as the state for 10 minutes, and the state size becomes somewhat uncontrollable, so we switched to solution 2.
- The second solution assumes that the data source has no out-of-order data. In this case the key stores “did + dimension” and the value is a timestamp, which is updated as shown in the figure above.
When a record arrives, a result is output. When another record for the same DID arrives, the timestamp is updated and a result is still output. The same logic applies at 04:04: the timestamp is updated to 04:04; but if a record then arrives whose time is earlier than the stored 04:04, it is discarded.
This approach greatly reduces the state required, but it has zero tolerance for out-of-order data. Since we could not rule out disorder, we came up with solution 3.
- Solution 3 builds on the timestamp of solution 2 and adds something like a ring buffer, allowing out-of-order data within the buffer.
For example, at 04:01 a record arrives and a result is output; at 04:02 a record arrives, the timestamp is updated to 04:02, and it is also recorded that the same device was seen at 04:01. If another record arrives at 04:04, a shift is applied according to the corresponding time difference, so that with this logic a certain amount of disorder can be tolerated.
Comparing the three solutions:
- In solution 1, tolerating 16 minutes of disorder means the state size of a single job is about 480 GB. Although this guarantees accuracy, job recovery and stability become completely uncontrollable, so we abandoned this solution.
- Solution 2 has a state size of about 30 GB and tolerates no disorder, but the data is not accurate. Since our accuracy requirements are very high, we abandoned this solution as well.
- Compared with solution 1, the state of solution 3 changes in shape but does not grow much, and overall it achieves the same effect as solution 1. Solution 3 tolerates 16 minutes of disorder, and when we update a job normally, 10 minutes is enough for a restart, so solution 3 was finally chosen.
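For illustration, a simplified sketch of solution 3 as a Flink `KeyedProcessFunction`, keyed by “did + dimension”. The event POJO, the 16-minute tolerance constant, and the bitmask-as-ring-buffer encoding are assumptions used to convey the idea; the production job is not necessarily implemented this way:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Sketch of solution 3: per "did + dimension" key, keep the newest minute seen plus a
 * small bitmask acting as a ring buffer of recently emitted minutes, so that events up
 * to TOLERANCE_MIN minutes late are deduplicated correctly instead of being dropped.
 */
public class MinuteDedupSketch extends
        KeyedProcessFunction<String, MinuteDedupSketch.DeviceEvent, MinuteDedupSketch.DeviceEvent> {

    /** Hypothetical event type: one record per (did, dimension) occurrence. */
    public static class DeviceEvent {
        public String did;
        public String dimension;
        public long eventTimeMillis;
    }

    private static final int TOLERANCE_MIN = 16;      // tolerated out-of-orderness in minutes

    private transient ValueState<Long> latestMinute;  // newest minute seen for this key
    private transient ValueState<Integer> seenBits;   // bit i set => minute (latest - i) already emitted

    @Override
    public void open(Configuration parameters) {
        latestMinute = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-minute", Types.LONG));
        seenBits = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen-bits", Types.INT));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx, Collector<DeviceEvent> out)
            throws Exception {
        long minute = event.eventTimeMillis / 60_000L;
        Long latest = latestMinute.value();
        int bits = seenBits.value() == null ? 0 : seenBits.value();

        if (latest == null || minute > latest) {
            // A newer minute: shift the ring buffer forward, mark this minute, emit.
            int shift = latest == null ? 0 : (int) Math.min(minute - latest, TOLERANCE_MIN);
            seenBits.update((bits << shift) | 1);
            latestMinute.update(minute);
            out.collect(event);
        } else if (latest - minute < TOLERANCE_MIN) {
            // Late but within tolerance: emit only if this minute has not been emitted yet.
            int mask = 1 << (int) (latest - minute);
            if ((bits & mask) == 0) {
                seenBits.update(bits | mask);
                out.collect(event);
            }
        }
        // More than TOLERANCE_MIN minutes behind the newest minute: drop silently.
    }
}
```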
3. Operation scenario
3.1 Background
The operation scenario can be divided into four parts:
- The first is big-screen support, including analysis of individual live-room data and of the overall dashboard data, with minute-level latency and relatively high update requirements;
- The second is live dashboard support. Live dashboard data is analyzed on specific dimensions and supports specific user groups, with high requirements for dimension richness;
- The third is data strategy lists, mainly used to predict popular works and popular styles. These lists require hour-level data and have relatively low update requirements;
- The fourth is C-side real-time metric display, which has a large query volume but a fixed query pattern.
Let's look at the different requirements that these four scenarios bring.
The first three are basically the same, differing only in the query pattern: some are specific business scenarios and some are general business scenarios.
For the third and fourth kinds, the update requirements are relatively low and the throughput requirements are relatively high, and the curves in between do not need to be consistent. The fourth query pattern consists mostly of single-entity queries, for example querying what metrics a piece of content has, and it has high QPS requirements.
3.2 Technical Solutions
For the above four different scenarios, how did we do it?
- Looking at the basic detail layer on the left, the data source has two links. One of them is the consumption stream, such as live-consumption information and views/likes/comments; it goes through a basic cleaning and then dimension management. The upstream dimension information comes from Kafka, and dimensions of the content, including some user dimensions, are written into KV storage.
After these dimensions are joined, the result is finally written to Kafka as the DWD fact layer. To improve performance, we implemented a two-level cache (see the sketch after this list).
- In the upper part of the figure, we read the DWD-layer data and do a basic summarization. The core is window-based dimension aggregation, which generates four kinds of data at different granularities: the overall multi-dimensional summary topic, the live-room multi-dimensional summary topic, the author multi-dimensional summary topic, and the user multi-dimensional summary topic, all of which are data on common dimensions.
- In the lower part of the figure, based on these common-dimension data, we process the personalized-dimension data, i.e., the ADS layer. After obtaining these data, dimension expansion is carried out, including content expansion and operations dimension expansion, followed by aggregation, producing for example the e-commerce real-time topic, the institutional-service real-time topic, and the big-V live real-time topic.
Splitting into two such links has an advantage: one place handles the common dimensions and the other handles the personalized dimensions. The common dimensions have higher guarantee requirements, while the personalized dimensions involve a lot of personalized logic. If the two were coupled together, tasks would break down frequently, and it would be unclear which task has which responsibility, so such a stable layer could not be built.
- On the right, we ended up with three different engines. In simple terms, Redis queries are used in the C-side scenario, and OLAP queries are used in the big-screen and business dashboard scenarios.
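As mentioned in the detail-layer description above, dimension lookups go through a two-level cache. A minimal sketch of that idea, assuming a Guava in-heap cache in front of a remote KV store; the `KvClient` interface is hypothetical:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.time.Duration;
import java.util.concurrent.ExecutionException;

/** Two-level dimension lookup: an in-heap cache in front of a remote KV store. */
public class TwoLevelDimCache {

    /** Hypothetical client for the remote KV store (e.g. a Redis-like service). */
    public interface KvClient {
        String get(String key);
    }

    private final KvClient kvClient;

    // Level 1: a bounded, short-lived local cache so dimension updates are picked up quickly.
    private final Cache<String, String> localCache = CacheBuilder.newBuilder()
            .maximumSize(100_000)
            .expireAfterWrite(Duration.ofMinutes(1))
            .build();

    public TwoLevelDimCache(KvClient kvClient) {
        this.kvClient = kvClient;
    }

    /** Look up a dimension value, hitting the remote KV store (level 2) only on a local miss. */
    public String lookup(String key) throws ExecutionException {
        // Assumes the KV store returns a non-null value for every key in this sketch.
        return localCache.get(key, () -> kvClient.get(key));
    }
}
```

The short expiry keeps dimension data reasonably fresh while the local cache absorbs most repeated lookups, which is the point of the two-level design.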
IV. Future Planning
The above covered three scenarios: the first was standardized PV/UV calculation, the second was the overall DAU solution, and the third was how to handle the operations side. Based on this, we have some future plans, divided into four parts.
- The first part is improving the real-time guarantee system:
- One aspect is large-scale activities, including the Spring Festival Gala and subsequent regular activities. For guaranteeing these activities we have a set of norms, on which we will build platform capabilities;
- The second is establishing tiered guarantee standards, with a standardized description of which jobs are guaranteed at which level and to which standard;
- The third is a plan for promoting engine-platform capabilities, covering the engines underlying Flink tasks; on top of them we will build a platform and drive standardized adoption.
- The second part is real-time data warehouse content construction:
- On the one hand, delivering scenario-based solutions, such as generic solutions for activities, rather than developing a new set of solutions for every activity;
- On the other hand, layered consolidation of content data: the current content construction is still missing some scenarios in terms of depth, including how the content can better serve upstream scenarios.
- The third part is Flink SQL scenario construction, including continued SQL adoption, SQL task stability, and SQL task resource utilization. When estimating resources, we consider, for example, what kind of QPS a given SQL solution can support in a given scenario. Flink SQL can dramatically reduce development effort, but in the process we also want to make business operation simpler.
- The fourth part is exploring batch-stream unification. The real-time warehouse scenario is essentially about accelerating offline ETL: we have many hour-level tasks, and for each batch task some of the logic can be moved into stream processing, which will greatly improve the SLA of the offline warehouse.