Preface

What is personalized recommendation? Simply put, it means recommending to each user the things they are likely to enjoy. Over the past 10 years, with the rapid growth of the mobile Internet, personalized recommendation has come to play a very important role. Take the operation of a content product as an example: the user growth team acquires new users and increases DAU through advertising and other channels; the product and technology teams distribute content that users find interesting to improve engagement and retention; the monetization team distributes ads that users may be interested in to improve revenue per unit of traffic; and the revenue from monetization is reinvested in user growth, forming a positive cycle. Personalized recommendation technology runs through every link of this loop and has become a high-speed growth engine for many companies.

How is personalized recommendation done? Generally, a business first defines multiple optimization goals (such as video watch time, likes and shares, or e-commerce clicks and purchases), then builds one or more models to predict these goals, and finally fuses the predicted scores of the multiple goals to produce the ranking. For the recommendation system, the core work is building accurate prediction models. Over the years, recommendation models in the industry have kept evolving toward larger scale, better real-time behavior, and greater refinement. Large scale refers to the volume of data and the size of the models: training sets reach tens of billions or even trillions of samples, and a single model reaches 1 TB or even more than 10 TB. Real-time refers to the real-time updating of features, models, and candidates. Refinement shows up in feature engineering, model structure, optimization methods, and other aspects, where innovative ideas emerge endlessly.
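As a toy illustration of that last step, the sketch below fuses several predicted objectives into one ranking score with a weighted sum. The objectives, the weighted-sum form, and the weights are all illustrative assumptions, not a prescription from this article.

```python
# Toy score fusion: combine several predicted objectives into one ranking score.
# The weights here are illustrative placeholders, not tuned values.
def fuse(p_click, p_like, expected_watch_time, weights=(1.0, 0.5, 0.02)):
    w_click, w_like, w_time = weights
    return w_click * p_click + w_like * p_like + w_time * expected_watch_time

candidates = {
    "video_a": (0.12, 0.03, 35.0),  # (predicted CTR, predicted like rate, seconds)
    "video_b": (0.08, 0.10, 60.0),
}
ranking = sorted(candidates, key=lambda c: fuse(*candidates[c]), reverse=True)
print(ranking)  # the order depends entirely on the chosen weights
```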

Implementing a large-scale recommendation system is a major engineering challenge. In this article we focus on the training and serving systems, which readers tend to be most interested in, and describe the challenges we ran into and the work we did while building them. Building such a system from scratch is not easy for any company, and the investment is substantial. Inside ByteDance, we have been exploring and developing for many years, with thousands of engineers continuously iterating on and optimizing the recommendation system. So what problems does one typically run into when building a recommendation system? Let's start with a story:

The story of Company A

Company A is an e-commerce company. Its product has 3 million DAU, and it has an algorithm team of 10 people. In the process of building a recommendation system, they ran into plenty of trouble.

Company A wants to train a click-through rate (CTR) model. With 100 million impressions and 1 million clicks per day, and a plan to train on 3 months of data, the sample size comes to about 9 billion. They designed 200 features, including user ID, item ID, and the user's click sequence, and wanted to assign a 16-dimensional embedding vector to each feature value. A rough calculation put the model size at about 500 GB (a back-of-envelope version of this estimate is sketched after the list below). After this analysis, they realized they needed distributed training and distributed model storage, so they investigated some open source options:

  • TensorFlow: Google's open source machine learning system. It can store embeddings in a distributed way via Partitioned Variables to enable large-scale training. However, because the table size is fixed, there is a risk of hash conflicts.
  • PyTorch: Facebook's open source machine learning system. It synchronizes parameters with Ring AllReduce, which requires every machine to hold a full copy of all parameters, making it difficult to train large models.
  • XDL: a Chinese open source machine learning system with a self-developed parameter server, TensorFlow as the training engine, and some out-of-the-box recommendation models built in. Functionally it supports large-scale training, but its open source support is weak, so using it in production is risky.
  • Angel: a Chinese open source machine learning system tightly integrated with the big data system Spark, which it uses for data preprocessing and feature engineering. Its self-developed Parameter Server, with PyTorch embedded as the training engine, can train very large models. However, it is hard to keep Angel's online and offline features consistent, so it is only suitable as an offline training platform.
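For reference, here is a back-of-envelope version of the model-size estimate mentioned above. Only the 16-dimensional embeddings and the roughly 500 GB result come from the story; the number of distinct feature values and the optimizer slot count are assumptions made for illustration.

```python
# Back-of-envelope model-size estimate (illustrative; the feature cardinalities
# and optimizer choice are assumptions, not figures from Company A).
EMBEDDING_DIM = 16
BYTES_PER_FLOAT = 4      # fp32
OPTIMIZER_SLOTS = 2      # e.g. an Adam-style optimizer keeps 2 extra values per weight
distinct_keys = 2.5e9    # assumed total distinct IDs across the 200 features

bytes_per_key = EMBEDDING_DIM * BYTES_PER_FLOAT * (1 + OPTIMIZER_SLOTS)
total_gb = distinct_keys * bytes_per_key / 1e9
print(f"~{total_gb:.0f} GB")  # ~480 GB, on the order of the 500 GB cited above
```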

After the comparison, Company A chose TensorFlow for distributed training. However, when they trained the model they found it very slow: even with a lot of resources invested, training on 3 months of data still took 5 days. They spent a lot of time digging into TensorFlow and profiling the training process, and found several problems:

  • TensorFlow's distributed runtime performance is poor. A pair of send/recv ops is generated independently for each feature to connect worker and PS, so a single worker-PS pair produces 200 send/recv pairs. This makes scheduling hard for the TensorFlow runtime and slows down distributed training.
  • CPU usage was very erratic during training; the CPUs did not seem to be fully utilized.
  • Some operators were very slow, possibly limited by memory bandwidth.
  • Although network bandwidth was not saturated, adding more machines no longer sped up training.
  • Browsing the official TensorFlow website, they found that TF had recently launched a variety of distribution strategies corresponding to different training-cluster topologies. They were confused and did not know which one to choose.

While a number of performance issues were found, optimization was not easy. After some time they fixed a few of them and reduced the training time from five days to three, which was barely acceptable. However, at the 40th hour of training, the job failed because one of the machines ran out of memory (OOM). They retried several times and found the training success rate was quite low. Analysis pointed to the following main causes:

  • TF builds its cluster from a static topology configuration and does not support dynamic membership. This means that if a PS or worker dies and restarts with a different IP or port (for example after a machine crash), training cannot continue.
  • TF's checkpoint contains only the parameters stored on the PS, not the workers' state. It is not a globally consistent checkpoint and cannot provide exactly-once semantics.

They had no choice but to isolate a dedicated cluster to keep training as stable as possible. Since the cluster could not be shared with other scheduled tasks, resource utilization was naturally lower.

After a few twists and turns, they finally managed to train a 500 GB model and wanted to push it online for serving, so they turned to the design of the online system. After some discussion, they decided that serving had to meet the following requirements:

  • Distributed: recommendation models are dominated by embeddings and can easily reach the TB level, so distributed serving must be supported for future model iterations.
  • Low latency: the latency of a single prediction should be as low as possible; for the ranking model it is generally kept within 80 ms. Complex deep models may need GPU serving plus a series of performance optimizations.
  • High availability: the failure of a small number of nodes must not affect online stability. This is generally solved with multiple replicas and requires support from the scheduling system.
  • Low jitter: operations such as model updates, loading, and unloading must not cause latency jitter.
  • A/B testing: recommendation systems iterate quickly, algorithm engineers run many A/B experiments, and experiment traffic is adjusted dynamically, so the online system needs to support dynamic scheduling of models and services.

At present no open source system meets all of the above requirements; companies build their own, and the real cost is considerable. With limited manpower and experience, Company A could only compress the model so that it fits single-machine serving, which means the model cannot be too complex.

After the model went online, Company A ran into a new problem: how to update it. Regular full retraining is expensive, especially when multiple A/B-test models are online at the same time, so updates should at least be incremental at a daily granularity, and ideally in real time. Incremental and real-time updates, however, are not easy to implement either. In fact, more problems await Company A down the road, such as: how to guarantee consistency between online and offline features; how to handle instability in the upstream data stream; how to cope with the ever-growing model size; how to train on mixed data from multiple scenarios; how to handle a very large candidate set; and how to deal with the long latency of conversion events.

Our work

Company A's story shows that the difficulty and cost of developing a large-scale recommendation system are indeed considerable. So, is there a product that directly covers the whole pipeline of data validation, feature engineering, model development, online serving, A/B testing, and so on, letting businesses easily build a first-class recommendation system without running into Company A's headaches? There is.

Since ByteDance launched Volcano Engine, we have been working hard to make ByteDance's recommendation technology available to outside customers. Now, the Volcano Engine intelligent recommendation platform can help you with these difficulties and pain points. The platform also currently offers a free quota for a number of enterprises; details can be found at the end of the article.

Monolith is the large-scale training and serving solution behind the intelligent recommendation platform, and we hope it will become a solid foundation for your recommendation systems. Here's the architecture:

Monolith follows a Parameter Server (PS) architecture. Here's how it works:

Batch/incremental training

  • When a Worker or PS starts, it registers with ZooKeeper (ZK), including its (server_type, index) in the registration. The Worker then reads the registrations back from ZK to build the cluster information, realizing dynamic membership, which is the basis of fault tolerance (a sketch of this discovery flow follows the list).
  • After training starts, the Worker reads data from standard input or from files, pulls parameters from the PS, computes forward/backward to obtain gradients, and pushes them to the PS.
  • When the PS receives gradients, it updates the internal weights with the optimizer and keeps track of which parameters were updated. A TF Session on the PS periodically sends the updated parameters to the Online PS, realizing real-time incremental updates. Feature filtering and feature eviction are also performed on the PS.
  • Checkpoints are written during and at the end of training. To speed up checkpointing, Monolith does not use TF's saveable mechanism; instead it uses estimator saver listeners for streaming, multi-threaded writes, which improves performance. To reduce checkpoint size, expired features are eliminated.
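The sketch below illustrates the ZooKeeper-based registration and discovery flow from the first bullet. It is a minimal illustration using the kazoo client; the znode layout, paths, and helper names are assumptions rather than Monolith's actual implementation.

```python
# Minimal sketch of dynamic cluster membership via ZooKeeper (illustrative only;
# paths, znode layout, and helper names are assumptions, not Monolith's code).
import json
from kazoo.client import KazooClient

def register(zk_hosts, job_root, server_type, index, addr):
    """Each PS/Worker registers an ephemeral znode on startup."""
    zk = KazooClient(hosts=zk_hosts)
    zk.start()
    path = f"{job_root}/{server_type}/{index}"
    zk.create(path, json.dumps({"addr": addr}).encode(),
              ephemeral=True, makepath=True)  # znode disappears if the node dies
    return zk

def build_cluster(zk, job_root):
    """A Worker reads all registrations back to assemble the cluster information."""
    cluster = {}
    for server_type in zk.get_children(job_root):
        members = {}
        for index in zk.get_children(f"{job_root}/{server_type}"):
            data, _ = zk.get(f"{job_root}/{server_type}/{index}")
            members[int(index)] = json.loads(data)["addr"]
        cluster[server_type] = [members[i] for i in sorted(members)]
    return cluster  # e.g. {"ps": [...], "worker": [...]}
```

Because registrations are ephemeral, a restarted PS or Worker simply re-registers with its new address, so the cluster can be rebuilt without any static topology file.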

Online inference

  • Loading the SavedModel: the Entry is essentially TF Serving. It loads the non-embedding part of the model from HDFS and registers with ZK so the layer above can do load balancing. The Online PS also registers with ZK first and then loads its parameters from HDFS; during loading it strips the optimizer's auxiliary parameters, converts FP32 to FP16, applies quantization compression, and so on.
  • For each request, the Entry randomly selects a group of Online PS instances to fetch embeddings from and completes the prediction (a sketch of one possible fan-out follows the list). Entry and Online PS are multi-replica: as long as one replica survives, the service remains available. The Online PS is multi-shard and can serve extremely large models. Multiple shards can be deployed on one machine, and Entry and Online PS can be co-located.
  • For businesses whose models need high real-time freshness, the Training PS communicates with the Online PS directly over RPC, shortening the interval between a sample being generated and it being reflected in the online model to the minute level.
  • The Online PS accepts parameter updates from the Training PS, and the Entry automatically reads updated parameters from HDFS, realizing minute-level incremental parameter updates.
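As a rough illustration of the fan-out in the second bullet, the sketch below picks one healthy replica per shard and merges the returned embeddings. This is one possible layout; the helper names (`healthy`, `lookup`) and the per-shard replica selection are assumptions, not Monolith's real API.

```python
# A minimal sketch of an Entry fanning embedding lookups out across Online PS
# shards, each with several replicas (hypothetical helper names, not real APIs).
import random

def fetch_embeddings(shard_replicas, keys_by_shard):
    """shard_replicas: shard_id -> list of replica clients;
    keys_by_shard: shard_id -> feature keys routed to that shard."""
    embeddings = {}
    for shard_id, keys in keys_by_shard.items():
        # Any single surviving replica keeps the shard (and the service) available.
        replica = random.choice([r for r in shard_replicas[shard_id] if r.healthy()])
        embeddings.update(replica.lookup(keys))  # RPC: keys -> embedding vectors
    return embeddings
```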

To sum up, Monolith covers training, serving, parameter synchronization, and more; it is a complete system.

Compared to other systems in the industry, Monolith has successfully addressed several challenges, including the following:

Solving the TensorFlow PS communication bottleneck

In industrial-scale recommendation models we often use hundreds or even thousands of features, and each feature needs its own hash table to store its embeddings. Directly generating one hash table per feature class and looking up hundreds of tables at the same time causes two problems:

  1. The connections between PS and Worker generate too many send/recv ops, which greatly hurts the efficiency of the distributed runtime.
  2. These ops bloat the model graph with too many nodes, making the graph too large and training initialization too slow.

To address these problems, we optimized at the framework level: for hash tables with the same embedding dimension and the same optimizer parameters, Monolith consolidates them at the Python API level to reduce the number of tables, and further consolidates the communication ops, dramatically reducing the number of send/recv ops. This fixes the communication issues of native TensorFlow PS.
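A minimal sketch of the grouping idea is shown below: feature slots whose embedding dimension and optimizer configuration match are folded into one physical table. The configuration format and function name are illustrative, not Monolith's actual Python API.

```python
# Illustrative grouping logic for merging hash tables by configuration
# (not Monolith's actual API).
from collections import defaultdict

def merge_tables(feature_configs):
    """Group feature slots whose (embedding_dim, optimizer) match, so each group
    shares one physical hash table and one pair of send/recv ops."""
    groups = defaultdict(list)
    for name, cfg in feature_configs.items():
        groups[(cfg["dim"], cfg["optimizer"])].append(name)
    return dict(groups)

configs = {
    "user_id":   {"dim": 16, "optimizer": "adagrad"},
    "item_id":   {"dim": 16, "optimizer": "adagrad"},
    "click_seq": {"dim": 32, "optimizer": "adam"},
}
print(merge_tables(configs))
# {(16, 'adagrad'): ['user_id', 'item_id'], (32, 'adam'): ['click_seq']}
```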

For asynchronous training, Monolith also provides variable and embedding prefetching and asynchronous gradient updates, which use bandwidth and CPU more efficiently for most models, increasing training speed and improving resource utilization.
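Below is a minimal, thread-based sketch of the prefetch idea: embeddings for the next batch are pulled from the PS while the current batch is still being computed. The `ps_client` and `model` interfaces are assumed for illustration and are not Monolith's actual implementation.

```python
# Sketch of overlapping embedding prefetch with computation
# (assumed helper interfaces; not Monolith's code).
import queue
import threading

def prefetch_loop(batches, ps_client, out_q):
    """Pull embeddings for upcoming batches while earlier ones are training."""
    for batch in batches:
        emb = ps_client.pull(batch.feature_keys)    # network IO
        out_q.put((batch, emb))
    out_q.put(None)

def train(batches, ps_client, model):
    q = queue.Queue(maxsize=2)                      # small buffer decouples IO from compute
    threading.Thread(target=prefetch_loop,
                     args=(batches, ps_client, q), daemon=True).start()
    while (item := q.get()) is not None:
        batch, emb = item
        grads = model.forward_backward(batch, emb)  # compute overlaps with the next pull
        ps_client.push(batch.feature_keys, grads)   # gradients pushed asynchronously
```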

Comprehensive fault tolerance

Building on service discovery, both Workers and PS can recover quickly when errors occur. Workers in Monolith do not communicate with each other directly, so the failure of one Worker does not affect the others. Each Worker also stores its input progress, so that progress is not lost if the Worker fails unexpectedly. When a PS shard fails, either partial recovery or full recovery is supported, chosen according to whether the task is offline or online, trading off accuracy against recovery speed.

Distributed Serving

Monolith fills the gap in distributed serving, providing inference for terabyte-scale models with multi-replica support and high availability. During training, the Training PS synchronizes freshly updated embeddings to the Serving PS at minute-level intervals, achieving near-real-time parameter updates and improving the product's recommendation quality.

Performance optimization

In addition to solving the TensorFlow PS communication bottleneck described above, Monolith also optimizes the Parameter Server architecture, the underlying hash table design, network transport, multi-threading, op fusion, instruction-set acceleration, and more, achieving significant performance gains. Taking asynchronous training as an example, the main optimizations are as follows:

  • Network communication: embedding prefetch and asynchronous gradient post-push overlap network IO with the forward/backward computation of the graph, and optimizations such as separating control flow from data flow and compressed transmission are also supported.
  • Memory: feature filtering, feature compression, and feature eviction greatly reduce memory usage during training and serving.
  • Computation: hot-spot code is optimized with the AVX instruction set, time-consuming ops are fine-tuned, and manual op fusion and other techniques accelerate the forward/backward computation.
  • Other aspects: multi-threading optimization, fine-grained lock design, asynchronous IO overlapped with computation, and so on.

Monolith has proven its performance and stability on recommendation platforms for e-commerce, community, video, and other products. Going forward, we will keep iterating quickly to improve the user experience and platform functionality.

A gift

Thank you for reading this far. ByteDance's intelligent recommendation platform is now available to enterprise partners through Volcano Engine. If your company wants to use recommendation algorithms to grow its business but is struggling to build a recommendation system, give the Volcano Engine intelligent recommendation platform a try. For more information, click the link below:

It is worth mentioning that the intelligent recommendation platform currently offers 30 free slots for enterprise partners, valid until November 30, 2021. Please scan the QR code below to register as soon as possible.

Final words

Finally, let us introduce ourselves. We are the Volcano Engine intelligent recommendation team, committed to enabling enterprises around the world to have top-tier recommendation systems. We welcome anyone interested in machine learning systems, recommendation architecture, or recommendation algorithms to join us. We are based in Beijing, Shenzhen, Hangzhou, and Singapore. Please send your resume to [email protected] with the email title: name - years of experience - Volcano Engine intelligent recommendation - position.