Enterprise Machine Learning Pipeline – Sample Processing


As we all know, a typical machine learning platform consists of two modules: offline training and online prediction. The offline part is generally responsible for log data processing, sample processing, feature processing, model training, and so on. The online part covers real-time prediction, also called online inference. The flow chart is as follows:

It is an accepted truth in machine learning that data and features determine the upper limit of what a model can achieve, while models and algorithms only approximate that upper limit. I think it makes sense ^-^ since data really is critical to machine learning models.


Sample processing decides which users' behavior logs are used to train the model (hereinafter referred to as sample selection) and how those samples are sampled. Which behavior logs are used directly determines the proportion of positive and negative samples seen by the model, and that proportion is directly related to how well the model's scores match the real CTR or real CVR, generally called the prior CXR: true_cxr = pos / (pos + neg). However, in a larger system where the model is updated incrementally, some online models can generate hundreds of millions of log lines or even more per day. Therefore we usually apply a sampling operation, typically negative sampling of the negative samples. As mentioned above, negative sampling distorts the real CXR value, so we perform a CXR calibration based on the corresponding sampling ratio, which will be discussed in detail later.
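To make the effect concrete (the numbers here are purely illustrative): suppose one day's log contains 1,000 positive and 99,000 negative samples, so the true prior CTR is 1000 / (1000 + 99000) = 1%. If we keep every positive sample but only 10% of the negatives, the training set holds 1,000 positives and roughly 9,900 negatives, and the model learns a prior of about 1000 / (1000 + 9900) ≈ 9.2%. This inflated prior is exactly what the calibration step later has to undo.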

We will introduce them from the following two aspects:

1. Sample selection

2. Sample Sampling and CXR calibration


1. Sample selection

As mentioned above, sample selection determines which of the user's behavior logs are selected for model training. Naturally, we want the selected samples to reflect, as closely as possible, the real distribution of data the model will encounter online, so the data distribution used for offline training should be as consistent as possible with the distribution of the online prediction data.

At the same time, for the positive/negative sample funnel, we generally treat data that was exposed but produced no click, download, or other conversion behavior as negative samples, and data that was exposed and then clicked (downloaded, activated, etc.) as positive samples. In practice, engineers can left join the exposure log with the click log on a unique key: an exposure that joins to a click is a positive sample, otherwise it is a negative sample. The specific code is described below.

However, real data is very dirty, and there is even some fake or bot traffic, which requires us to do targeted processing with a deep understanding of the business scenarios that generate the data.

The simplest example: many apps now show a home page immediately after opening. There is no doubt that the first screen of the home page gets a huge amount of exposure traffic, but it also produces a lot of invalid exposures. For example:

(1) Invalid exposure. After opening the app, many users go straight to other pages and stay on the first screen of the home page for a very short time; this is an unconscious operation on the user's part. In an actual project, engineers can remove exposures where the user saw the first screen of the home page but produced no click or other conversion. This is just one possible treatment, and I know of a large Internet company whose ad system increased daily online eCPM by more than 5% with this simple operation.

(2) Accidental clicks. A user accidentally taps a page or video, enters it, and then exits within a fraction of a second. Such traffic is not effective user behavior; it is large in volume and carries no signal, and algorithm engineers need to remove it according to the actual business scenario. In practice, engineers can subtract the timestamp at which the user entered the page from the timestamp at which they exited, and drop samples whose dwell time is too short, as in the sketch below.
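A minimal sketch of this dwell-time filter. The column positions, the function name, and the 1-second threshold are all assumptions for illustration, not part of any real log schema:

import org.apache.spark.rdd.RDD

  // Keep only click rows whose dwell time (exit - enter) reaches a minimum threshold.
  // Column positions and the 1-second default are assumptions for illustration only.
  def filterAccidentalClicks(clickLog: RDD[String], minDwellMs: Long = 1000L): RDD[String] = {
    clickLog
      .filter(_.nonEmpty)
      .filter { line =>
        val cols = line.split("\t")
        val enterTs = cols(6).toLong // assumed column: timestamp of entering the page (ms)
        val exitTs  = cols(7).toLong // assumed column: timestamp of leaving the page (ms)
        exitTs - enterTs >= minDwellMs
      }
  }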

(3) Uniqueness of recall-model samples. What kind of samples to use when training a recall model is a basic question, yet many people still get it wrong by habitually copying the ranking-model recipe, which is counterproductive. If ranking is the art of features, then recall is the art of samples, especially the art of negative samples; as the saying goes, negative samples are king. Ranking is very particular about so-called "true negative" samples, i.e. it must take "exposed but not clicked" samples as negatives. There is even the "above click" practice, which only takes unclicked items shown above a clicked item as negatives. The common recipe for recall samples, by contrast, is to take clicked samples as positives and randomly drawn samples as negatives. To limit hot users and hot items, some practices even cap the number of samples taken per user for training, so that the recall model captures both individual preference and common patterns; a small sketch follows this paragraph. There is a lot more that can be done with recall-model samples; here we only get a taste, and we may analyze it in detail in a later deep dive into the optimization process.
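A minimal sketch of that recipe. All names, the 4:1 negative ratio, and the per-user cap are chosen purely for illustration, and a real implementation would at least exclude a user's own clicked items from the random negative pool:

import scala.util.Random

  // Clicked items become positives, randomly drawn items become negatives,
  // and each user's positives are capped to limit the influence of heavy users.
  def buildRecallSamples(userClicks: Map[String, Seq[String]],
                         allItemIds: IndexedSeq[String],
                         negPerPos: Int = 4,
                         maxPerUser: Int = 50): Seq[(String, String, Int)] = {
    val rnd = new Random()
    userClicks.toSeq.flatMap { case (user, clicked) =>
      val positives = clicked.take(maxPerUser).map(item => (user, item, 1))
      val negatives = positives.flatMap { _ =>
        Seq.fill(negPerPos)((user, allItemIds(rnd.nextInt(allItemIds.size)), 0))
      }
      positives ++ negatives
    }
  }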

The importance of samples to a model needs no further repetition here. Below is a simplified version of the code that some small and medium-sized Internet companies could take and use almost directly. Engineers belong to companies, but engineering know-how belongs to everyone.

2. Sample sampling and CXR calibration

There are many sampling techniques. I remember that the book 《百面机器学习》 from the Hulu team devotes a chapter (Chapter 8, if I recall correctly) to various sampling techniques, such as rejection sampling, importance sampling, Gibbs sampling, and Monte Carlo sampling; if you want to optimize negative sampling, check it out. Somewhat ashamedly, having spent time on the commercial algorithm teams of a few large Internet companies, I found that what everyone loves to use is still plain random negative sampling ~~~~ interviews ask you to build rockets, but on the job you tighten screws, and this particular screw works remarkably well.

The conventional random negative sampling method is to generate a random number with a random function: if the current sample is a negative sample, compare the random number with the sampling rate and decide whether to keep it; positive samples are always kept. That is all random negative sampling is, as in the helper below.
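Boiled down to a single helper (the function name is mine, not from any library):

  // Every positive sample is kept; a negative sample is kept only when a fresh
  // uniform random number in [0, 1) falls at or below the negative sampling rate.
  def keepAfterNegativeSampling(label: String, negSampleRate: Double): Boolean =
    label == "1" || (label == "0" && math.random <= negSampleRate)

With negSampleRate = 0.1, roughly 10% of the negatives survive while all positives pass through.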

As for calibration: after you train the model on the sampled data, the data distribution it sees is biased, and the model's CXR scores come out larger than they should be (because a large number of negative samples have been removed, inflating the proportion of positives), so a CXR calibration has to be applied.

The calibration method commonly used in the industry is a simple formula: q = p / (p + (1 - p) / w), where p is the score predicted on the negatively sampled data and w is the negative sampling rate. The logic of this formula is to restore, in proportion, the weight of the negative samples that were dropped from the model's training data.
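A quick worked example (numbers only for illustration): with a negative sampling rate w = 0.1 and a model score p = 0.09 on the sampled distribution, the calibrated score is q = 0.09 / (0.09 + 0.91 / 0.1) ≈ 0.0098, i.e. back in line with the roughly 1% prior of the unsampled data.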

The formula comes from Facebook's 2014 paper "Practical Lessons from Predicting Clicks on Ads at Facebook."


3. Code time

Talk is cheap, show me the code!

The theory of sample selection, random negative sampling, and CXR calibration has now been covered. What follows is practical code that can be used in some companies' businesses. I wrote this code out of personal interest; my ability is limited and this is just one person's take, so please don't flame it if you don't like it ~~~

Sample selection and random negative sampling are implemented in Scala Spark, and a Java function for CXR calibration is attached at the end:

package Data
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.control.Breaks

object GenerateLogSamplesWithSelectAndSample {

  case class LogItem(imei: String, timestamp: String, posid: String, triggerid: String, adid: String, event: String, label: String)

  def main(args: Array[String]): Unit = {

    val Array(logPath, outputPath, positionId, sampleRate) = args

    val sparkConf = new SparkConf().setAppName("GenerateLogSamplesWithSelectAndSample")
    val sc = new SparkContext(sparkConf)

    val positionIdStr = positionId.split(",").toSet
    val positionIdIdSet = sc.broadcast(positionIdStr)


    // Aggregate to get exposure data
    val expose_data = sc.textFile(logPath).filter(x => x != null)
      .map(e => {
        val eles = e.split("\t")
        val timestamp = eles(0)
        val imei = eles(1)
        val posid = eles(2)
        val triggerid = eles(3)
        val adid = eles(4)
        val event = eles(5)
        val label: String = null

        (imei, LogItem(imei, timestamp, posid, triggerid, adid, event, label))

      })
      // Filter out exposure behavior logs for fixed ads
      .filter(e => positionIdIdSet.value.contains(e._2.posid) &&  e._2.event == "expose")
      // Aggregate user behavior based on user IMEI
      .groupByKey()
      .mapValues(e => {
      // Deduplicate according to triggerId, posId, adId, keeping the latest record
      val distinct_list = e.map(e => (e.triggerid + "$" + e.posid + "$" + e.adid, e))
        .groupBy(_._1)
        .map(e => e._2)
        .map(e => e.toArray
          .sortBy(_._2.timestamp)
          .last
          ._2
        )
      distinct_list
    })
      .persist(StorageLevel.MEMORY_AND_DISK_SER)


    // Click behavior data
    val click_data = sc.textFile(logPath)
      .filter(x => x.nonEmpty)
      .map(e => {
        val eles = e.split("\t")
        val timestamp = eles(0)
        val imei = eles(1)
        val posid = eles(2)
        val triggerid = eles(3)
        val adid = eles(4)
        val event = eles(5)
        val label: String = null
        (imei, LogItem(imei, timestamp, posid, triggerid, adid, event, label))
      })
      .filter(e => positionIdIdSet.value.contains(e._2.posid) && e._2.event == "click") // Keep only click behavior
      .groupByKey()
      .persist(StorageLevel.MEMORY_AND_DISK_SER)


    // Left outer join with the click behavior: exposures that join become positive samples,
    // exposures that do not join become negative samples
    // Label the sample
    val labeled_data = expose_data.leftOuterJoin(click_data)
      .mapValues(e => {
        val exposeArray = e._1.toArray

        // Build the set of (triggerId, posId, adId) keys of this user's clicks
        val clickKeys = e._2.getOrElse(Iterable.empty[LogItem])
          .filter(c => c.triggerid != null && !c.triggerid.equals("null")
            && c.posid != null && !c.posid.equals("null")
            && c.adid != null)
          .map(c => (c.triggerid, c.posid, c.adid))
          .toSet

        // Join exposure and click according to triggerId, posId, adId.
        // An exposure that matches a click is a positive sample "1", otherwise a negative sample "0"
        exposeArray.map(expose => {
          val key = (expose.triggerid, expose.posid, expose.adid)
          val label = if (clickKeys.contains(key)) "1" else "0"
          LogItem(expose.imei, expose.timestamp, expose.posid, expose.triggerid, expose.adid, expose.event, label)
        })
      })
      .flatMapValues(e => e)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)


    // Everything above labels the samples: exposure samples left join click samples
    // Sample cleaning: find the triggerIds whose exposures all stayed on the first screen
    // but produced no click at all
    val noSenseTriggerIds = labeled_data.map(e => (e._2.posid + "$" + e._2.triggerid, (1, e._2.label.toInt)))
      .reduceByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2)) // (exposure count, click count) per posid$triggerId
      // Assume the first screen shows at most 10 ads; adjust to your own business
      .filter(x => x._2._1 <= 10 && x._2._2 == 0) // and no click at all; can be changed to other conversion behavior
      .map(e => e._1.split("\\$")(1))
      .collect()
      .toSet
    val noSenseTriggerIdsBC = sc.broadcast(noSenseTriggerIds)


    val final_sample_data = labeled_data
      // Sample cleaning: drop exposures belonging to meaningless first-screen requests
      .filter(x => !noSenseTriggerIdsBC.value.contains(x._2.triggerid))
      // Negative sampling: all positive samples pass, negative samples pass with probability sampleRate
      .filter(e => e._2.label.equals("1") || (e._2.label.equals("0") && math.random <= sampleRate.toDouble))
      // Format the final sample line: label \t imei \t triggerid \t adid \t posid \t timestamp
      .map(pair => {
        val e = pair._2
        e.label + "\t" + e.imei + "\t" + e.triggerid + "\t" + e.adid + "\t" + e.posid + "\t" + e.timestamp
      })

    // Roughly 400,000 lines per output file
    val outputPartNum = math.max(1, math.ceil(final_sample_data.count() / 400000.0).toInt)
    final_sample_data.repartition(outputPartNum).saveAsTextFile(outputPath, classOf[GzipCodec])
  }
}

The above is a set of sample-processing code that can be used in a real project, including filtering user behavior logs for given ad positions, labeling them, simple business-specific invalid-sample filtering, and negative sampling. Here is the calibration code for CXR:

public static double ctrResetAfterSample(double probability, double samplingRate) {
        double score = 0.0D;
        if (samplingRate > 0.0D && samplingRate < 1.0D) {
            score = probability / (probability + (1.0D - probability) / samplingRate);
        } else {
            score = probability;
        }
        return score;
    }
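For instance, calling ctrResetAfterSample(0.09, 0.1) returns about 0.0098, which matches the worked example above.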


That wraps up the whole sample processing part. You can adapt the methods above to your own business code; if you find it useful, please like and share it ~

Welcome to scan the QR code and follow the author's WeChat public account: Algorithm Full Stack Road.