Brief introduction: the “Real-time Data Warehouse Introduction Training Camp” was jointly built by Alibaba Cloud researcher Wang Feng, Alibaba Cloud senior technical expert Jin Xiaojun, Alibaba Cloud senior product expert Liu Yiming, and other experts from the Realtime Compute for Apache Flink and Hologres technology/product lines. The carefully polished course content directly addresses the pain points students commonly run into, and analyzes real-time data warehouse architecture, scenarios, and practical applications from the ground up. Seven quality courses help you grow from beginner to expert in five days!

This article is based on the live session “Architecture analysis of a real-time recommendation system based on Apache Flink + Hologres” by Qin Jiangjie. Video link: c.tb.cn/F3.0d98Xr

Abstract: This article is organized from the talk given by Qin Jiangjie, instructor of the real-time data warehouse online course. Contents: I. Principles of a real-time recommendation system; II. Architecture of a real-time recommendation system; III. Key technologies of a real-time recommendation system based on Apache Flink + Hologres.

Principle of real-time recommendation system

(1) Static recommendation system

Before introducing real-time recommendation systems, let’s take a look at what a static recommendation system looks like.

The figure above shows the architecture of a very classic static recommendation system. On the front end there are many user applications, which generate a large volume of user behavior logs. These logs are put into a message queue and go through ETL. Feature generation and model training are then done in the offline system. Finally, the model and features are pushed to the online system, where the online inference service can be invoked to obtain recommendation results. This is the classic operating process of a static recommendation system. Let’s take a concrete example to see how it works.

As shown in the picture above, the online user behavior log may, for example, record users browsing pages and clicking on advertisements. The purpose of the recommendation system is to recommend advertisements to users, so the following user behaviors can be seen in the log:

User 1 and user 2 both see PageID 100 and some other pages, and user 1 clicks on AD 2002 after seeing PageID 100. Such a series of behaviors can be summarized into a user log through ETL and then sent to model training. During training we use some features; in this case, we can see that both user 1 and user 2 are male users in China, which can serve as a user-dimension feature.

In this case, what we see from the log is that the user clicked on AD 2002 after seeing PageID 100, and both users were male users in China. Therefore, the model may learn that when a Chinese male user looks at PageID 100, he should be shown AD 2002, and this behavior is trained into the model. At this point, we push some of the user’s offline features into the feature store, and push the model online.

Suppose there is a user with ID 4 who also happens to be a male user in China; his features have been pushed into the feature store and the model has been pushed online. If user 4 visits PageID 100, the inference service first looks up the features of user 4, and then, based on the fact that he is a Chinese male user, the trained model recommends the corresponding ad (AD 2002) to him. This is the basic working principle of a static recommendation system.
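To make the inference path concrete, here is a minimal Python sketch of the lookup-then-score flow described above. The `feature_store` dictionary, the `ToyModel` class, and all names are hypothetical stand-ins for the real feature database and the offline-trained model pushed online, not the actual system.

```python
# Minimal sketch of the static online-inference path described above.
# `feature_store` and `ToyModel` are hypothetical stand-ins for the real
# feature database and the offline-trained model pushed online.

feature_store = {
    # user_id -> offline user features pushed from the offline pipeline
    "user_4": {"country": "CN", "gender": "male"},
}

def recommend_ad(user_id: str, page_id: int, model) -> str:
    user_features = feature_store.get(user_id, {})             # 1. point lookup of static features
    request_features = {**user_features, "page_id": page_id}   # 2. combine with request context
    return model.predict(request_features)                     # 3. score with the static model

class ToyModel:
    """Toy 'model' mirroring the example: (CN, male, PageID 100) -> AD 2002."""
    def predict(self, features):
        if features.get("country") == "CN" and features.get("page_id") == 100:
            return "AD 2002"
        return "AD 0000"  # fallback ad

print(recommend_ad("user_4", 100, ToyModel()))  # -> AD 2002
```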

Now, if something changes, let’s see whether the static recommendation system continues to work well.

Let’s say the features and model for user 1 and user 2 are trained today, and the behavior of user 4 is observed the next day. According to the model, user 4 is a male user in China, so his behavior should be similar to that of user 1 and user 2, and he will be recommended whatever Chinese male users like. But at this point we realize that user 4 actually behaves more like user 3 than like user 1 or user 2.

In this case, since the model and features are static, the model has to be retrained before user 4 can receive recommendations that resemble user 3’s behavior. This delays the prediction effect, because the retraining must happen before user 4 gets recommendations closer to user 3’s.

Therefore, in practice, the static recommendation model has some problems:

  • The model and features are generated statically;
  • Taking a classification model as an example, users are classified by similarity. It is assumed that similar users have similar interests and behaviors, for example, that male users in China behave similarly. Once a user is classified into a category, he remains in that category until he is reclassified by a new round of model training.

In this case, it is difficult to make a good recommendation for the following reasons:

  • Users’ behaviors are so diversified that they cannot be placed in one fixed category: 1) a user may buy health care products for their parents in the morning, book hotels for a business trip at noon, and buy clothes for their family in the evening; 2) a static system cannot accurately place the user in the right category at each moment.
  • The model assumes that users “follow the crowd”, but: 1) the “crowd” itself may change; 2) the “crowd” seen in historical data may not accurately reflect what is going on online.

(2) Recommendation system with real-time feature engineering

To solve the above problems, dynamic features can be added. So what are the dynamic features? Let me give you an example.

As shown in the figure above, take the dynamic characteristic of a changing “crowd” as an example. The previous model’s recommendation was fixed: if a Chinese male user visits PageID 100, he is recommended AD 2002.

Let’s make some changes on this basis by sampling a real-time feature: the top 10 ads that Chinese male users click on the most when they visit PageID 100 during a recent time window. This feature cannot be calculated offline, because it reflects online user behavior happening in real time.

So what can be done after generating this feature? When Chinese male users visit PageID 100, instead of always pushing AD 2002, we can push the ads that Chinese male users have clicked on the most when visiting PageID 100 recently.

In this case, if the ads most clicked by Chinese male users visiting PageID 100 recently are AD 2001 and AD 2002, then when a user arrives and we see that he is a Chinese male user, it may be AD 2001 that is recommended to him instead of AD 2002.

So that is an example of the “crowd” changing.
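As an illustration of how such a real-time feature could be computed with Flink, here is a PyFlink sketch of a sliding-window Top-N query (Flink 1.13+ window TVF syntax). The topic name, schema, and filter columns are assumptions made up for this example, not the actual production tables.

```python
# Sketch (assumed schema/topic names) of computing the "top-10 ads clicked by
# Chinese male users on each page over the last 10 minutes" feature in Flink SQL.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical click-log source; topic, fields, and format are assumptions.
t_env.execute_sql("""
    CREATE TABLE ad_clicks (
        user_id STRING, country STRING, gender STRING,
        page_id BIGINT, ad_id BIGINT,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'ad_clicks',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# Sliding (HOP) window aggregation + ROW_NUMBER gives a per-page top-10 ad list
# that refreshes every minute over the last 10 minutes of clicks.
top_ads = t_env.sql_query("""
    SELECT window_end, page_id, ad_id, click_cnt
    FROM (
        SELECT *, ROW_NUMBER() OVER (
                    PARTITION BY window_start, window_end, page_id
                    ORDER BY click_cnt DESC) AS rn
        FROM (
            SELECT window_start, window_end, page_id, ad_id, COUNT(*) AS click_cnt
            FROM TABLE(HOP(TABLE ad_clicks, DESCRIPTOR(event_time),
                           INTERVAL '1' MINUTES, INTERVAL '10' MINUTES))
            WHERE country = 'CN' AND gender = 'male'
            GROUP BY window_start, window_end, page_id, ad_id
        )
    ) WHERE rn <= 10
""")
# top_ads.execute_insert("feature_store_table")  # sink to the feature store (table assumed)
```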

By the same token, because the system can sample a user’s own features in real time, it can better determine the user’s intention at that moment. For example, by looking at which pages and products a user has viewed in the last minute, the system can judge what the user is thinking right now and recommend an ad better suited to his current intention.

Is such a recommendation system free of problems? Let’s look at another example.

For example, user 1 and user 2 are both male users in China. It was assumed that their behavior was similar, and the historical data confirmed this. But what might happen when you actually look at user behavior online?

It is possible that the behavior of user 1 and user 2 diverges for a variety of reasons, and we do not know why; what should be recommended to user 1 and user 2 may be completely different. So what causes the divergence?

For example, suppose user 1 is from Shanghai and user 2 is from Beijing. One day the temperature in Beijing drops sharply, so user 2 in Beijing may start searching for long johns, while Shanghai is still hot that day, so user 1 in Shanghai, when searching for clothing, may still be looking for summer clothes. At this moment, within the group of Chinese male users, the search behaviors of user 1 in Shanghai and user 2 in Beijing have diverged, and they need to be shown different ads, which a static model does not handle well.

The model here is still a statically trained model, so if it is a classification model, the categories it can produce are fixed. To produce a new category, the model needs to be trained again. Since training is carried out offline, the model may only be updated the next day, which hurts the recommendation effect.

  • Adding dynamic features allows the system to: 1) track the behavior of a class of users in real time to fit the current “crowd”; 2) track an individual user’s behavior in real time to understand his intention at the moment and classify him into a more appropriate category.
  • However, when the classification scheme of the model itself needs to change, the most appropriate category may not exist yet, and the model has to be retrained to add the new category.

For example: new products are launched frequently, the business grows rapidly, and the distribution of user behavior changes quickly. To deal with these problems, we need to add dynamic model updates. How is the dynamic model update done? Let’s look at it in the same way.

As shown in the figure above, in addition to ETL-ing users’ real-time behavior logs into the offline system for feature generation, the behavior logs are also consumed online, where feature generation, sample stitching, and online model training are performed.

Model training here is usually streaming training: incremental training on top of a base model, so that the model better matches the current changes in user behavior. By training on these real-time samples, the model can generate new categories; it learns that the behavior of users in Shanghai and Beijing may differ. Therefore, when a user visits PageID 100, it might recommend AD 2002 to a user in Shanghai and AD 2011 to a user in Beijing.

In this case, when user 4 comes back, the system checks whether he is from Shanghai or Beijing; if he is from Shanghai, it recommends AD 2002 to him.

Features of the recommendation system with real-time model training:

  • On the basis of dynamic features, the model is trained in real time so that it stays as close as possible to the current distribution of user behavior;
  • This mitigates degradation of the model.

Real-time recommendation system architecture

The examples above explain how a real-time recommendation system works and why it works better than a typical offline recommendation system. So how do you build a workable real-time recommendation system with Flink, Hologres, and other systems/projects?

(1) Classic offline recommendation system architecture

First, take a look at the architecture of the classic offline recommendation system mentioned above, as shown below.

This architecture is actually the same as the one I talked about before, but with some additional details.

First, real-time user behavior is collected through a message queue. The behavior in the message queue is imported into offline storage to keep the user’s historical behavior. Static feature calculation is then done every day, and the results are put into the feature store for the online inference service.

At the same time, the system also does offline sample stitching; the stitched samples are stored in sample storage for offline model training. Offline model training generates a new model every day, which is validated and then handed to the inference service. The model is therefore updated T+1.

This is the architecture of a classic offline recommendation system. To evolve it into a real-time recommendation system, three things need to change:

  • Feature calculation: from static T+1 feature calculation to real-time feature calculation.
  • Sample generation: from offline T+1 sample generation to real-time sample generation.
  • Model training: from offline training with T+1 updates to incremental training with real-time updates.

(2) Online machine learning pipeline of Alibaba’s search and advertising business

Alibaba’s search and advertising business has launched such a real-time recommendation system. Its overall pipeline is similar to the offline recommendation system; the main difference is that the whole pipeline runs in real time.

As shown above, this system has three main features:
  • Timeliness: during big promotions, the whole pipeline is updated in real time.
  • Flexibility: features and models can be adjusted as needed.
  • Reliability: the system is stable and highly available, and the online effect is guaranteed.
Users can update models and features very promptly; during promotions they can adjust features and models at any time, and the results are very good.

(3) Real-time recommendation system architecture

What should a real-time recommendation system architecture look like?

As the figure above shows, the real-time recommendation architecture changes in several ways compared to the classic offline recommendation system. First of all, the data in the message queue is no longer only dumped into offline storage for historical behavior; it is also read out twice. One copy is used for real-time feature calculation, whose results also go into the feature store; the other goes into real-time sample stitching, where a dual-stream Join with the features used by the online inference service produces real-time samples.

In this case, the samples stored in the real-time system can be used for both offline model training and real-time model training.

Whether the training is offline or real-time, the generated model is put into the model store, validated, and finally goes online.

Offline model training happens at the granularity of days, while real-time model training can be at the granularity of minutes, hours, or even seconds. The offline model training produces a base model every day for the real-time model training, which then performs incremental updates on top of it.

It is important to mention that the inference service uses features taken from the feature store to do inference, and at the same time it needs to send the features it used, together with the Request ID, to the message queue. This matters for real-time sample stitching: when a positive sample is generated, for example an ad is shown to the user and the user clicks on it, we need to know which features were used when that ad was recommended. So the feature information must be retained by the inference service and joined into the real-time sample stitching, in order to produce good samples.
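A minimal sketch of the inference side emitting the features it used, keyed by Request ID, so the sample-stitching job can later join them with impressions and clicks. The kafka-python client, topic name, and field names here are assumptions for illustration, not the actual production code.

```python
# Sketch: the inference service logs (request_id, features, recommendation) to Kafka
# so sample stitching can recover exactly the features used for each recommendation.
import json
from kafka import KafkaProducer  # kafka-python, used here purely for illustration

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def log_inference(request_id: str, user_id: str, features: dict, recommended_ad: str):
    # Everything the model saw at inference time is kept, so the positive/negative
    # sample built later contains exactly the features used for the recommendation.
    producer.send("inference_features", {
        "request_id": request_id,
        "user_id": user_id,
        "features": features,
        "recommended_ad": recommended_ad,
    })

log_inference("req-001", "user_4",
              {"country": "CN", "gender": "male", "page_id": 100}, "AD 2001")
producer.flush()
```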

In this architecture, compared to the classic offline recommendation system, the parts in the green box are all real-time: some are new additions, and some are originally offline parts turned into real-time parts. For example, real-time feature calculation is a new addition; real-time sample stitching turns the original offline sample stitching into a real-time process; real-time model training is a new addition; and the same goes for model validation, which turns the original offline model validation into real-time model validation.

(4) Real-time recommendation scheme based on Flink + Hologres

What kind of systems would be used to implement the real-time recommendation architecture?

As shown in the figure above, Kafka is used for the message queue, and HDFS is assumed to be used for offline storage. Both real-time and offline feature calculation can be done with Flink, and Flink’s unified stream-batch capability ensures that the results of real-time and offline feature calculation are consistent. Hologres serves as the feature store here. One advantage of Hologres as a feature store is that it provides very efficient point lookups. Another is that real-time feature calculation often produces features that are not quite accurate and need to be corrected later; the Flink plus Hologres mechanism makes such feature correction easy.
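Below is a hedged PyFlink sketch of declaring a Hologres-backed feature table as a sink. The connector name and options (`endpoint`, `dbname`, `tablename`, credentials) follow the Alibaba Cloud Hologres connector as I understand it and should be treated as assumptions; the table schema is made up for illustration.

```python
# Sketch of writing computed real-time features into Hologres for point lookups.
# Connector options are assumptions based on the Alibaba Cloud Hologres connector;
# adjust them to the connector version you actually use.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE user_realtime_features (
        user_id STRING,
        recent_page_ids ARRAY<BIGINT>,
        recent_click_cnt BIGINT,
        PRIMARY KEY (user_id) NOT ENFORCED  -- upserts keyed by user enable point lookups
    ) WITH (
        'connector' = 'hologres',
        'endpoint'  = '<hologres-endpoint>',
        'dbname'    = '<database>',
        'tablename' = 'user_realtime_features',
        'username'  = '<access-id>',
        'password'  = '<access-key>'
    )
""")

# `computed_features` would be the result of a windowed feature query like the one
# shown earlier:
# computed_features.execute_insert("user_realtime_features")
```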

Similarly, on the inference service side, Kafka is used as the message queue to keep the features used for inference so that they can be joined during sample stitching. Sample stitching itself is done with Flink; a dual-stream Join is a very classic Flink application scenario. After the samples are stitched together and the features are attached, the resulting samples are also put into Hologres for sample storage.

For sample storage, the samples in Hologres can be used for real-time model training by reading the Hologres Binlog, and for offline model training through a batch scan of Hologres.

Both online and offline model training can be done with Flink, using FlinkML/Alink. For deep learning models rather than traditional machine learning, TensorFlow can be used for training. The resulting models can be stored in HDFS and then validated with Flink or TensorFlow, after which the online inference service can use them.

Many users have their own inference engine and can use it if available; otherwise, Flink and TensorFlow can be used directly for inference.

(5) Real-time feature calculation and inference (Flink + Hologres)

First let’s look at the process of real-time feature calculation and inference, as shown in the figure above.

As mentioned earlier, real-time user behavior is collected and sent to Flink for real-time feature calculation, and the results are saved in Hologres for the online inference service.

Real-time features here might include:

  • User’s browsing history in the last 5 minutes: 1) products, articles, and videos viewed; 2) dwell time; 3) favorites, purchases, consultations, and comments
  • Top 50 hits per category in the last 10 minutes
  • The most viewed article/video/product in the last 30 minutes
  • Top 100 most searched words in last 30 minutes

For search and advertising businesses, such real-time features can be used to obtain better recommendation results.

(6) Real-time sample stitching (Flink + Hologres)

Next, we’ll look at real-time sample stitching, as shown below.

Real-time user behavior is collected and fed into Flink for sample stitching. Sample stitching here includes two parts. The first part is determining whether a sample is positive or negative, which is done by analyzing the real-time user behavior log. We have an impression stream and a click stream; if, after joining the impression stream with the click stream, we find that an item that was shown was clicked by the user, then it is a positive sample. If an item was shown and the user did not click on it, it is a negative sample. That is how positive and negative samples are determined.

Judging positive and negative samples alone is obviously not enough, because features are also needed for training. These features come from the inference service: when an item is shown, the inference service used certain features to judge whether the user would be interested in it. These features are persisted to Kafka and read by Flink. During sample stitching, the features are joined in by Request ID, producing a complete sample that is then written to Hologres.

Here, Flink’s multi-stream Join capability is used for sample stitching; at the same time, multi-stream synchronization, positive/negative sample determination, and sample correction are also handled.
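As an illustration of the labeling step, here is a hedged Flink SQL sketch (via PyFlink) of an interval join between the impression stream and the click stream on Request ID. The `impressions` and `clicks` tables, their columns, and the 10-minute attribution window are assumptions for the example; the features logged by the inference service would be joined on `request_id` in the same way.

```python
# Sketch of labeling samples by joining the impression stream with the click stream
# on Request ID inside a bounded time window (an interval join). Names are assumptions.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# `impressions` and `clicks` are assumed to be registered Kafka-backed tables with
# event-time attributes `show_time` / `click_time` and a common `request_id`.

labeled_samples = t_env.sql_query("""
    SELECT
        i.request_id,
        i.user_id,
        i.ad_id,
        CASE WHEN c.request_id IS NULL THEN 0 ELSE 1 END AS label  -- 1 = clicked (positive)
    FROM impressions AS i
    LEFT JOIN clicks AS c
      ON  i.request_id = c.request_id
      AND c.click_time BETWEEN i.show_time AND i.show_time + INTERVAL '10' MINUTE
""")
# The inference-feature stream is joined on request_id in the same fashion, and the
# complete samples are written to the Hologres sample table:
# labeled_samples.execute_insert("samples")   # sink table assumed
```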

(7) Real-time model training/deep learning (PAI-Alink/TensorFlow)

After the sample is generated, the next step is real-time model training or deep learning.

As shown in the figure above, in this case, the samples mentioned above are stored in Hologres. The samples in Hologres can be used for two purposes, both for online model training and offline model training.

Online model training and offline model training can use Hologres’ Binlog and batch scan capabilities respectively. In terms of performance, this is not very different from a typical message queue or file system scan.
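Below is a hedged sketch of consuming the same Hologres sample table in the two modes just described: as a binlog-driven stream for incremental training and as a bounded batch scan for offline training. The `'binlog' = 'true'` option and the other connector options are assumptions about the Hologres connector in use.

```python
# Sketch: the same Hologres sample table read as a change-log stream (for incremental
# training) and as a bounded batch scan (for offline training). Options are assumptions.
from pyflink.table import EnvironmentSettings, TableEnvironment

stream_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
stream_env.execute_sql("""
    CREATE TABLE samples_stream (
        request_id STRING, user_id STRING, ad_id BIGINT, features STRING, label INT
    ) WITH (
        'connector' = 'hologres',
        'endpoint'  = '<hologres-endpoint>',
        'dbname'    = '<database>',
        'tablename' = 'samples',
        'username'  = '<access-id>',
        'password'  = '<access-key>',
        'binlog'    = 'true'   -- read the table's change log incrementally (assumed option)
    )
""")

# The same physical table, declared without the binlog option in a batch environment,
# is scanned as a bounded source for the daily offline training of the base model.
batch_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
```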

If it is a deep learning model, TensorFlow can be used for training. For traditional machine learning models, Alink or FlinkML can be used. The trained models are then stored in HDFS, and model validation is done with Flink or TensorFlow.

The above are some of the techniques that can be used to build real-time model training and deep model training.
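To make the idea of incremental (streaming-style) training concrete, here is a small sketch using scikit-learn’s `partial_fit`. This stands in for the Alink/TensorFlow streaming training described above and is not the Alink API; the mini-batch generator and feature dimensions are invented for the example.

```python
# Incremental training illustrated with scikit-learn's partial_fit: each mini-batch of
# freshly stitched samples nudges the model toward the current behavior distribution
# instead of waiting for a T+1 retrain. Not the actual Alink/TensorFlow code.
import numpy as np
from sklearn.linear_model import SGDClassifier

model = SGDClassifier(loss="log_loss")  # base model, e.g. warm-started from the daily batch job
classes = np.array([0, 1])              # click / no-click labels

def consume_sample_batches():
    """Hypothetical generator yielding (features, labels) mini-batches from the sample stream."""
    rng = np.random.default_rng(0)
    for _ in range(3):
        X = rng.normal(size=(32, 8))
        y = rng.integers(0, 2, size=32)
        yield X, y

first_batch = True
for X, y in consume_sample_batches():
    if first_batch:
        model.partial_fit(X, y, classes=classes)  # classes must be declared on the first call
        first_batch = False
    else:
        model.partial_fit(X, y)                   # subsequent batches update the model in place
```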

(8) Alink / Flink ML (machine learning algorithm library based on Flink)

Here is a brief introduction to Alink. Alink is a machine learning algorithm library based on Flink. It is currently open source and is being contributed to the Apache Flink community.

As shown above, Alink (Flink ML) has two features compared to Spark ML:

  1. Spark ML provides only batch algorithms, while Alink provides unified stream and batch algorithms.
  2. Alink’s batch algorithms are comparable to Spark ML’s.

(9) Offline feature backfill

After the training part, let’s look at offline feature backfill. What this process means is: the real-time features are already online, and now new features need to be brought online. What should we do?

As shown in the figure above, there are usually two steps. The first step is to add the new feature to the real-time system, so that from some point in time onward, all features stored in Hologres include the new feature. But what about the historical data? In this case a feature backfill is needed: run a batch job over the historical behavior data stored in HDFS to fill in the historical values of the new feature.

Therefore, the offline feature backfill in the architecture diagram is also done by Flink’s offline feature calculation: the historical behavior data is read from HDFS, and offline features are calculated to fill in the feature values for historical messages.
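Here is a hedged sketch of such a backfill as a bounded PyFlink job: read historical behavior from HDFS and upsert the newly added feature into the same Hologres feature table used online. Paths, schemas, and connector options are assumptions.

```python
# Sketch of the feature-backfill step: a bounded Flink job re-reads historical behavior
# from HDFS and writes the newly added feature for old data into Hologres.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Hypothetical historical behavior data on HDFS.
t_env.execute_sql("""
    CREATE TABLE history_behavior (
        user_id STRING, page_id BIGINT, ad_id BIGINT, action STRING, ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'hdfs:///warehouse/user_behavior/',
        'format' = 'parquet'
    )
""")

# Assumed Hologres feature table (same one the streaming job writes to).
t_env.execute_sql("""
    CREATE TABLE user_feature_sink (
        user_id STRING, lifetime_click_cnt BIGINT,
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'hologres',
        'endpoint'  = '<hologres-endpoint>',
        'dbname'    = '<database>',
        'tablename' = 'user_realtime_features',
        'username'  = '<access-id>',
        'password'  = '<access-key>'
    )
""")

# Backfill the new feature for all historical users with one bounded aggregation.
t_env.execute_sql("""
    INSERT INTO user_feature_sink
    SELECT user_id, COUNT(*) AS lifetime_click_cnt
    FROM history_behavior
    WHERE action = 'click'
    GROUP BY user_id
""")
```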

Key technologies of real-time recommendation system based on Apache Flink + Hologres

A lot of key technologies are used in the architecture just described; I will focus on two of them.

(1) Retractable and correctable features and samples

The first point is features and samples that can be retracted and corrected, as shown in the figure above.

In the shaded area of the figure, some samples and features are retracted and corrected through the cooperation of Flink and Hologres. Why are feature and sample corrections needed?

  • Real-time logs arrive out of order. For example, a user click event arrives late due to system delay, which produces False Negative samples.
  • Traditionally, an offline job is used to recompute all the offline samples to correct them.
  • The Apache Flink + Hologres retraction mechanism updates only the features and samples that need to be corrected, in a point-wise fashion.

The real-time log may be somewhat out of order, with some streams arriving early and others late. In this case, when doing a multi-stream Join, False Negative samples may be generated because some events are delayed.

For example, when joining the impression stream with the click stream, you might initially conclude that the user did not click on an ad, and later find out that the user actually did, but the click event arrived late. In that case the downstream was first told that the user did not click, which is a False Negative; once it turns out the user actually clicked, the False Negative needs to be corrected. When that happens, a retraction or update of the previous sample has to be issued, telling downstream that the earlier sample was not a negative sample but a positive one.

In this case, we need retraction capability along the whole link: we need to tell the downstream, step by step, about the previous error and fix it. This mechanism can be implemented with Apache Flink + Hologres.

Why do such a thing?

In the past, when such False Negative samples were generated, the offline samples were generally recomputed through an offline job to correct them. The cost of this approach is that the entire offline sample computation may need to be re-run, even though the ultimate goal is to correct only a small fraction of the samples, so the cost is relatively high.

With the mechanism implemented by Apache Flink + Hologres, the False Negative samples can be updated point-wise instead of re-running the whole sample computation, so the cost of correcting features and samples is much smaller.
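To illustrate what a point-wise correction could look like, here is a hedged sketch that flips a false-negative label by upserting the same Request ID key in the sample table. It relies on Hologres being PostgreSQL-compatible and on the table being keyed by `request_id`; psycopg2, the table name, and the columns are assumptions for illustration, not the actual correction pipeline.

```python
# Sketch of a point-wise correction: because the sample table is keyed by request_id,
# emitting a corrected row for the same key overwrites the earlier false-negative sample
# without recomputing all samples. Connection details and table names are assumptions.
import psycopg2

conn = psycopg2.connect(host="<hologres-endpoint>", port=80, dbname="<database>",
                        user="<access-id>", password="<access-key>")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        INSERT INTO samples (request_id, label)
        VALUES (%s, 1)
        ON CONFLICT (request_id) DO UPDATE SET label = EXCLUDED.label
        """,
        ("req-001",),  # the late-arriving click tells us this request was in fact positive
    )
```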

(2) Event-based stream-batch hybrid workflow

Another key technology in this architecture is the event-based stream-batch hybrid workflow. What does that mean?

Looking at the diagram, besides the individual systems just shown, this is a very complex workflow, because between different systems there may be dependencies and scheduling relationships: sometimes data dependencies, sometimes control dependencies.

For example, we may run some offline static feature calculations periodically or on a schedule, either to backfill features or to correct problems caused by real-time features; these runs can be triggered by default or manually. At other times, after offline model training produces a model, online model validation needs to be triggered; likewise, after online model training produces a model, online model validation also needs to be triggered.

It is also possible that sample stitching reaches a certain point in time. For example, after sample stitching is completed at 10 a.m., we want to tell model training that all samples up to 10 a.m. are ready, and run a batch offline training task on the data from 10 a.m. yesterday to 10 a.m. today. That is a case of a streaming task triggering a batch task. After the batch model training just mentioned produces a model, it needs to go through online model validation, which is a batch task triggering a streaming task. And the model produced by online model training also needs to be validated by online model validation, which is a streaming task triggering a streaming task.

Therefore, in this process there is a lot of interaction between different tasks, which forms a rather complex workflow: it contains both batch tasks and streaming tasks, so it is a stream-batch hybrid workflow.

(3) Flink AI Flow

How can such a stream-batch hybrid workflow be implemented?

By using Flink AI Flow, which is a top-level workflow abstraction for big data plus AI.

As shown in the figure above, a Workflow can usually be divided into Workflow definition and Workflow execution.

The Workflow definition defines Nodes and Relations, that is, nodes and the relationships between them. In Flink AI Flow, a node is defined as a Logical Processing Unit, and the relations between nodes are defined as event-driven Conditions. On top of this abstraction, event-based scheduling is performed at the Workflow execution level.

More abstractly, there will be many events in a system; combinations of these events may satisfy certain conditions, and when a condition is met, some action is produced.

For example, a workflow might have a task A, and the scheduler listens for various events in the system. When event 1 occurs, then event 2, then event 3, and the events occur in exactly this order, the action of starting task A should be performed; events 1, 2, and 3 occurring in order constitutes the condition.
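The following toy Python sketch shows the idea of such an event-driven condition: a condition is a required sequence of events, and the scheduler starts the job only once the events have occurred in order. This mirrors the “event 1, then 2, then 3 starts task A” example and is not the Flink AI Flow API itself; all names are hypothetical.

```python
# Toy illustration of event-driven scheduling: a condition is a required sequence of
# events, and the action fires only once the sequence has been observed in order.
class SequenceCondition:
    def __init__(self, required_events):
        self.required = list(required_events)
        self.position = 0

    def on_event(self, event) -> bool:
        """Advance when the next expected event arrives; return True once the condition is met."""
        if self.position < len(self.required) and event == self.required[self.position]:
            self.position += 1
        return self.position == len(self.required)

def start_task(name):
    print(f"starting {name}")

condition_for_task_a = SequenceCondition(["event_1", "event_2", "event_3"])
for event in ["event_1", "event_5", "event_2", "event_3"]:
    if condition_for_task_a.on_event(event):
        start_task("task A")   # fires only after events 1, 2, 3 arrived in order
        break
```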

With such an abstraction, it is possible to integrate traditional workflows with workflows that contain streaming jobs. Traditionally, workflows are scheduled based on job state changes: a job finishes, and then the scheduler decides which job to run next. The problem with this approach is that if a job is a streaming job, it never finishes, so the workflow cannot proceed properly.

Event-based scheduling solves this problem well. Workflow scheduling no longer depends on job state changes, but on events. That way, even a streaming job can generate events and tell the scheduler to do something else.

To complete the entire scheduling semantics, some supporting services are required:

  • Metadata Service
  • Notification Service
  • Model Center

Here’s a look at each of these support services.

(4) Metadata Service

The metadata service manages data sets. Within a workflow, we hope users do not have to go through tedious steps to find their data sets; the service manages the data sets for them, so that a user only needs to refer to a data set by name.

The metadata service also manages projects; a Project in Flink AI Flow can contain multiple workflows, and the main purpose of managing projects is to ensure that workflows can be reproduced.

The metadata service also manages workflows and jobs; each workflow may involve many jobs. In addition, it manages model lineage, which lets you know which version of a model was generated by which job in which workflow. Finally, it allows users to define custom entities.

(5) Notification Service

The second service is the notification service, which provides events with primary keys and event listeners.

For example, in the picture above, a client wants to listen for events whose Key is “model”. When that Key is updated, the listening client receives a callback telling it that an event was updated: the event’s primary Key is “model”, its Value is the URI of the model, and the version number is 1.

One useful application: a model validation job can listen to the Notification Service, so that when a new model is generated it is notified and performs the validation. The Notification Service makes this possible.
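Below is a hypothetical Python sketch (not the actual Notification Service API) of the pattern just described: listeners register on a key, and every `send` on that key triggers their callbacks with the event’s key, value (model URI), and version.

```python
# Hypothetical stub illustrating keyed events and listeners; not the real API.
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class Event:
    key: str      # primary key of the event, e.g. "model"
    value: str    # e.g. the model URI
    version: int

class NotificationServiceStub:
    def __init__(self):
        self._listeners: Dict[str, List[Callable[[Event], None]]] = {}
        self._versions: Dict[str, int] = {}

    def listen(self, key: str, callback: Callable[[Event], None]) -> None:
        self._listeners.setdefault(key, []).append(callback)

    def send(self, key: str, value: str) -> None:
        version = self._versions.get(key, 0) + 1
        self._versions[key] = version
        event = Event(key, value, version)
        for callback in self._listeners.get(key, []):
            callback(event)   # notify every listener registered on this key

service = NotificationServiceStub()
service.listen("model", lambda e: print(f"validate model {e.value} (version {e.version})"))
service.send("model", "hdfs:///models/ctr_model")   # -> triggers the validation callback
```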

(6) Model Center

The model center manages multiple versions of models, records their parameters, tracks model metrics, manages the model life cycle, and provides some model visualization.

Let’s use an example to illustrate how Flink AI Flow describes the complex workflow of a real-time recommendation system as one complete workflow.

As shown above, suppose there is a DAG containing three tasks: model training, model validation, and online inference.

First, after the Scheduler submits the model training job, it updates the job’s status in the Metadata Service to a submitted state. Assuming the environment is a K8s cluster, the training job is submitted to Kubernetes to run.

Once the training job is running, its status is updated via the job status listener. Suppose it is a streaming training job; after running for a period of time it generates a model, which is registered in the Model Center. Once registration completes, the Model Center issues an event indicating that a new model version has been registered, and this event reaches the Scheduler, which is listening for it.

Upon receiving the event, the Scheduler checks whether any condition has been met and what action needs to be taken. When a model is generated, it needs to be validated; this condition is now met, so the Scheduler pulls up a model validation job.

After the model validation job is started, it goes to the Model Center to find the latest generated model version and validates it. Assuming the validation passes and the validation is a batch job, it tells the Model Center that the model is validated. The Model Center then sends a Model Validated Version event to the Scheduler. Seeing that the model is validated, the Scheduler pulls up the online inference service. After the inference service starts, it pulls the model that has just been validated from the Model Center and uses it for inference.

Suppose the inference service is also a streaming job that keeps running. After some time, the online streaming training job generates a new model, which goes down the same path: a New Model Version Generated event is produced and heard by the Scheduler. The Scheduler pulls up the validation job (Job 2) again to validate the new model. If the validation passes, a New Model Version Validated event is sent to the Model Center, and the Model Center passes this event back to the Scheduler. At this point, the Scheduler sees that the inference job is already running and may not need to do anything.

The inference job itself also listens for Model Version Validated events. When it receives this event, it reloads the latest validated model from the Model Center.

This example explains why a stream-batch hybrid scheduler and workflow is needed to string together all the jobs in an end-to-end real-time recommendation architecture.


This article is original content from Alibaba Cloud and may not be reproduced without permission.