Text classification with Spark

Spark 2.0 introduced the DataFrame API as a higher-level abstraction over RDDs, hiding much of the low-level RDD manipulation. This article uses the ML library in Spark MLlib to build a complete multi-class prediction demo for news text: data preprocessing, word segmentation, label indexing and feature vectorization, multi-class model training (naive Bayes, logistic regression, decision tree and random forest), classification prediction and model evaluation. Word segmentation is done with the HanLP toolkit (well documented, open algorithms, open source, and with reasonably good segmentation quality in our tests).

1. Data preprocessing

1.1 Text Data

The data used in this article are news items from 4 categories; each record contains a label, title, time and news content. The "\u00EF" character is used as the field separator, and the data format is as follows:

Home | cultural news ï 11th national excellent dance show performance will be held in wuhan ï 2016-07-05 19:25:00 ï Beijing, July 5 (xinhua) (reporter zhou wei) sponsored by the ministry of culture, people's government of hubei province...
Home | finance channel | finance and economics centre ï new general education acquires 100% of hangzhou blue ocean outbound travel ï 2016-07-04 21:49:00 ï hangzhou, July 4 ... on July 4, new general education...
Military news homepage ï Chinese diving team in RIMPAC exercise carries out diving accident emergency medical treatment practice ï 2016-07-04 19:40:00 ï Hawaii, July 3 (local time), attend...
Homepage | sports news ï stankovic cup opener will hold a ceremony for wang zhizhi ï 2016-07-04 10:39:00 ï at eight o'clock on Tuesday night, the Chinese men's basketball team will play at home in Beijing against the Rio de Janeiro Olympic squad...

1.2 Preprocessing workflow

Text cleaning -> Label indexing -> Content segmentation -> Stop word removal -> Take the top 5000 words as features -> Feature vectorization -> Save the preprocessing model -> Call the preprocessing model -> Output preprocessed data (indexedLabel, features)

1.3 Label Indexing

First the text is read into a DataFrame and the label column is indexed, mapping {culture, finance, military, sports} to {0, 1, 2, 3}.

  /**
    * Data cleaning: can be rewritten for the specific data structure and business scenario.
    * Note: the output must contain the label field "label".
    * @param filePath data path
    * @param spark    SparkSession
    * @return cleaned data with fields: "label", "title", "time", "content"
    */
  def clean(filePath: String, spark: SparkSession) :DataFrame = {
    import spark.implicits._
    val textDF = spark.sparkContext.textFile(filePath).flatMap { line =>
      val fields = line.split("\u00EF")   // Delimiter "\u00EF" (ï), splits into label, title, time, content
      // e.g. Homepage | cultural news ï 11th national excellent dance show will be held in wuhan ï 2016-07-05 19:25:00 ï Beijing, July 5 (xinhua) (reporter zhou wei) by the ministry of culture...
      // e.g. Homepage | finance channel ï zhejiang port crude imports in the first half set a record ï 2016-07-04 21:54:00 ï hangzhou, July 4 ...
      if (fields.length > 3) {
        val categoryLine = fields(0)
        val categories = categoryLine.split("\\|")
        val category = categories.last
        // Map to the 4 label names; anything else stays "Other" and is dropped below
        var label = "Other"
        if (category.contains("Culture")) label = "Culture"
        else if (category.contains("Finance and economics")) label = "Finance and economics"
        else if (category.contains("Military")) label = "Military"
        else if (category.contains("Sports")) label = "Sports"
        // Output label, title, time, content
        val title = fields(1)
        val time = fields(2)
        val content = fields(3)
        if (!label.equals("Other")) Some((label, title, time, content)) else None
      } else None
    }.toDF("label", "title", "time", "content")
    // Output label, title, time, content DF
    textDF
  }
  /**
    * Convert the label column to an indexed form.
    * @param data input data containing the "label" field
    * @return label index model; the model adds the field "indexedLabel"
    */
  def indexrize(data: DataFrame): StringIndexerModel = {
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    labelIndexer
  }
predictDF.select("label", "indexedLabel").show(10, truncate = false)

1.4 Content field segmentation

To process the content field, the text is first segmented, then stop words are removed, and finally the result is converted to a feature vector for training and prediction of the classification models. In this article a Segmenter class, modeled on the StopWordsRemover class in Spark's ML package, is created for segmentation; internally it calls the HanLP segmentation toolkit.

Some of the building blocks used by Spark's own classes such as StopWordsRemover are package-private to the ML package and cannot be reused from a custom class. The Segmenter is therefore used in the same way as StopWordsRemover but is implemented differently: it does not extend Transformer, so it cannot take part in a Pipeline, and the segmentation parameters are not included in the hyperparameter tuning of the classification models.
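The article does not list the Segmenter implementation itself. As a rough illustration only, a minimal wrapper around HanLP that adds a "tokens" column could look like the sketch below; the class name, constructor parameters and filtering logic are simplified assumptions, not the article's actual code.

import com.hankcs.hanlp.HanLP
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

import scala.collection.JavaConverters._

// Illustrative sketch: a simplified Segmenter-like helper that runs HanLP standard
// segmentation over an input text column and writes the tokens to an output column.
class SimpleSegmenter(inputCol: String, outputCol: String, minTermLen: Int = 1) extends Serializable {

  def transform(data: DataFrame): DataFrame = {
    val segUdf = udf { text: String =>
      if (text == null) Seq.empty[String]
      else HanLP.segment(text).asScala
        .map(_.word.trim)                 // keep the surface form of each term
        .filter(_.length >= minTermLen)   // drop terms shorter than the threshold
        .toSeq
    }
    data.withColumn(outputCol, segUdf(data(inputCol)))
  }
}

A real implementation would add the options shown below (segment type, dropping English words and numbers, attaching part-of-speech tags, minimum term count per line), but the overall shape is the same: a column-to-column transform backed by HanLP.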

  /**
    * Segment the content field.
    * @param data   input data containing the field to segment: "content"
    * @param params preprocessing parameters
    * @return DataFrame after segmentation, with added fields: "tokens", "removed"
    */
  def segment(data: DataFrame, params: PreprocessParam): DataFrame = {
    val spark = data.sparkSession
    // Set the segmentation model
    val segmenter = new Segmenter()
      .setSegmentType(params.segmentType) // the way of participle
      .isDelEn(params.delEn)              // Whether to remove English words
      .isDelNum(params.delNum)            // Whether to remove numbers
      .addNature(params.addNature)        // Whether to add a part of speech
      .setMinTermLen(params.minTermLen)   // Minimum word length
      .setMinTermNum(params.minTermNum)   // Minimum number of words in line
      .setInputCol("content")             // Enter the content field
      .setOutputCol("tokens")             // Output the field after the word segmentation
    // do the participle
    val segDF = segmenter.transform(data)

1.5 Remove stop words

After segmentation, commonly used words that carry little meaning, such as "of", "we" and "is" (collectively, stop words), need to be removed. These words add little information, but if kept they strongly interfere with feature extraction. (For example, in the sports category "of" might appear 500 times and "football" 300 times; "football" is clearly the better feature for the sports class, while "of" only adds noise to the result.)

To remove stop words we call the StopWordsRemover class in the ML package:

    // Read stop word data
    val stopWordArray = spark.sparkContext.textFile(params.stopwordFilePath).collect()
    // Set the stop word model
    val remover = new StopWordsRemover()
      .setStopWords(stopWordArray)
      .setInputCol(segmenter.getOutputCol)   // Read "tokens"
      .setOutputCol("removed")               // Output removed field after the stop word "removed"
    // delete the stop word
    val removedDF = remover.transform(segDF)
    removedDF
  }
Copy the code

1.6 Feature vectorization

Commonly used classification and clustering algorithms are based on the vector space model (VSM), in which each object is represented as an n-dimensional vector, i.e. a point in an n-dimensional space. Converting the data to vector form makes matrix operations on large-scale data convenient, and the similarity of two vectors can be computed as a distance between the two corresponding points (typically the cosine distance). The processed corpus therefore has to be transformed into vectors; this step is called vectorization.
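For intuition, the cosine similarity of two term-count vectors is their dot product divided by the product of their norms. A small self-contained sketch, using made-up vectors rather than data from this article:

// Cosine similarity between two term-count vectors (illustrative values only)
val a = Array(2.0, 0.0, 1.0, 3.0)
val b = Array(1.0, 1.0, 0.0, 2.0)

val dot   = a.zip(b).map { case (x, y) => x * y }.sum
val normA = math.sqrt(a.map(x => x * x).sum)
val normB = math.sqrt(b.map(x => x * x).sum)

val cosine = dot / (normA * normB)   // 1.0 = same direction, 0.0 = orthogonal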

Here we again call a Spark vectorizer, the CountVectorizer class, to do the vectorization:

 /**
   * Feature vectorization, including vocabulary filtering.
   * @param data   input data containing the field to vectorize: "removed"
   * @param params configuration parameters
   * @return vectorization model
   */
 def vectorize(data: DataFrame, params: PreprocessParam): CountVectorizerModel = {
   // Set the vector model
   val vectorizer = new CountVectorizer()
     .setVocabSize(params.vocabSize)
     .setInputCol("removed")
     .setOutputCol("features")
   val parentVecModel = vectorizer.fit(data)
   // Remove single-character terms and pure numbers from the vocabulary
   val numPattern = "[0-9]+".r
   val vocabulary = parentVecModel.vocabulary.flatMap {
     term => if (term.length == 1 || term.matches(numPattern.regex)) None else Some(term)
   }
   val vecModel = new CountVectorizerModel(Identifiable.randomUID("cntVec"), vocabulary)
     .setInputCol("removed")
     .setOutputCol("features")
   vecModel
 }

The "content" field is first segmented and the stop words removed to obtain "removed"; the remaining words are then taken as features and vectorized (up to vocabSize words, 5000 here) to obtain the "features" field:
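A quick way to spot-check the result is to display a few rows of the transformed DataFrame. This is a usage sketch assuming the segDF and vecModel names from the methods above:

// Inspect the segmented, stop-word-filtered and vectorized columns
val featuredDF = vecModel.transform(segDF)
featuredDF.select("removed", "features").show(5, truncate = false)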

1.7 Data processing model training, saving and calling

To make it convenient to train and predict with each classification model separately, the preprocessing step is itself trained, saved and loaded as a data processing model. The methods are as follows:

  /**
    * Train the preprocessing model.
    * @param filePath data path
    * @param spark    SparkSession
    * @return (preprocessed data, index model, vector model)
    *         fields: "label", "indexedLabel", "title", "time", "content", "tokens", "removed", "features"
    */
  def train(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {

    val params = new PreprocessParam             // Preprocess parameters
    val cleanDF = this.clean(filePath, spark)    // Read DF and clean data
    val indexModel = this.indexrize(cleanDF)     // Fit the label index model
    val indexDF = indexModel.transform(cleanDF)  // Label indexing
    val segDF = this.segment(indexDF, params)    // split the content field into words
    val vecModel = this.vectorize(segDF, params) // Fit the vector model
    val trainDF = vecModel.transform(segDF)      // content segmentation feature vectorization
    this.saveModel(indexModel, vecModel, params) // Save the model

    (trainDF, indexModel, vecModel)
  }
  /**
    * Apply the preprocessing model for prediction.
    * @param filePath data path
    * @param spark    SparkSession
    * @return (preprocessed data, index model, vector model)
    */
  def predict(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {
    val params = new PreprocessParam                    // Preprocess parameters
    val cleanDF = this.clean(filePath, spark)           // Read DF and clean data
    val (indexModel, vecModel) = this.loadModel(params) // Load index and vector model
    val indexDF = indexModel.transform(cleanDF)         // Label indexing
    val segDF = this.segment(indexDF, params)           // Content field segmentation
    val predictDF = vecModel.transform(segDF)           // content segmentation feature vectorization
    (predictDF, indexModel, vecModel)
  }
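The saveModel and loadModel helpers called above are not listed in the article. A minimal sketch of what they might look like, assuming the models are written to fixed paths held in PreprocessParam (the field names indexModelPath and vecModelPath are illustrative assumptions):

import org.apache.spark.ml.feature.{CountVectorizerModel, StringIndexerModel}

// Illustrative sketch: persist and reload the two preprocessing models with
// Spark ML's built-in MLWriter/MLReader support.
def saveModel(indexModel: StringIndexerModel,
              vecModel: CountVectorizerModel,
              params: PreprocessParam): Unit = {
  indexModel.write.overwrite().save(params.indexModelPath)
  vecModel.write.overwrite().save(params.vecModelPath)
}

def loadModel(params: PreprocessParam): (StringIndexerModel, CountVectorizerModel) = {
  val indexModel = StringIndexerModel.load(params.indexModelPath)
  val vecModel = CountVectorizerModel.load(params.vecModelPath)
  (indexModel, vecModel)
}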

Call the preprocessing model and display 5 rows of the processed data:

+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
|label|               title|               time|             content|indexedLabel|              tokens|             removed|            features|
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
| ... five preprocessed news records (finance, culture, military, ...), with tokens and 5000-dimensional sparse feature vectors, truncated ... |
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
only showing top 5 rows

2. Multi-classification model training and hyperparameter tuning

Four commonly used multi-class models are trained on the text data, and Pipeline + grid search (ParamGridBuilder) + CrossValidator are used for hyperparameter tuning. The tuning code is placed directly in each training method, and the best model is saved.

2.1 Naive Bayes

Principle of Naive Bayes algorithm

The naive Bayes algorithm is a classification method based on Bayes' theorem and the assumption that features are conditionally independent.

Conditional probability

P(A|B) is the probability that event A occurs given that event B has already occurred, i.e. the conditional probability of A given B. Its basic formula is P(A|B) = P(AB) / P(B).

Characteristic condition independence hypothesis

Naive Bayes model

Three models are commonly used: the multinomial, Bernoulli and Gaussian models:

  • When features are discrete, use the multinomial model.
  • The Bernoulli model also applies to discrete features, with the difference that each feature value can only be 1 or 0. In text classification, for example, a feature is 1 if the word occurs in the document and 0 otherwise, rather than the word's occurrence count, so count-based features do not suit the Bernoulli model.
  • When features are continuous variables, they are hard to describe with a multinomial model even with a smoothing coefficient, so the Gaussian model is needed.

Smoothing coefficient

The smoothing hyperparameter α is used to prevent posterior probabilities from being 0. When α = 1 it is called Laplace smoothing, when 0 < α < 1 it is Lidstone smoothing, and α = 0 means no smoothing. This article tunes the smoothing coefficient.
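Concretely, for the multinomial model the smoothed estimate of the conditional probability of word w given class c is (a standard formulation, stated here for reference rather than taken from the original article):

P(w | c) = (count(w, c) + α) / (Σ_w' count(w', c) + α · |V|)

where |V| is the vocabulary size. With α = 1 (Laplace smoothing), every word keeps a small non-zero probability in every class, even if it never occurs in that class's training documents.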

  /**
    * Train the naive Bayes model with grid search and cross validation.
    * @param data training data
    * @return nbBestModel
    */
  def train(data: DataFrame): NaiveBayesModel = {
    val params = new ClassParam
    //NB classification model pipeline training parameters
    data.persist()
    data.show(5)
    // NB model
    val nbModel = new NaiveBayes()
      .setModelType(params.nbModelType) // Multinomial or Bernoulli model
      .setSmoothing(params.smoothing)   // Smoothing coefficient
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
    // Build the pipeline; the NB model is the only stage, at index 0
    val pipeline = new Pipeline()
      .setStages(Array(nbModel))
    // Set up grid search
    val paramGrid = new ParamGridBuilder()
      //.addGrid(nbModel.modelType, Array("multinomial", "bernoulli"))
      // The Bernoulli model requires 0/1 features
      .addGrid(nbModel.smoothing, Array(0.01, 0.1, 0.2, 0.5))
      .build()
    // Build the evaluator; the label column it validates against must be the indexed label.
    // With 4 classes, use the multi-class evaluator rather than the binary one.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
    // Create a cross-validated evaluator and set the parameters of the evaluator
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(2)
    // Run the cross-validation evaluator to get the model with the best parameter set
    val cvModel = cv.fit(data)
    // Get the optimal naive Bayes model from the pipeline
    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    val bestNBModel = bestModel.stages(0).asInstanceOf[NaiveBayesModel]
    println("Number of classes (values the tag can use):" + bestNBModel.numClasses)
    println("Number of features accepted by the model:" + bestNBModel.numFeatures)
    println("The optimal modelType value is:"+ bestNBModel.explainParam(bestNBModel.modelType))
    println("The optimal smoothing value is:+ bestNBModel.explainParam(bestNBModel.smoothing))
    // Retrain naive Bayes on the full data with the optimal parameters
    val nbBestModel = new NaiveBayes()
      .setModelType(bestNBModel.getModelType) // Multinomial or Bernoulli model
      .setSmoothing(bestNBModel.getSmoothing) // Smoothing coefficient
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
      .fit(data)

    this.saveModel(nbBestModel, params)
    data.unpersist()
    nbBestModel
  }

Explanations of the following three algorithms are widely available online, and their training code is similar to the above, so only the model-tuning parts are shown here.

2.2 Logistic regression

    // LR model
    val lrModel = new LogisticRegression()
      .setMaxIter(bestLRModel.getMaxIter)    // Maximum number of iterations of the model
      .setRegParam(bestLRModel.getRegParam)  // Regularize parameters
      .setElasticNetParam(params.elasticNetParam) // Elastic net mixing ratio: L1 / (L1 + L2)
      .setTol(params.converTol)          // Model convergence threshold
      .setLabelCol("indexedLabel")       // Set the indexed label field
      .setFeaturesCol("features")        // Sets the vectorized text feature field

    // Set up grid search
    val paramGrid = new ParamGridBuilder()
      .addGrid(lrModel.maxIter, Array(5, 10))
      .addGrid(lrModel.regParam, Array(0.1, 0.2))
      .build()

2.3 Decision tree

    // Decision tree model
    val dtModel = new DecisionTreeClassifier()
      .setMinInfoGain(params.minInfoGain)  // Minimum information gain threshold
      .setMaxDepth(params.maxDepth)        // Maximum depth of decision tree
      .setImpurity(params.impurity)        // Impurity measure for information gain: gini or entropy
      .setLabelCol("indexedLabel")         // Set the indexed label field
      .setFeaturesCol("features")          // Sets the vectorized text feature field
    // Set up grid search
    val paramGrid = new ParamGridBuilder()
      .addGrid(dtModel.minInfoGain, Array(0.0, 0.1))
      .addGrid(dtModel.maxDepth, Array(10, 20))
      .addGrid(dtModel.impurity, Array("gini", "entropy"))
      .build()

2.4 Random Forest

Random forest models usually require tuning two parameters to improve performance, numTrees and maxDepth; the main parameters are:

  • numTrees: increasing the number of trees reduces the variance of the predictions, so test accuracy improves. Training time grows roughly linearly with numTrees.
  • maxDepth: limits the maximum possible depth of each tree; the final trees may be shallower than maxDepth.
  • minInfoGain: minimum information gain threshold; a split whose gain falls below this value is not made (one of the termination/pruning conditions).
  • maxBins: maximum number of bins used to discretize continuous features and decide how each node is split (e.g. 25, 28, 31).
  • impurity: the impurity measure used for information gain, entropy or gini ("entropy", "gini").
  • minInstancesPerNode: if a node holds fewer samples than this threshold, it is not split further.
  • featureSubsetStrategy ("auto"): whether to automatically choose the number of features considered at each split.
  • seed: random number generator seed.

It is actually quite difficult to pick an appropriate threshold: a high threshold may over-simplify the tree, while a low threshold may not simplify it enough.

The pre-pruning methods minInfoGain and minInstancesPerNode work by tightening the stopping conditions in order to reach a reasonable result, which is not ideal; in practice we often do not even know what values to aim for. This calls for post-pruning of the tree (post-pruning requires no user-specified parameters and is a more principled way to prune).

    // Random forest model (not fitted yet)
    val rfModel = new RandomForestClassifier()
      .setMaxDepth(params.maxDepth)          // Maximum depth of decision tree
      .setNumTrees(params.numTrees)          // Set the number of decision trees
      .setMinInfoGain(params.minInfoGain)  // Minimum information gain threshold
      .setImpurity(params.impurity)        // Index of information gain, choose entropy or GINi impurity
      //.setMaxBins(params.maxBins)        // Max number of bins used to discretize continuous features and decide how each node splits
      .setLabelCol("indexedLabel")           // Set the indexed label field
      .setFeaturesCol("features")            // Sets the vectorized text feature field
    // Set up grid search
    val paramGrid = new ParamGridBuilder()
      .addGrid(rfModel.maxDepth, Array(5, 10, 20))
      .addGrid(rfModel.numTrees, Array(5, 10, 20))
      .addGrid(rfModel.minInfoGain, Array(0.0, 0.1, 0.5))
      .build()

3. Multi-classification model prediction and model evaluation

3.1 Model evaluation with MulticlassClassificationEvaluator

Machine learning generally needs quantitative metrics to measure how well a model performs. Here we use precision, recall and the F1 score, a commonly used trio of metrics for evaluating a model's predictive ability. Spark provides evaluation support for multi-class models (the MulticlassClassificationEvaluator class, and the MulticlassMetrics used below), which can output all three metrics at once.

object Evaluations extends Serializable {
  /**
    * Evaluate multi-class classification results.
    * @param data classification results as (prediction, label) pairs
    * @return (precision, recall, F1)
    */
  def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double) = {
    val metrics = new MulticlassMetrics(data)
    val weightedPrecision = metrics.weightedPrecision
    val weightedRecall = metrics.weightedRecall
    val f1 = metrics.weightedFMeasure

    (weightedPrecision, weightedRecall, f1)
  }
}

3.2 Prediction results and model evaluation of the four multi-classification models

Taking logistic regression as an example, each prediction row contains a "probability" column whose four values are the predicted probabilities of the four classes.
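The predict methods of the classifier wrappers (e.g. LRClassifier) used in the driver below are not listed in the article. A minimal sketch of what such a method might do, assuming the trained model was saved to a path held in ClassParam (the field name LRModelPath is an illustrative assumption) and using IndexToString to map the predicted index back to the original label:

import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.feature.{IndexToString, StringIndexerModel}
import org.apache.spark.sql.DataFrame

// Illustrative sketch of a predict method for the logistic regression wrapper.
def predict(data: DataFrame, indexModel: StringIndexerModel): DataFrame = {
  val params = new ClassParam
  // Load the previously saved best model
  val lrModel = LogisticRegressionModel.load(params.LRModelPath)
  // Adds "rawPrediction", "probability" and "prediction" columns
  val predictions = lrModel.transform(data)
  // Map the numeric prediction back to the original label string
  val converter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(indexModel.labels)
  converter.transform(predictions)
}

The evaluation driver then runs each model's predict and feeds the (prediction, indexedLabel) pairs to the evaluator: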

/** * Created by wy in 2019/4/16 10:07 */
object MultiClassEvalution {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .master("local")
      .appName("Multi_Class_Evaluation_Demo")
      .getOrCreate()

    val filePath = "data/dataTest/predict"

    // Preprocessing (cleaning, segmentation, vectorization)
    val preprocessor = new Preprocessor
    val (predictDF, indexModel, _) = preprocessor.predict(filePath, spark)

    predictDF.select("content"."removed"."features").show(1, truncate = false)
    // Naive Bayes model prediction
    val nbClassifier = new NBClassifier
    val nbPredictions = nbClassifier.predict(predictDF, indexModel)

    // Logistic regression model prediction
    val lrClassifier = new LRClassifier //import Classification.LogisticRegression.LRClassifier
    val lrPredictions = lrClassifier.predict(predictDF, indexModel)

    // Decision tree model prediction
    val dtClassifier = new DTClassifier
    val dtPredictions = dtClassifier.predict(predictDF, indexModel)

    // Random forest model prediction
    val rfClassifier = new RFClassifier
    val rfPredictions = rfClassifier.predict(predictDF, indexModel)

    // Multiple model evaluations
    val predictions = Seq(nbPredictions, lrPredictions, dtPredictions, rfPredictions)
    val classNames = Seq("Naive Bayes model", "Logistic regression model", "Decision tree model", "Random forest model")

    for (i <- 0 to 3) {
      val prediction = predictions(i)
      val className = classNames(i)

      val resultRDD = prediction.select("prediction", "indexedLabel").rdd.map {
        case Row(prediction: Double, label: Double) => (prediction, label)
      }

      val (precision, recall, f1) = Evaluations.multiClassEvaluate(resultRDD)
      println(s"\n========= $classNameEvaluation results ==========")
      println(S "Weighted accuracy:$precision")
      println(S "weighted recall rate:$recall")
      println(S "F1 value:$f1")}}}Copy the code