Author: Zhang Ting (Cainiao Data Engineer)

Introduction: Supply chain logistics is a highly complex business scenario: the business links are long, there are many nodes and many entities, which makes building a real-time data warehouse difficult. Cainiao's cross-border import business is even more complex, which brings a more complicated entity data model; the many business systems it connects to make the ETL process extremely complex; and the average daily data volume processed is massive. In building the import real-time data warehouse, the team therefore faced many challenges: how to guarantee data accuracy under complicated entity relationships? How to reduce the complexity of data processing with so many data sources? How to improve the efficiency of real-time multi-stream Joins? How to implement real-time timeout statistics? How to recover data state in abnormal situations? This article shares the upgrade experience of the Cainiao import real-time data warehouse, and how the features of Flink were used to solve these problems in development practice.

The main contents include:

  1. Related background
  2. The evolution process of the import real-time data warehouse
  3. Challenges and Practices
  4. Summary and Prospect

01 Related Background

1. Introduction to import business

The import business process is fairly clear. After a domestic buyer places an order, the overseas seller ships the goods, which then go through export customs clearance, trunk-line transportation, import customs clearance in China, and domestic distribution until delivery to the consumer. Throughout this process, Cainiao is responsible for coordinating all the resources along the link and completing the logistics fulfillment service. Since Kaola joined the Alibaba ecosystem last year, the whole import business accounts for a very high share of domestic import order volume. In addition, the order volume grows rapidly every year, the order fulfillment cycle is very long, and many links are involved in the process. Therefore, in data construction it is very difficult both to integrate all the data and to guarantee its validity.

2. Real-time data warehouse processing flow

① General process

Let me briefly introduce the general processing flow of a real-time data warehouse. It typically connects to business databases or log sources; the data is synchronized, for example via Sqoop or DataX, into message middleware for temporary staging. Downstream, a real-time compute engine consumes the messages, computes and processes them, and outputs detail tables or summary metrics, which are written into a query service for data applications.

② Cainiao internal process

Inside Cainiao we follow the same flow. We synchronize the business database data to TT (a Kafka-like message middleware) by incrementally collecting Binlog through DRC (the data backup center) for temporary staging; the messages are then consumed by the Flink real-time compute engine. Mildly summarized or dimension-rich detail data is written into ADB, an OLAP engine that Alibaba Cloud also offers externally, which mainly serves rich multidimensional analytical queries. For real-time dashboard scenarios, because the dimensions are few and the metrics are fixed, we write some highly summarized metrics into HBase for the real-time dashboards.

02 Evolution process of import real-time data warehouse

Next, let's walk through the evolution of the import real-time data warehouse:

2014: The import business line started around this time, and an offline data warehouse was built to provide daily reports.

2015: Hourly reports became available, raising the update frequency from daily to hourly.

2016: We began exploring real-time metrics with computing services based on JStorm, becoming more and more real-time. Since this was only the first attempt at real-time metrics, the metrics were not particularly rich.

2017: Cainiao introduced Blink, the internal Alibaba version of Flink, as our stream computing engine. In the same year, the import business line opened up real-time details and provided external data services through a real-time detail wide table.

2018: Completed the construction of the Cainiao real-time data warehouse 1.0.

2020: Construction of the real-time data warehouse 2.0 began. Why 2.0? Because 1.0 had many problems in its design: the overall model architecture was not flexible and not very extensible, and some of the problems came from an insufficient understanding of Blink's characteristics, whose misuse drove up operation and maintenance costs. So a major upgrade was carried out later.

1. Real-time data warehouse 1.0

Let me first describe real-time data warehouse 1.0. At the very beginning of development the business model was not stable, so the initial strategy was to run small and fast alongside the business: for business line 1 we would develop one real-time detail layer, and for business line 2 another set of real-time tasks. The advantage was rapid iteration: business lines developed without affecting each other, which made things more flexible in the early stage.

As shown on the right side of the figure above, the bottom is the source layer of the various business systems. The real-time tasks mainly have two layers. One is the real-time detail layer, where a separate detail table is developed for each business line, extracting the data that business line needs. Above it is the ADM layer, the real-time application layer, customized mainly for specific scenarios: for example, when a scenario needs an overall summary metric, data is extracted from each detail table to generate a real-time summary table. The whole process is vertical, chimney-style development; the model is chaotic, difficult to extend, and there is a lot of repeated computation.

Later, because of the repeated computation, a layer of abstraction was added: a pre-positioned intermediate layer that extracted the common parts. But this treated the symptoms rather than the root cause: the overall model was still chaotic, data construction was not unified, and the model's extensibility remained poor.

2. Real-time data warehouse 2.0

After the 2.0 upgrade, there is a clearer picture:

  • Front layer: the underlying data sources are connected to the front (pre-positioned) intermediate layer, which shields the very complex logic at the bottom.
  • Detail layer: the front layer delivers relatively clean data to the detail tables; the detail layer connects all business lines and unifies the model.
  • Summary layer: above the detail layer there are mild summaries and high summaries. The mildly summarized tables have many dimensions and are mainly written into the OLAP engine for multidimensional query analysis; the highly summarized metrics are mainly materialized for real-time dashboard scenarios.
  • Interface service: the summary layer provides data output through a unified interface service.
  • Data application: the application layer mainly includes real-time dashboards, data applications, real-time reports and message push.

This is the upgraded real-time data warehouse 2.0 model. Although the model looks relatively simple, behind it, from design to development, we encountered many difficulties and spent a great deal of effort. Below are some of the challenges and practices we encountered during the upgrade.

03 Challenges and practices

In the process of upgrading the real-time data warehouse, we faced the following challenges:

1. Multiple business lines and business models

The first is that many business lines are connected, and different business lines have different models. As a result, the "small and fast" models were fragmented from the very beginning: there was no reuse between models, development, operation and maintenance costs were high, and resource consumption was serious.

Solution: Logical middle layer upgrade

The simple idea is to build a unified data middle layer. For example, business line A has nodes such as outbound, pickup (LanShou) and delivery, while business line B may have a few other nodes; the whole model is in a fragmented state. In practice, however, once a business develops to a later stage its model becomes relatively stable, and at that point the data can be abstracted: for example, node 1 and node 5 of business line A are the same as in several other business lines. By aligning the models we can find which parts are common, extract them, and sink them into a logical middle layer, thereby shielding the differences between businesses and unifying data construction. There is another big reason to unify the logical middle layer: although business lines A, B and C use different business systems, such as fulfillment systems and customs systems, they are essentially the same kind of system, and the underlying data sources are likewise abstracted, so data warehouse modeling should also be built with a unified approach.

2. Multiple business systems and large data sources

The second challenge is that many systems are connected, and each system carries a large data volume: every day there are more than a dozen data sources at the hundred-million-record level, which are very difficult to sort out. The problems are also obvious. The first is the large state that has to be maintained in Flink, and how to control the cost after accessing so many data sources.

Solution: Make good use of State

State is a core feature of Flink, because it is what guarantees stateful computation, so it needs to be used rationally. We need to figure out what State does, when we need it, and how to optimize it; these are all things to consider. There are two types of State. One is KeyedState, which is related to the key of the data: for example, for a Group By in SQL, Flink stores the related data per key value, for example in a binary array. The other is OperatorState, which is specific to the operator, such as recording the offsets read by the Source Connector, or how state is restored and redistributed between operators after a failover.
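To make KeyedState concrete, here is a minimal, generic Flink sketch (not the production job; the class name, state name and message type are made up): a per-key counter kept in a ValueState inside a KeyedProcessFunction, which Flink checkpoints and restores on failover.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts messages per key (e.g. per order id); used on a stream that has been keyBy-ed on that id.
public class OrderCountPerKey extends KeyedProcessFunction<String, String, Long> {

    // KeyedState: Flink keeps one value per key, stores it in the state backend and checkpoints it.
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("order-count", Long.class));
    }

    @Override
    public void processElement(String message, Context ctx, Collector<Long> out) throws Exception {
        Long current = countState.value();          // null on the first message for this key
        long next = (current == null ? 0L : current) + 1;
        countState.update(next);                    // restored automatically after a failover
        out.collect(next);
    }
}
```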

① “Deduplication” during data access

Take KeyedState as an example: the logistics order stream and the fulfillment log stream are joined to produce the final wide table we need. How does the Join store its data? Both streams keep flowing and messages may not arrive in a consistent order, so the Join node has to keep state inside the operator. The simple, brute-force way is to store both the left and the right stream at the same time; this guarantees that no matter which message arrives first, the data inside the operator is complete, even if one side arrives late. Note that what the State stores differs depending on the upstream. If the upstream defines a primary key (Rowkey) and the JoinKey contains that primary key, then multiple orders cannot correspond to the same foreign key, and the State only needs to store one unique row per JoinKey. If the upstream has a primary key but the JoinKey does not contain the Rowkey, then both have to be stored in State. The worst case is an upstream without a primary key: the same order may produce, say, 10 messages of which only the last one is valid, but the system cannot tell which one is valid; without a specified primary key it cannot deduplicate, so it stores them all, which consumes a lot of resources and performs very poorly.

Therefore, we "deduplicate" the data at access time. During data access, we sort the data with row_number and tell the system to update by primary key, which solves the problem of not knowing how many of the 10 messages to keep: updating by primary key, only the last message is retained each time.

The row_number approach does not reduce the amount of data to be processed, but it greatly reduces the amount of State storage: each key keeps only one valid row of State rather than all of its historical data.
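As a sketch of this dedup-on-access idea (table and column names such as raw_orders, order_id and proc_time are placeholders, and the datagen source only stands in for the real TT/Kafka source), the Flink SQL below uses the ROW_NUMBER pattern that Flink recognizes as deduplication, so downstream state keeps just the latest row per primary key:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DedupOnAccess {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholder source; in practice this would be the TT/Kafka order stream.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE raw_orders ( "
              + "  order_id STRING, status STRING, event_time TIMESTAMP(3), "
              + "  proc_time AS PROCTIME() "
              + ") WITH ('connector' = 'datagen')");

        // Keep only the latest message per order_id; Flink turns this ROW_NUMBER pattern into
        // a deduplication operator whose state holds one row per key instead of the full history.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW deduped_orders AS "
              + "SELECT order_id, status, event_time FROM ( "
              + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proc_time DESC) AS rn "
              + "  FROM raw_orders "
              + ") WHERE rn = 1");
    }
}
```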

② Multi-stream Join optimization

The second point is multi-stream Join optimization. For example, in the pseudocode on the left of the figure above, a main table is joined with many data sources to produce a detailed wide table. We like writing it this way, but it is not good. In real-time computation, such SQL is processed as a sequence of dual-stream Joins, handling only one Join at a time; so if the code on the left has 10 joins, there are 10 Join nodes on the right. Each Join node stores all the data of both its left stream and its right stream, which is what the red boxes in the figure on the right show. Assuming our order source has 100 million records, roughly a billion records end up being stored, which is a terrifying amount of data.

Another problem is that the link is very long: with continuous network transmission and computation, the task latency is also very large. A dozen or so data sources joined with one another is what really happens in our actual scenario, and our association relationships are even more complex than this.

So how do we optimize? We use Union All to stagger the data and add a layer of Group By, turning the Join association into a Group By. Its execution graph looks like the right side of the figure above: yellow is the storage required during data access, and red is the Group By node. The State this process needs to store is very small. The main table is stored in both the yellow box and the red box, but although a lot of data flows in, effectively only one copy of each row is kept: for example, if we have 10 million logistics orders and the other data sources are also around 10 million each, the final number of valid rows is still 10 million, so the storage volume is not high. If a new data source is connected, its log volume may be another 10 million, but it effectively just updates the same rows; only one data source is added, and the cost of the new data source is close to zero. So replacing Join with Union All is a great optimization for State.
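A sketch of that rewrite in Flink SQL terms (source and column names are placeholders; the datagen tables only make the snippet self-contained): each source fills only its own columns and NULL for the rest, and the GROUP BY on the shared key folds the staggered rows into one wide row per order, so state holds roughly one row per order rather than a full copy per Join node.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnionAllInsteadOfJoin {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholder sources; in practice these would be the order / outbound / pickup streams.
        for (String name : new String[]{"order_source", "outbound_source", "pickup_source"}) {
            tEnv.executeSql("CREATE TEMPORARY TABLE " + name
                    + " (order_id STRING, ts TIMESTAMP(3)) WITH ('connector' = 'datagen')");
        }

        // Each branch fills only its own column; GROUP BY order_id folds the rows into one row.
        // MAX works here because each column is non-NULL in exactly one branch.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW wide_order AS "
              + "SELECT order_id, "
              + "       MAX(create_time)   AS create_time, "
              + "       MAX(outbound_time) AS outbound_time, "
              + "       MAX(pickup_time)   AS pickup_time "
              + "FROM ( "
              + "  SELECT order_id, ts AS create_time, CAST(NULL AS TIMESTAMP(3)) AS outbound_time, CAST(NULL AS TIMESTAMP(3)) AS pickup_time FROM order_source "
              + "  UNION ALL "
              + "  SELECT order_id, CAST(NULL AS TIMESTAMP(3)), ts, CAST(NULL AS TIMESTAMP(3)) FROM outbound_source "
              + "  UNION ALL "
              + "  SELECT order_id, CAST(NULL AS TIMESTAMP(3)), CAST(NULL AS TIMESTAMP(3)), ts FROM pickup_source "
              + ") GROUP BY order_id");
    }
}
```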

3. Many foreign keys and out-of-order data

The third challenge is fetching data through multiple foreign keys and the out-of-order problem. There are many kinds of disorder: disorder in the collection system, disorder introduced during transmission, and so on. What we discuss here is the disorder accidentally introduced during our own development process, because the platform has already taken care of the other aspects for us and provides good end-to-end consistency guarantees.

For example, two messages belong to the same logistics order, and some warehouse messages are fetched by order number. Message 1 and message 2 enter stream processing one after the other, and when they are joined they are shuffled by the JoinKey; the two messages may then flow to different operator instances for concurrency. It is quite possible that the message which entered the system first finishes processing later: message 1 reaches the system first but is processed slowly, while message 2 is output first, so the final output is wrong. Essentially, under multiple concurrency the data processing path is non-deterministic: multiple messages of the same order flow to different places to be computed, which can lead to disorder.

So how do we ensure that messages of the same order are processed in order?

Binlogs are written to Kafka sequentially, which requires a collection strategy that preserves order: you can hash the data into Kafka partitions by primary key, ensuring that each Kafka partition stores data with the same key. When Flink consumes Kafka, the parallelism must be set reasonably so that one partition's data is handled by one operator instance; if two operators handled the data of one partition, a similar situation would arise and messages would get out of order. In addition, the downstream application has to cooperate and perform updates or deletes by a fixed primary key, to guarantee end-to-end consistency.

Flink has already helped us achieve end-to-end consistency with the upstream and downstream systems; we just need to ensure that processing inside the task does not introduce disorder. Our solution is to avoid changing the Join Key: for example, the Join Key is mapped to the business primary key in advance through a special mapping relationship, to keep task processing ordered.
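A minimal, generic sketch of that idea (the OrderEvent type and its fields are made up for illustration): key the stream by the business primary key before any stateful processing, so that all messages of one order land on the same parallel subtask and keep their arrival order, assuming the upstream Kafka/TT partition is already hashed by the same key.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByBusinessKey {

    // Illustrative event type: one message about one logistics order.
    public static class OrderEvent {
        public String orderId;
        public String status;
        public OrderEvent() { }
        public OrderEvent(String orderId, String status) { this.orderId = orderId; this.status = status; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new OrderEvent("LP001", "CREATED"), new OrderEvent("LP001", "OUTBOUND"))
           // Same business primary key -> same subtask; per-order processing stays ordered
           // as long as the upstream partition already delivers that order's messages in order.
           .keyBy(e -> e.orderId)
           .map((MapFunction<OrderEvent, String>) e -> e.orderId + ":" + e.status)
           .print();

        env.execute("keyby-by-business-key");
    }
}
```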

4. Heavy serving pressure because statistical metrics depend on details

Another difficulty is that many of our statistical metrics depend on detail data, mainly real-time statistics. The risk is quite obvious: the pressure on the serving side is particularly heavy, and during big promotions it is extremely easy to drag the system down.

Timeout statistics are a typical real-time statistics scenario. For example, a logistics order is created at 1 o'clock and leaves the warehouse (outbound) at 2 o'clock. How do we count the orders that have not been picked up (LanShou) within 6 hours? Flink is triggered by messages: if no message arrives, no computation is triggered. In theory, at 8 o'clock the "not picked up within 6 hours" count should increase by 1, but because no message arrives, the downstream system does not trigger any computation. This is a difficult thing, so at the beginning, with no better plan, we computed directly from the details: for example, the order's outbound time is 2 o'clock; after the detail row is generated it is written into the OLAP engine, and the current time is compared against the details at query time.

We also explored some solutions based on message middleware, for example periodically issuing timeout messages, and solutions based on Flink CEP. The first approach introduces a third-party middleware, which raises maintenance cost. The CEP approach moves the time window steadily forward, but in logistics we have many situations like the following: the outbound time first comes back as 2 o'clock, is then found to be wrong, and another message corrects it to 1:30, so we need to trigger the computation again; Flink CEP cannot support this well. Later we explored creating a timeout message stream based on Flink's Timer Service callbacks. We first connect the data stream in our function; according to rules we define, for example an outbound time of 2 o'clock, a 6-hour timeout is defined and registered with the Timer Service; at 8 o'clock a comparison calculation is triggered, or a timeout message is emitted if the pickup has not happened. The whole scheme does not rely on third-party components, and the development cost is relatively low.
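The sketch below illustrates the Timer Service idea in a generic way (it is not the actual job; the (orderId, status) tuple input, the status values and the output format are assumptions): on an outbound message we register a processing-time timer 6 hours later, a pickup message sets a flag in state, and if the timer fires with the flag unset we emit a record that feeds the "not picked up within 6 hours" statistic.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by order id; input is (orderId, status); output is a timeout record for the statistic.
public class PickupTimeoutFunction extends KeyedProcessFunction<String, Tuple2<String, String>, String> {

    private static final long TIMEOUT_MS = 6 * 60 * 60 * 1000L;   // the 6-hour rule

    private transient ValueState<Boolean> pickedUp;

    @Override
    public void open(Configuration parameters) {
        pickedUp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("picked-up", Boolean.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event, Context ctx, Collector<String> out) throws Exception {
        if ("OUTBOUND".equals(event.f1)) {
            // Register a callback 6 hours after we see the outbound message.
            long fireAt = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
            ctx.timerService().registerProcessingTimeTimer(fireAt);
        } else if ("PICKUP".equals(event.f1)) {
            pickedUp.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Timer fired and no pickup message has been seen -> count this order as timed out.
        if (pickedUp.value() == null) {
            out.collect(ctx.getCurrentKey() + ",NOT_PICKED_UP_WITHIN_6H");
        }
    }
}
```

In the real scenario an event-time timer driven by the (possibly corrected) outbound time would fit better; using processing time here simply keeps the sketch short.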

5. Many fulfillment links and long data links

Another difficulty is that we have many fulfillment links and long data links, which makes abnormal situations hard to handle. For example, a message needs to remain valid for more than 20 days, so the State must also be kept for more than 20 days, living in Flink the whole time. If one day there is a data error or a logic processing error, tracing back is a big problem, because the upstream message system generally keeps data valid for only three days.

Here are some real cases.

Case 1:

We found a bug during the Double 11 period, several days after Double 11 had passed. Because our fulfillment link is particularly long, 10 to 20 days, by the time we found the error we could not make any change: once a change alters the DAG execution graph, the state cannot be restored, and since the upstream only keeps 3 days of data, after the change all of the earlier upstream data would be gone. This was unacceptable.

Case 2:

For some extremely long-tail orders during the epidemic, we set the State TTL to 60 days, thinking that everything would be fulfilled within roughly 60 days. Later we found the data began to be distorted after 24 days, even though the validity period was clearly set to 60 days. It turned out that the underlying State storage used an int for the TTL, which can only represent a validity period of a bit more than 20 days (2^31 - 1 milliseconds is about 24.8 days). We had effectively hit a boundary case of Flink, which also shows that our scenario is indeed very complex and that many states need an extremely long State lifecycle.

Case 3:

Every time the code was stopped for an upgrade, the state was lost and the data had to be pulled again for recomputation. But the upstream data is generally valid for only 3 days, so the business could only see 3 days of data, which was a very bad user experience.

Solution: Batch-stream mixing

What do we do?

We adopt a batch-stream mixing approach to achieve state reuse: the real-time message stream is processed by Blink stream processing, the offline computation is completed by Blink batch processing, and by fusing the two, the computation over all historical data is completed within the same task. For example, the order message stream and the fulfillment message stream are joined; then an offline order message source is added to the task and combined with our real-time order message source via Union All; next a Group By node is added that deduplicates by primary key. In this way state reuse is achieved. A few points need attention. First, a custom Source Connector has to be developed. Second, the merging of offline and real-time messages: after the Group By we need to decide whether to take the offline message or the real-time message first, since real-time messages may be consumed slowly, and we must judge which message is actually valid. So we also customized some functions, such as a LastValue, to decide whether the task takes the offline or the real-time message first. The whole process is based on Blink and MaxCompute.
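As a rough SQL-level sketch of that merge (the table names are placeholders, the datagen connector only stands in for the custom MaxCompute/TT sources, and a plain MAX stands in for the custom LastValue-style aggregate that prefers the real-time record): the bounded offline source and the real-time stream are combined with Union All and grouped by the primary key, so each order keeps a single state entry that both sources can update.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchStreamStateReuse {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholders: offline_orders stands for the bounded historical source (MaxCompute in the
        // original setup), realtime_orders for the real-time TT stream.
        for (String name : new String[]{"offline_orders", "realtime_orders"}) {
            tEnv.executeSql("CREATE TEMPORARY TABLE " + name
                    + " (order_id STRING, status STRING, process_time BIGINT) WITH ('connector' = 'datagen')");
        }

        // Union the two sources and keep one row per order. The real job used a custom
        // LastValue-style aggregate to decide whether the offline or the real-time record wins;
        // MAX over the processing time is only a stand-in for that idea.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW merged_orders AS "
              + "SELECT order_id, MAX(process_time) AS latest_process_time "
              + "FROM ( "
              + "  SELECT order_id, status, process_time FROM offline_orders "
              + "  UNION ALL "
              + "  SELECT order_id, status, process_time FROM realtime_orders "
              + ") GROUP BY order_id");
    }
}
```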

6. Small Tips

① Message delivery cannot be withdrawn

The first point is that once a message has been sent it cannot be withdrawn. Some orders are valid at first and later become invalid; such orders should not be filtered out in the task, but marked, and the mark used when computing statistics.

② Add data version, data processing time and data processing version

  • Data version is the version of the message schema, to avoid reading dirty data after the task restarts following a model upgrade.
  • Data processing time is the time at which the message is currently processed. For example, when the messages flow back offline, we sort by this time per primary key and take the latest record, restoring a quasi-real-time snapshot in this way.
  • Data processing version was added because even millisecond-level time is not precise enough to distinguish the order of messages (see the sketch below).
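Purely as an illustration of these three fields (the names and types are made up, not the real schema), a message record might look like this:

```java
// Illustrative message envelope carrying the three bookkeeping fields described above.
public class EnrichedMessage {
    public String orderId;
    public int dataVersion;        // version of the message schema; old-format rows can be dropped after a model upgrade
    public long processTimeMs;     // when this message was processed; used to sort per key when flowing data back offline
    public long processVersion;    // per-key increasing counter; breaks ties when two messages share the same millisecond
    public String payload;         // the business fields themselves
}
```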

③ Real-time data reconciliation scheme

Real-time reconciliation happens at two levels: real-time details against offline details, and real-time details against real-time summaries. As just mentioned, the real-time data flows back offline, so we can capture the data produced right before 24:00. Since the offline T+1 data shows yesterday's data as of 23:59:59, by capturing the real-time snapshot at that moment we can compare real-time against offline very well. In addition, real-time details are compared with real-time summaries; since they sit in the same DB, that comparison is also particularly convenient.

04 Summary and Outlook

1. Summary

A quick summary:

  • Model and Architecture: A good model and architecture is 80% success.
  • Accuracy requirements evaluation: data accuracy requirements need to be evaluated, i.e. whether CheckPoint alignment or strict semantic consistency is really required; in some cases approximate accuracy is acceptable, so there is no need for so many extra resource-consuming designs.
  • Take advantage of Flink features: you need to make good use of some of Flink's features, such as State and CheckPoint, to avoid the pain caused by misusing them.
  • Code self-review: ensure data processing is on track and on target.
  • SQL understanding: writing SQL is not itself a lofty skill; what is really tested is the thinking about how the data flows.

2. Outlook

① Real-time data quality monitoring

Real-time processing is not like batch processing: after a batch run you can run a small script to check whether the primary key is unique and to track record-count fluctuations. Monitoring real-time data quality is a much more troublesome matter.

② Stream-batch unification

Stream-batch unification has several levels. The first is unification at the storage level: real-time and offline data are written to the same place, which makes application more convenient. The second is unification of the computing engine, for example Flink, which supports both batch and stream processing and can write to Hive. At a higher level is unification of processing results: for the same code, batch and stream semantics may differ, so how do we make the same code produce exactly the same results in both batch and stream modes?

③ Automatic tuning

There are two kinds of automatic tuning. One is resource allocation: for example, when we apply for 1000 cores, how do we allocate them reasonably, giving more to where the performance bottleneck is likely to be? The other is elasticity: for example, in the early morning there are no orders and no data flowing, so resources can be scaled down very small and adjusted automatically according to traffic, that is, automatic scaling.

The above is our overall outlook and research direction for the future.