Overview

Snowball Toutiao uses a typical feed-stream architecture to distribute user-generated content in a stock-trading community scenario. Because the stock market changes frequently, the system must efficiently connect users with content, users with stocks, and users with users, and distribute content in a timely, effective way. The Snowball algorithm team built an intelligent, algorithm-driven content distribution system. The ranking stage runs on the TensorFlow framework and uses the Wide&Deep model, which is at the forefront of the industry, to rank content. For such a complex and computation-intensive business, engineering efficiency must be continuously optimized and improved.

The recommendation system team optimized the model training process, shortening training time and speeding up model iteration. At the same time, an automated model system built on TensorFlow completes data preparation, model training, and online model updates automatically, reducing manual intervention and improving work efficiency.

I. Business challenges

Snowball Toutiao is a typical personalized recommendation system that relies on large-scale data warehouse and machine learning technology. The Snowball architecture is shown in the figure below and consists of the following three layers:

  • Online layer: handles users’ online requests. A request is initiated by the recommendation service, which calls the recall service and then the ranking service to complete the content recommendation. The post service and the index-building service act as auxiliary systems that provide related functions.

  • Offline layer: consists of two parts:

  • The log-processing part handles log-related data for business analysis, model training, and user profile construction. The model training part here is inefficient and has become a business bottleneck; it is the focus of this chapter.

  • The post-processing part relies on NLP technology to compute post classification, tag extraction, and so on.

  • Basic data layer: basic data such as front-end and back-end logs and user posts.

The model training part of the offline layer is the hard part of the recommendation system. The Wide&Deep model we use is relatively complex and computation-intensive, which poses a considerable challenge to the engineering team. At the same time, because the stock market is unpredictable, users’ interests also change frequently, so the ranking model must be updated and adjusted constantly. How to build a near-real-time model system that trains and updates models automatically is therefore another problem in front of us. The business challenges of the Snowball recommendation system are as follows:

  • Offline training currently generates about 5 TB of data per day, with training data reaching one billion samples

  • Peak online throughput reaches 400 QPS, with P99 latency under 600 ms

  • As the business grows rapidly, data and computation volumes continue to soar

It is therefore urgent to increase model training speed, shorten training time, accelerate model iteration, and speed up model distribution. Against this background, the recommendation system team tried the following optimizations.

II. Technology selection: GPU-accelerated training


GPUs perform matrix and floating-point calculations much faster than CPUs; mainstream GPUs are 30 to 100 times faster. During training, the division of labor between CPU and GPU is as follows: the CPU handles data preparation, such as data loading, feature extraction, and data sharding, while the GPU handles model computation.

GPU programming requires the NVIDIA drivers and the GPU build of TensorFlow to be installed in advance. Once the environment is in place, you can write GPU programs. Writing GPU code in TensorFlow is relatively simple: you only need to specify device names for parameters (variables) and operators (ops).
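As a minimal illustration (the tensor shapes are arbitrary assumptions, and a visible GPU is assumed), parameters and the operator that consumes them can be pinned to the GPU while input data stays on the CPU:

```python
import tensorflow as tf

# Keep input data on the CPU; place the parameter and the heavy matmul on the GPU.
with tf.device("/CPU:0"):
    x = tf.random.uniform([1024, 256])              # stand-in for prepared input data

with tf.device("/GPU:0"):                            # assumes a GPU is visible
    w = tf.Variable(tf.random.normal([256, 64]))     # model parameter lives on the GPU
    y = tf.matmul(x, w)                              # matrix computation runs on the GPU
```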

On top of this, TensorFlow officially provides canned Estimators for common models, along with a default round-robin (polling) device placement strategy. Users only need to pass a session configuration to use the GPU. Users who prefer to define their own Estimator can also conveniently write a custom Estimator and freely assign parameters and operators to devices.
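A minimal sketch of handing a GPU-friendly session configuration to an Estimator is shown below; the model directory is a hypothetical placeholder:

```python
import tensorflow as tf

# Let the Estimator allocate GPU memory on demand and fall back to the CPU
# for ops that have no GPU kernel.
session_config = tf.compat.v1.ConfigProto(allow_soft_placement=True)
session_config.gpu_options.allow_growth = True

run_config = tf.estimator.RunConfig(
    model_dir="/tmp/wide_deep_model",      # hypothetical model directory
    session_config=session_config)
```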

1. Initial test and optimization of the GPU model

When the first version of the GPU model went live, it fell far short of expectations, mainly in the following respects:

(1) Training was too slow, only 6,000 samples/second, whereas CPU-only training reaches 8,000 samples/second.

(2) CPU and GPU usage was low. On a 58-core machine, CPU usage was below 20%, and GPU usage was only 0-8%, showing obvious peaks and troughs.

To solve the training speed problem and make full use of the GPU, we tried various optimizations and eventually raised the speed to 21,000 samples/second, which meets the current training requirements. The specific optimization methods and ideas are as follows.

First, let us analyze how the model is trained after the GPU is introduced. As mentioned earlier, the CPU is responsible for data input and transformation, and the GPU is responsible for model computation. The computation of one step is divided into three stages (a minimal pipeline sketch follows the list):

  • I/O: raw data is read into memory from hard disks or other persistent media through system I/O.

  • Map: feature extraction is carried out in the transformation (map) stage, which scatters the raw data and converts it into feature data for TensorFlow; batching and shuffling also happen here. These operations are computation-intensive and can only be performed on CPUs.

  • Train: finally, the data is copied into GPU memory for model computation to complete the training step. Because the GPU computes extremely fast, the bottleneck lies in the interaction between CPU and GPU, including scheduling and bidirectional memory copies.
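The three stages map naturally onto a tf.data input pipeline. The sketch below is illustrative only: the file name, column count, and parse logic are assumptions, not the project’s actual code.

```python
import tensorflow as tf

def parse_fn(line):
    # Hypothetical feature extraction: split a CSV line into features and a label.
    fields = tf.io.decode_csv(line, record_defaults=[[0.0]] * 10)
    return {"features": tf.stack(fields[:-1])}, fields[-1]

dataset = (tf.data.TextLineDataset(["train_part_0.csv"])   # I/O stage
           .map(parse_fn)                                   # map stage (CPU-bound)
           .shuffle(10_000)
           .batch(1000))                                    # batches handed to the GPU for training
```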

Optimization can therefore come from two directions: data preparation and training. Data preparation is the harder part and is discussed later.

2. Optimization of training

The training part depends on the model code, mainly the definition of parameters and operators in the Estimator. In general, one rule applies: put complex floating-point computations (such as matrix computations) on the GPU, and define parameters and their corresponding operators on the same device to avoid copying data between devices.

Since the project uses the Wide&Deep Estimator officially provided by TensorFlow, which is already well optimized, the optimizations attempted here had little effect.
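For reference, this is the canned Wide&Deep Estimator the article refers to; the feature columns below are hypothetical placeholders, not the project’s real features:

```python
import tensorflow as tf

# Sketch of the canned Wide&Deep Estimator with made-up feature columns.
wide_columns = [
    tf.feature_column.categorical_column_with_hash_bucket("stock_id", hash_bucket_size=10_000)]
deep_columns = [tf.feature_column.numeric_column("ctr_7d")]

estimator = tf.estimator.DNNLinearCombinedClassifier(
    linear_feature_columns=wide_columns,   # wide (linear) part
    dnn_feature_columns=deep_columns,      # deep (DNN) part
    dnn_hidden_units=[256, 128, 64])
```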

3. Optimization of data preparation

3.1 Parallelism

The first thing that comes to mind is parallelism: CPU usage is low and the many cores are not fully utilized, so data should be prepared with multiple threads, as shown in the figure below.

To make full use of all CPU cores, a pool of data preparation threads is enabled. After the threads have processed the data, one batch of samples at a time is delivered to the GPU for training. After this parallel optimization, the training speed reaches 8,000 samples/s, about 30% higher than the unoptimized GPU version.
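In tf.data terms this corresponds to parallelizing the map stage; the snippet below reuses the hypothetical parse_fn from the baseline sketch and treats the degree of parallelism as a tunable assumption:

```python
import tensorflow as tf

# Same pipeline as before, but feature extraction now runs on a pool of workers.
dataset = (tf.data.TextLineDataset(["train_part_0.csv"])
           .map(parse_fn,
                num_parallel_calls=tf.data.AUTOTUNE)  # or a fixed thread count;
                                                      # tf.data.experimental.AUTOTUNE on older TF
           .shuffle(10_000)
           .batch(1000))
```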

However, even after parallelization, GPU utilization did not reach the ideal state. GPU usage still showed occasional jumps, intermittently reaching 12% and then dropping back to zero, and CPU usage showed the same intermittent pattern. Presumably the computation pattern was the problem, and the training speed could be improved further.

3.2 Data pre-preparation

Continuing from the optimization above, we analyzed the computation mechanism. Within one batch, the CPU handles I/O and feature extraction to prepare the training data, then hands it to the GPU for computation. If this is not designed carefully, the CPU and GPU idle alternately: the CPU prepares data while the GPU waits, then the GPU trains while the CPU waits, and so on. The total training time becomes the sum of CPU time and GPU time.

This suggests another optimization point: CPU pre-preparation of data. The CPU continuously prepares batches while the GPU computes in parallel; as soon as the GPU starts training on one batch, the CPU begins preparing the next. (Note: the data copy from CPU to GPU is extremely fast, reaching about 20 GB/s on mainstream GPUs, so the time it takes can be ignored.)
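In tf.data this pre-preparation is typically expressed as a prefetch at the end of the pipeline; again a sketch that reuses the hypothetical parse_fn from the baseline sketch:

```python
import tensorflow as tf

# While the GPU trains on the current batch, the CPU fills a one-batch buffer.
dataset = (tf.data.TextLineDataset(["train_part_0.csv"])
           .map(parse_fn, num_parallel_calls=tf.data.AUTOTUNE)
           .shuffle(10_000)
           .batch(1000)
           .prefetch(1))   # keep one prepared batch ahead of the GPU
```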

With data pre-preparation, the training speed increased to 9,000 samples/second, an improvement of about 10%.

The optimization effect here was surprisingly small. In this mode the training stage runs in parallel with data preparation, which should effectively remove the training time from the total, yet the overall speed barely improved. This indicates that the training stage takes only a small share of the time (about 10%) and that the GPU is far faster than the CPU. The key bottleneck for training speed is still data preparation.

We also observed that CPU usage stayed high while the GPU still showed only intermittent spikes. The GPU remained underutilized, so there was still room to improve the computation pattern.

3.3 Introducing TFRecord

We continued to analyze why CPU usage was high but the GPU was underutilized. The training data in the project is in the raw CSV format exported from Hive, and the CSV files are read and converted into a data format TensorFlow can consume. Feature extraction happens in the middle, and because samples are reused repeatedly in the project, the same features were being extracted more than once. We therefore introduced TFRecord, TensorFlow's native format, a data file that stores extracted features as key-value pairs. Before training, the data is converted into TFRecord format and persisted to disk; during training, the TFRecord files are read directly from disk, eliminating the expensive feature extraction computation.
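A hedged sketch of the idea, with hypothetical feature names: features are extracted once, serialized into tf.train.Example records, and later parsed straight back into tensors:

```python
import tensorflow as tf

def to_example(features, label):
    # features: dict of name -> float; names and values here are placeholders.
    feat = {k: tf.train.Feature(float_list=tf.train.FloatList(value=[v]))
            for k, v in features.items()}
    feat["label"] = tf.train.Feature(float_list=tf.train.FloatList(value=[label]))
    return tf.train.Example(features=tf.train.Features(feature=feat))

# Persist extracted features once...
with tf.io.TFRecordWriter("train.tfrecord") as writer:
    writer.write(to_example({"ctr_7d": 0.12}, 1.0).SerializeToString())

# ...and read them back without re-running feature extraction.
feature_spec = {"ctr_7d": tf.io.FixedLenFeature([], tf.float32),
                "label": tf.io.FixedLenFeature([], tf.float32)}
dataset = tf.data.TFRecordDataset("train.tfrecord").map(
    lambda record: tf.io.parse_single_example(record, feature_spec))
```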

Introducing the TFRecord format did reduce the CPU burden, increase the throughput of the data preparation stage, and improve GPU utilization to some extent. The training speed increased to 10,000 samples/second, roughly a 10% improvement. However, the effect was not as good as expected: CPU usage dropped, but disk usage climbed to 90%. This left an I/O problem to be solved.

3.4 Compression of TFRecord

Because TFRecord stores feature data as (key, value) pairs, the files are relatively large: in our project, 1 million records take about 4 GB. On a 100 MB/s mechanical disk, at most 25,000 records can be read per second. Data compression is the most natural response to an I/O bottleneck, and our compression approach has two parts.

3.4.1 Compression of data formats

The default storage layout of TFRecord is shown in the figure below, with one sample per row. A sample includes features 1 to N, each stored as a key-value pair; for example, sample 1 is stored as the list (K1, V1,1), …, (KN, VN,1). In this layout the keys K1-KN are stored repeatedly for every sample. Since most features in the project are single-valued float or int categorical features, and the keys can be fairly long, the keys occupy more than 60% of the space.

Optimization: to keep the keys readable, we do not compress the key strings themselves. Instead, we merge multiple samples into one record: the project stores samples in groups of M, as shown in the figure below. Samples 1 to M are merged and stored in a single TFRecord row in the form (K1, [V1,1, …, V1,M]), …, (KN, [VN,1, …, VN,M]).

M is chosen to divide the batch_size hyperparameter used in training exactly, which simplifies data assembly in the subsequent data preparation stage. At the same time, M is a trade-off between training speed and space. The larger M is, the better the space compression, but it hurts the parallelism of the data preparation stage and increases the time of the map stage. The smaller M is, the better the parallelism and the shorter the map stage (ignoring scheduling effects; see the next optimization point), but the worse the space compression. In general, for this project with batch_size = 1000, M = 100 is a good compromise.
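A sketch of the merged-storage idea with hypothetical feature names: M samples share one tf.train.Example, so each key is written once per group instead of once per sample:

```python
import tensorflow as tf

M = 100  # merge factor; chosen to divide batch_size exactly

def merge_examples(samples):
    """samples: a list of M dicts such as {"ctr_7d": 0.12, "label": 1.0} (hypothetical)."""
    feature = {
        key: tf.train.Feature(
            float_list=tf.train.FloatList(value=[s[key] for s in samples]))
        for key in samples[0]
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))

# Each TFRecord row now holds M samples, so keys are stored once per group.
with tf.io.TFRecordWriter("train_merged.tfrecord") as writer:
    group = [{"ctr_7d": 0.1 * i, "label": float(i % 2)} for i in range(M)]
    writer.write(merge_examples(group).SerializeToString())
```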

With this optimization, the data size was reduced by 56% and the pressure on the disk eased. At the same time, merging multiple samples and storing the data in units of M is exactly what the next optimization point needs, as explained in the next section.

3.4.2 General compression

General compression applies GZIP or a similar codec to the data. The compression ratio is significant and yields roughly an additional 60% size reduction.
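TFRecord supports this transparently; a small sketch with placeholder paths and payload:

```python
import tensorflow as tf

# Write a GZIP-compressed TFRecord file...
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("train.tfrecord.gz", options) as writer:
    writer.write(b"serialized-example-bytes")   # placeholder for Example.SerializeToString()

# ...and read it back, telling the dataset about the compression.
dataset = tf.data.TFRecordDataset("train.tfrecord.gz", compression_type="GZIP")
```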

Introducing compressed TFRecord completely solved the problem of high CPU usage with low GPU usage. With this optimization, GPU utilization rose to about 20%, training speed increased to 16,000 samples/second, and disk usage fell below 15%.

3.5 Batch before Map

While trying various optimization methods, one optimization point stood out: batch first, then map.

First, look at the conventional approach: after the raw data is read in, a map transformation is applied record by record, and then a batch operation groups the results into training batches of batch_size samples. Optimization: we split the batch into two stages, batch1 and batch2, placed before and after the map respectively. The batch1 operation groups the raw data into chunks of batch_size1 before the map; the chunks are then sent to the map for transformation (the map function must be modified to handle batched data); finally, the batch2 operation assembles the data into the training batch. The batch sizes are related by:

batch_size = batch_size1 * batch_size2

This optimization raised the training speed to 20,000 samples/second.

The reason the effect is so pronounced is that once TFRecord is adopted, the map operation is merely deserialization of the training data. The map work is so light that the scheduling overhead of the thread pool can no longer be ignored: most of the CPU time is lost waiting on and scheduling the map thread pool. Adding the batch1 operation hands a whole chunk of data to each map thread, making each map call heavier, reducing the compute wasted on thread scheduling, and ultimately raising the throughput of the data preparation stage.
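One way to express the batch1 / map / batch2 layout in tf.data is sketched below; the feature spec and batch sizes are assumptions, and the unbatch/rebatch at the end is just one way to realize batch2:

```python
import tensorflow as tf

feature_spec = {"ctr_7d": tf.io.FixedLenFeature([], tf.float32),   # hypothetical features
                "label": tf.io.FixedLenFeature([], tf.float32)}

batch_size1, batch_size2 = 100, 10    # final batch_size = batch_size1 * batch_size2 = 1000

dataset = (tf.data.TFRecordDataset("train.tfrecord.gz", compression_type="GZIP")
           .batch(batch_size1)                                  # batch1: group raw records first
           .map(lambda recs: tf.io.parse_example(recs, feature_spec),
                num_parallel_calls=tf.data.AUTOTUNE)            # one heavier map call per group
           .unbatch()                                           # flatten back to single samples
           .batch(batch_size1 * batch_size2)                    # batch2: final training batch
           .prefetch(1))
```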

Combined with the merged-storage scheme for TFRecord described above, each stored TFRecord row already holds a batch of samples (the merge factor M above plays the role of batch_size1 here), so the batch1 step can be omitted and the data can be read and passed directly to the map.

3.6 Optimization Summary

The GPU training optimizations are summarized in the table below:

| Optimization point | Training speed (samples/s) | GPU usage | Note |
| --- | --- | --- | --- |
| None (baseline) | 6,000 | 7-8% | |
| Multi-threaded parallel data preparation | 8,000 | 12% | |
| Data pre-preparation | 9,000 | 12% | |
| TFRecord introduced | 16,000 | 20% | Compressed TFRecord |
| Batch before map | 21,000 | 30% | The Wide&Deep model in this project has few layers, so the GPU cannot be fully utilized |

3.7 Project Usage

Currently, the project runs on a single machine with four GPU cards, which can train four models in parallel at a combined speed of 80,000 samples/second. With the improved training speed, the daily training task completes within 5 hours, meeting the requirement of updating the model every day.

III. System practice: the Model_bus model training & prediction system


After the training speed was improved enough to support daily model updates, the Snowball algorithm team developed Model_bus, an automated system for model training, updating, and prediction. Built around the current model training workflow, it integrates feature transformation, model training, and model deployment, and is equipped with auxiliary functions such as a web front end and exception alerts, forming a complete model system for the production environment.


The Model_bus system is divided into three main modules:

(1) Feature transformation service

The feature transformation service is triggered by a scheduled task. It downloads the raw training data from HDFS, performs feature extraction, generates compressed TFRecord files, and uploads them back to HDFS. Since there are no dependencies between training records, the feature transformation service can be distributed to shorten the conversion time; for one day of data it takes 1-2 hours.

After uploading to HDFS, the feature transformation service sends a message to MQ to notify the model training service to start training.

(2) Model training service

The model training service is triggered by the message from the feature transformation service. It pulls the TFRecord training data and performs model training, model evaluation, AUC calculation, and model upload in sequence. For one day of data it takes less than 5 hours. If the model evaluation does not meet the online requirements, the subsequent flow is stopped and an exception alert is raised.

When a model that meets the online requirements has been uploaded to HDFS, the model training service sends a message to the message queue to notify the model online service.

(3) Model online service

The model online service is a set of model loading, verification, and serving capabilities built on TensorFlow Serving. Triggered by the message from the model training service, it pulls the corresponding model version from HDFS and automatically completes the model update and rollout. If the model fails to load, the online service retries; if it still fails, an alert is raised to request manual intervention.
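As an illustration only, a health check of this kind could poll TensorFlow Serving’s REST model-status endpoint; the host, port, and model name below are assumptions, not the project’s actual configuration:

```python
import requests

# TensorFlow Serving reports per-version model state at /v1/models/<name>.
SERVING_URL = "http://tf-serving.internal:8501/v1/models/wide_deep"   # hypothetical

def model_is_available(url: str = SERVING_URL) -> bool:
    resp = requests.get(url, timeout=2)
    resp.raise_for_status()
    statuses = resp.json().get("model_version_status", [])
    return any(v.get("state") == "AVAILABLE" for v in statuses)
```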

Key Model_bus logs are recorded in MySQL, and users can view model status, related metrics, and online status through the web front-end interface.

Summary and outlook


For personalized recommendation in the Snowball Toutiao feed stream, the Snowball algorithm team adopted the Wide&Deep model for content ranking and completed a series of GPU-enabled optimizations that greatly improved model training speed. On this basis, the team built Model_bus, a model deployment system that automatically handles model training, updating, and serving, along with auxiliary modules such as web operations and exception alerts.


In the future, the Snowball algorithm team will focus on building a more real-time recommendation system, constructing a real-time data warehouse, and promoting the application of online learning. On the training side, we are developing distributed training based on TensorFlow to further increase computing capacity.

Recruitment

As the business continues to grow, the Snowball recommendation team still faces many business needs and technical challenges. If you have experience in machine learning, recommendation systems, algorithms, or backend development, we hope you will join us to provide investors with a smarter and more effective content distribution system.

Position: Recommendation System Engineer / Recommendation Algorithm Engineer

Email address: [email protected]