The application of deep learning to CTR estimation keeps growing, and new models keep emerging. After sorting out the connections between these models, looking at how each one designs the DNN part of f(x) for the CTR estimation problem, I started thinking about how they could be implemented in industry. After several months of investigation, I found a few problems with what exists today:
* The open-source implementations mostly come from academia; there is still a big gap to industrial application.
* The model implementations call many low-level APIs, each version differs, the code is bloated and hard to understand, and the migration cost is high.
* They are single-machine only and cannot run at industrial scale.
To address these problems, I did some exploration and found a feasible solution with the following characteristics:
* Uses the Dataset API to read data, with parallel and prefetch reads.
* Implements f(x) with the Estimator's model_fn, which makes migrating to other algorithms easy: just rewrite the f(x) part of model_fn.
* Supports both distributed and standalone multi-threaded training.
* Supports exporting the model, so online prediction can then be served with TensorFlow Serving.
By industry convention, a complete machine learning project consists of five parts: a feature framework, a training framework, a serving framework, an evaluation framework, and a monitoring framework. Only the first three are discussed here.
Feature framework — logs in, samples out
The experimental dataset is Criteo; feature engineering follows github.com/PaddlePaddl…
```
#1 Continuous features: remove outliers, then normalize
#2 Discrete features: remove low-frequency values, then encode them uniformly
#   (the feature encoding must be saved and reused for online prediction)
```
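A minimal sketch of these two steps (the clipping bound and frequency cutoff below are illustrative choices, not values taken from the reference script):

```python
import math

def transform_continuous(v, hi=20.0):
    """#1: clip outliers, then squash to [0, 1] (the bound is an illustrative choice)."""
    if not v:
        return 0.0
    x = min(max(float(v), 0.0), hi)                  # remove outliers by clipping
    return math.log(1.0 + x) / math.log(1.0 + hi)    # normalize

def build_cat_dict(values, cutoff=200):
    """#2: drop low-frequency values of one discrete column, then code the rest.
    The resulting map must be saved and reused at online-prediction time."""
    counts = {}
    for v in values:
        counts[v] = counts.get(v, 0) + 1
    kept = sorted(v for v, n in counts.items() if n >= cutoff)
    return {v: i for i, v in enumerate(kept)}
```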
Modeling large-scale discrete features is where DNNs have the edge in CTR prediction. Most papers focus on how ID-class features are handled and say little about continuous features, which can be handled in roughly three ways:
1. No embedding: concat[continuous_features, emb_vec] and feed the result into fully connected layers.
2. Discretize the continuous features, then embed them.
3. As in the FM second-order part, embed everything uniformly as <id, val> pairs, with discrete features taking val=1.0.
For the sake of a simple and uniform model design, the third method is adopted here; interested readers can try the first two and compare.
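Concretely, scheme 3 turns every field into an <id, val> pair: a continuous field keeps its normalized value, while a discrete field contributes val=1.0 under an id looked up from the saved encoding. A minimal sketch (the id layout and the fallback for unseen values are illustrative assumptions):

```python
def encode_sample(cont_feats, cat_feats, cat_dict, default_id=0):
    """Turn one raw sample into libsvm-style <id>:<val> pairs (scheme 3).

    cont_feats: normalized continuous values, one per field (field ids 1..13 here)
    cat_feats:  raw discrete values, one per field
    cat_dict:   {(field_index, value): global_id} built offline, with global
                ids starting after the continuous block
    """
    pairs = [(i + 1, v) for i, v in enumerate(cont_feats)]   # continuous: <field_id, val>
    for i, v in enumerate(cat_feats):
        fid = cat_dict.get((i, v), default_id)               # unseen/low-frequency -> default id
        pairs.append((fid, 1.0))                             # discrete: <coded_id, 1.0>
    return " ".join("%d:%g" % p for p in pairs)
```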
Training framework — samples in, model out
At present DeepFM, wide_n_deep, NFM, AFM, FNN, and PNN are implemented, using the TensorFlow Estimator and Dataset APIs; the core is implementing input_fn and model_fn:
```
# libsvm-format sample: <label> <feat_id>:<feat_val> ...
0 1:0.03519 3:1 4:0.02567 7:0.03708 8:0.01705 9:0.06296 10:0.18185 11:0.02497 12:1 14:0.02565 15:0.03267 17:0.0247 18:0.03158 20:1 22:1 23:0.13169 24:0.02933 27:0.18159 31:0.0177 34:0.02888 38:1 51:1 63:1 132:1 164:1 236:1
```

```python
def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
    print('Parsing', filenames)

    def decode_libsvm(line):
        columns = tf.string_split([line], ' ')
        labels = tf.string_to_number(columns.values[0], out_type=tf.float32)
        splits = tf.string_split(columns.values[1:], ':')
        id_vals = tf.reshape(splits.values, splits.dense_shape)
        feat_ids, feat_vals = tf.split(id_vals, num_or_size_splits=2, axis=1)
        feat_ids = tf.string_to_number(feat_ids, out_type=tf.int32)
        feat_vals = tf.string_to_number(feat_vals, out_type=tf.float32)
        return {"feat_ids": feat_ids, "feat_vals": feat_vals}, labels

    # Extract lines from input files using the Dataset API; accepts one filename or a filename list
    dataset = tf.data.TextLineDataset(filenames) \
        .map(decode_libsvm, num_parallel_calls=10) \
        .prefetch(500000)  # multi-thread pre-process, then prefetch

    # Randomize input using a window of 256 elements (read into memory)
    if perform_shuffle:
        dataset = dataset.shuffle(buffer_size=256)

    dataset = dataset.repeat(num_epochs)   # keep epochs from blending together
    dataset = dataset.batch(batch_size)    # batch size to use
    iterator = dataset.make_one_shot_iterator()
    batch_features, batch_labels = iterator.get_next()
    return batch_features, batch_labels
```
```python
def model_fn(features, labels, mode, params):
    """Build model function f(x) for Estimator."""
    #------hyperparameters----
    field_size = params["field_size"]
    feature_size = params["feature_size"]
    embedding_size = params["embedding_size"]
    l2_reg = params["l2_reg"]
    learning_rate = params["learning_rate"]
    layers = list(map(int, params["deep_layers"].split(',')))
    dropout = list(map(float, params["dropout"].split(',')))

    #------build weights------
    FM_B = tf.get_variable(name='fm_bias', shape=[1], initializer=tf.constant_initializer(0.0))
    FM_W = tf.get_variable(name='fm_w', shape=[feature_size], initializer=tf.glorot_normal_initializer())
    FM_V = tf.get_variable(name='fm_v', shape=[feature_size, embedding_size],
                           initializer=tf.glorot_normal_initializer())

    #------build features-------
    feat_ids = features['feat_ids']
    feat_ids = tf.reshape(feat_ids, shape=[-1, field_size])
    feat_vals = features['feat_vals']
    feat_vals = tf.reshape(feat_vals, shape=[-1, field_size])

    #------build f(x)------
    with tf.variable_scope("First-order"):
        feat_wgts = tf.nn.embedding_lookup(FM_W, feat_ids)               # None * F * 1
        y_w = tf.reduce_sum(tf.multiply(feat_wgts, feat_vals), 1)

    with tf.variable_scope("Second-order"):
        embeddings = tf.nn.embedding_lookup(FM_V, feat_ids)              # None * F * K
        feat_vals = tf.reshape(feat_vals, shape=[-1, field_size, 1])
        embeddings = tf.multiply(embeddings, feat_vals)                  # vij * xi
        sum_square = tf.square(tf.reduce_sum(embeddings, 1))
        square_sum = tf.reduce_sum(tf.square(embeddings), 1)
        y_v = 0.5 * tf.reduce_sum(tf.subtract(sum_square, square_sum), 1)  # None * 1

    with tf.variable_scope("Deep-part"):
        if FLAGS.batch_norm:
            train_phase = (mode == tf.estimator.ModeKeys.TRAIN)
        deep_inputs = tf.reshape(embeddings, shape=[-1, field_size * embedding_size])  # None * (F*K)
        for i in range(len(layers)):
            #if FLAGS.batch_norm:  # alternative: BN before the activation
            #    deep_inputs = batch_norm_layer(deep_inputs, train_phase=train_phase, scope_bn='bn_%d' % i)
            deep_inputs = tf.contrib.layers.fully_connected(
                inputs=deep_inputs, num_outputs=layers[i],
                #normalizer_fn=normalizer_fn, normalizer_params=normalizer_params,
                weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='mlp%d' % i)
            if FLAGS.batch_norm:
                # BN after ReLU: https://github.com/ducha-aiki/caffenet-benchmark/blob/master/batchnorm.md#bn----before-or-after-relu
                # (batch_norm_layer is a helper defined elsewhere in the repo)
                deep_inputs = batch_norm_layer(deep_inputs, train_phase=train_phase, scope_bn='bn_%d' % i)
            if mode == tf.estimator.ModeKeys.TRAIN:
                # Apply dropout after all BN layers; dropout here is keep_prob, e.g. 0.8 means drop_ratio=0.2
                deep_inputs = tf.nn.dropout(deep_inputs, keep_prob=dropout[i])
                #deep_inputs = tf.layers.dropout(inputs=deep_inputs, rate=dropout[i],
                #                                training=(mode == tf.estimator.ModeKeys.TRAIN))
        y_deep = tf.contrib.layers.fully_connected(
            inputs=deep_inputs, num_outputs=1, activation_fn=tf.identity,
            weights_regularizer=tf.contrib.layers.l2_regularizer(l2_reg), scope='deep_out')
        y_d = tf.reshape(y_deep, shape=[-1])

    with tf.variable_scope("DeepFM-out"):
        #y_bias = FM_B * tf.ones_like(labels, dtype=tf.float32)  # None * 1
        # Warning: do not use labels here, or predict/export will fail even though
        # train/evaluate work -- presumably because the Estimator prunes the graph for serving.
        y_bias = FM_B * tf.ones_like(y_d, dtype=tf.float32)      # None * 1
        y = y_bias + y_w + y_v + y_d
        pred = tf.sigmoid(y)

    predictions = {"prob": pred}
    export_outputs = {
        tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
            tf.estimator.export.PredictOutput(predictions)}
    # Provide an estimator spec for `ModeKeys.PREDICT`
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions,
                                          export_outputs=export_outputs)

    #------build loss------
    loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y, labels=labels)) + \
        l2_reg * tf.nn.l2_loss(FM_W) + l2_reg * tf.nn.l2_loss(FM_V)

    # Provide an estimator spec for `ModeKeys.EVAL`
    eval_metric_ops = {"auc": tf.metrics.auc(labels, pred)}
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss,
                                          eval_metric_ops=eval_metric_ops)

    #------build optimizer------
    if FLAGS.optimizer == 'Adam':
        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
    elif FLAGS.optimizer == 'Adagrad':
        optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate, initial_accumulator_value=1e-8)
    elif FLAGS.optimizer == 'Momentum':
        optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.95)
    elif FLAGS.optimizer == 'ftrl':
        optimizer = tf.train.FtrlOptimizer(learning_rate)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())

    # Provide an estimator spec for `ModeKeys.TRAIN`
    if mode == tf.estimator.ModeKeys.TRAIN:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, loss=loss, train_op=train_op)
```
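The `Second-order` block relies on the standard FM identity (Rendle, 2010), which is exactly the `sum_square - square_sum` computation above and reduces the cost of the pairwise interactions from O(kn²) to O(kn):

$$\sum_{i=1}^{n}\sum_{j=i+1}^{n}\langle v_i, v_j\rangle\, x_i x_j
= \frac{1}{2}\sum_{f=1}^{k}\left[\left(\sum_{i=1}^{n} v_{i,f}\, x_i\right)^{2} - \sum_{i=1}^{n} v_{i,f}^{2}\, x_i^{2}\right]$$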
Once packaged as an Estimator, invocation is simple:
```
#train
python deepfm.py --task_type=train --learning_rate=0.0005 --optimizer=Adam --num_epochs=1 --batch_size=256 --field_size=39 --feature_size=117581 --deep_layers=400,400,400 --dropout=0.5,0.5,0.5 --log_steps=1000 --num_threads=8 --model_dir=./model_ckpt/criteo/DeepFM/ --data_dir=../../data/criteo/

#predict
python deepfm.py --task_type=infer --learning_rate=0.0005 --optimizer=Adam --num_epochs=1 --batch_size=256 --field_size=39 --feature_size=117581 --deep_layers=400,400,400 --dropout=0.5,0.5,0.5 --log_steps=1000 --num_threads=8 --model_dir=./model_ckpt/criteo/DeepFM/ --data_dir=../../data/criteo/
```
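For reference, the glue between these flags and the two functions above is only a few lines; a minimal sketch (the file lists `tr_files`/`te_files` and the exact flag set are assumptions here, see the repo for the real wiring):

```python
model_params = {
    "field_size": FLAGS.field_size,
    "feature_size": FLAGS.feature_size,
    "embedding_size": FLAGS.embedding_size,
    "learning_rate": FLAGS.learning_rate,
    "l2_reg": FLAGS.l2_reg,
    "deep_layers": FLAGS.deep_layers,
    "dropout": FLAGS.dropout,
}
config = tf.estimator.RunConfig().replace(
    session_config=tf.ConfigProto(intra_op_parallelism_threads=FLAGS.num_threads),
    log_step_count_steps=FLAGS.log_steps)
DeepFM = tf.estimator.Estimator(model_fn=model_fn, model_dir=FLAGS.model_dir,
                                params=model_params, config=config)

if FLAGS.task_type == 'train':
    DeepFM.train(input_fn=lambda: input_fn(tr_files, num_epochs=FLAGS.num_epochs,
                                           batch_size=FLAGS.batch_size, perform_shuffle=True))
elif FLAGS.task_type == 'infer':
    preds = DeepFM.predict(input_fn=lambda: input_fn(te_files, num_epochs=1,
                                                     batch_size=FLAGS.batch_size))
    for p in preds:
        print(p["prob"])
```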
Complete code: lambdaji/tf_repos
Service framework — request in, pCTR out
TensorFlow Serving is a high-performance open-source serving library for machine learning models. It deploys trained models online and receives external calls over gRPC. Even better, it supports hot model updates and automatic model version management: once TensorFlow Serving is deployed, you no longer need to worry about the online service and can focus on offline model training.
First, export the model in the format TF-Serving recognizes:
```
python deepfm.py --task_type=export --learning_rate=0.0005 --optimizer=Adam --batch_size=256 --field_size=39 --feature_size=117581 --deep_layers=400,400,400 --dropout=0.5,0.5,0.5 --log_steps=1000 --num_threads=8 --model_dir=./model_ckpt/criteo/DeepFM/ --servable_model_dir=./servable_model/
```
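Inside deepfm.py, the export task boils down to describing the serving inputs and calling `export_savedmodel`; a minimal sketch (the placeholder shapes follow the `feat_ids`/`feat_vals` contract used by `input_fn`, and `DeepFM` is the Estimator instance built as above):

```python
def serving_input_receiver_fn():
    # Placeholders must match the feature dict that model_fn expects
    receiver_tensors = {
        'feat_ids': tf.placeholder(dtype=tf.int64, shape=[None, FLAGS.field_size], name='feat_ids'),
        'feat_vals': tf.placeholder(dtype=tf.float32, shape=[None, FLAGS.field_size], name='feat_vals'),
    }
    return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)

DeepFM.export_savedmodel(FLAGS.servable_model_dir, serving_input_receiver_fn)
```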
By default, a timestamp is used to manage versions. The generated files are as follows:
```
$ ls -lh servable_model/1517971230
|--saved_model.pb
|--variables
   |--variables.data-00000-of-00001
   |--variables.index
```
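To bring the model online, point a TensorFlow Model Server at the exported base directory; it automatically picks up the newest timestamped version and hot-swaps in new ones. A typical invocation (the port and model name below are arbitrary choices):

```
tensorflow_model_server --port=9000 --model_name=deepfm --model_base_path=/path/to/servable_model/
```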
Then write a client to send requests, here in C++:
```cpp
PredictRequest predictRequest;
PredictResponse response;
ClientContext context;

predictRequest.mutable_model_spec()->set_name(model_name);
predictRequest.mutable_model_spec()->set_signature_name(model_signature_name);  //serving_default

google::protobuf::Map<tensorflow::string, tensorflow::TensorProto>& inputs = *predictRequest.mutable_inputs();

//feature to tfrequest
std::vector<long> ids_vec = {1,2,3,4,5,6,7,8,9,10,11,12,13,15,555,1078,17797,26190,26341,28570,
                             35361,35613,35984,48424,51364,64053,65964,66206,71628,84088,84119,
                             86889,88280,88283,100288,100300,102447,109932,111823};
std::vector<float> vals_vec = {0.05,0.006633,0.05,0,0.021594,0.008,0.15,0.04,0.362,0.1,0.2,0,0.04,
                               1,1,1,1,1,1,1,1,1,1,1,1,1,
                               1,1,1,1,1,1,1,1,1,1,1,1,1};

tensorflow::TensorProto feat_ids;
for (uint32_t i = 0; i < ids_vec.size(); i++) {
    feat_ids.add_int64_val(ids_vec[i]);
}
feat_ids.mutable_tensor_shape()->add_dim()->set_size(1);  //batch_size
feat_ids.mutable_tensor_shape()->add_dim()->set_size(feat_ids.int64_val_size());
feat_ids.set_dtype(tensorflow::DataType::DT_INT64);
inputs["feat_ids"] = feat_ids;

tensorflow::TensorProto feat_vals;
for (uint32_t i = 0; i < vals_vec.size(); i++) {
    feat_vals.add_float_val(vals_vec[i]);
}
feat_vals.mutable_tensor_shape()->add_dim()->set_size(1);  //batch_size
feat_vals.mutable_tensor_shape()->add_dim()->set_size(feat_vals.float_val_size());  //sample size
feat_vals.set_dtype(tensorflow::DataType::DT_FLOAT);
inputs["feat_vals"] = feat_vals;

Status status = _stub->Predict(&context, predictRequest, &response);
```
Complete code: lambdaji/tf_repos
Production environments have strict latency and throughput requirements, and a DNN involves far more computation than LR's simple table lookups, so a trade-off between model effect and serving performance is often needed. This is the link where engineering ability is tested. The figure below shows real data for the wide_n_deep model in the online environment, from which we can see:
* Intercept of about 15ms: parsing the request packet, querying Redis/Tair, transforming the feature format, logging, and so on.
* Slope of about 0.5ms: forward-pass time for one sample.
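In other words, request latency grows roughly linearly with the number of samples per request:

$$T(n) \approx 15\,\text{ms} + 0.5\,\text{ms} \times n$$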
An interesting observation is that as traffic grows further, the average latency falls rather than rises; presumably TF-Serving does some cache-like optimization.
Model Performance
I originally planned to tune the hyperparameters before publishing, but after the machine hung three times on me, I gave up :(
There may be several reasons why the results in the figure are not good:
* Feature engineering is not done well (continuous features may not suit embedding; negative sampling, shuffle, and so on).
* There may be problems with the model design (I am not sure whether there are bugs).
* Parameter tuning: the model has not converged to a good enough solution.
If you’re interested, you can fork it out and do it faster than if you’re alone behind closed doors.
Project address: github.com/lambdaji/tf…
Finally, I wish you all a happy New Year!
References:
Github.com/wnzhang/dee…
Github.com/Atomu2014/p…
Github.com/hexiangnan/…
Github.com/hexiangnan/…
Github.com/ChenglongCh…
zhuanlan.zhihu.com/p/32563337
zhuanlan.zhihu.com/p/28202287