Using Spark MLlib to implement movie recommendation

Source code and data set: github.com/luo94852184…

Spark machine learning library MLlib

MLlib is Spark's machine learning library. It is designed to make the engineering practice of machine learning simpler and to scale it out. MLlib consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering and dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs. Specifically, it mainly covers the following areas:

  • 1. Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering;
  • 2. Featurization: feature extraction, transformation, dimensionality reduction, and feature selection;
  • 3. Pipelines: tools for constructing, evaluating, and tuning machine learning pipelines;
  • 4. Persistence: saving and loading algorithms, models, and pipelines;
  • 5. Utilities: linear algebra, statistics, data handling, and other tools.

Since version 1.2, Spark's machine learning library has been split into two packages:

• spark.mllib contains the original RDD-based algorithm APIs. It has a long history: it has been part of Spark since before version 1.0, and all of its algorithms are built on RDDs.
• spark.ml provides DataFrame-based APIs that can be used to build machine learning workflows. The ML Pipeline API makes up for the shortcomings of the original MLlib library by giving users a DataFrame-based workflow API suite for machine learning.

With the ML Pipeline API, it is easy to combine data processing, feature transformation, regularization, and multiple machine learning algorithms into a single, complete machine learning pipeline. This approach is more flexible, better matches how machine learning is actually done, and makes it easier to migrate from other languages. Spark officially recommends using spark.ml: if a new algorithm fits the machine learning pipeline concept (for example, feature extractors and transformers), it should go into the spark.ml package. Developers should note that starting with Spark 2.0, the RDD-based APIs entered maintenance mode (no new features are added) and are expected to be removed from MLlib in Spark 3.0.
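To make the pipeline idea concrete, here is a minimal spark.ml sketch adapted from the Pipeline example in the official Spark documentation (the tiny in-memory DataFrame and its column names are just for illustration): a tokenizer, a term-frequency transformer, and a logistic regression classifier are fitted together as one unit.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

// Minimal spark.ml Pipeline sketch: text -> tokens -> term-frequency features -> logistic regression
val spark = SparkSession.builder.appName("PipelineSketch").getOrCreate()
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0)
)).toDF("id", "text", "label")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

// All three stages are trained and tuned together as a single Pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training)
// model.transform(newData) applies the same stages to unseen data
```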

Spark's machine learning support is developing quickly and now covers the mainstream statistical and machine learning algorithms. Among open-source machine learning libraries built on a distributed architecture, MLlib is arguably one of the most computationally efficient. MLlib currently supports four common classes of machine learning problems: classification, regression, clustering, and collaborative filtering.

A classic movie recommendation system profiles users along different dimensions. In collaborative filtering, co-occurrence similarity can be used to find similar items or similar users. A simple definition of this similarity is given below.

The co-occurrence similarity between item i and item j is defined as:

W(i, j) = |N(i) ∩ N(j)| / |N(i)|

where N(i) is the set of users who like item i.

The denominator is the number of users who like item i, and the numerator is the number of users who like both item i and item j. The formula can therefore be read as: among the users who like item i, what fraction also like item j (similar in spirit to an association rule).

But there is a problem with this formula: if item j is very popular and many people like it, W(i, j) will be large and close to 1, so any item will look highly similar to a popular item. The following corrected formula is therefore used:

W(i, j) = |N(i) ∩ N(j)| / sqrt(|N(i)| × |N(j)|)

This formula penalizes the weight of item j, which reduces the chance that popular items appear similar to everything (it also makes the similarity symmetric: W(i, j) = W(j, i)).
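As a quick illustration (this helper is not part of the project code, just a plain-Scala sketch of the corrected formula), the similarity can be computed from (userId, itemId) "like" pairs as follows:

```scala
// Sketch of the corrected co-occurrence similarity:
// W(i, j) = |N(i) ∩ N(j)| / sqrt(|N(i)| * |N(j)|)
object CooccurrenceSimilarity {
  def similarity(likes: Seq[(Int, Int)], i: Int, j: Int): Double = {
    // N(x): the set of users who like item x
    val usersByItem = likes.groupBy(_._2).map { case (item, pairs) => item -> pairs.map(_._1).toSet }
    val ni = usersByItem.getOrElse(i, Set.empty[Int])
    val nj = usersByItem.getOrElse(j, Set.empty[Int])
    if (ni.isEmpty || nj.isEmpty) 0.0
    else (ni intersect nj).size.toDouble / math.sqrt(ni.size.toDouble * nj.size.toDouble)
  }

  def main(args: Array[String]): Unit = {
    // user 1 likes items 10 and 20, user 2 likes item 10, user 3 likes items 10 and 20
    val likes = Seq((1, 10), (1, 20), (2, 10), (3, 10), (3, 20))
    println(similarity(likes, 10, 20)) // 2 / sqrt(3 * 2) ≈ 0.816
  }
}
```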

The data set

Movies (movies.dat): movie ID, movie name, genres

Ratings (ratings.dat): user ID, movie ID, rating, timestamp
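For reference, both files use the MovieLens "::"-delimited layout, which is what the code below splits on. Illustrative lines (the exact titles and values are only examples):

```
movies.dat:   1::Toy Story (1995)::Animation|Children's|Comedy
ratings.dat:  1::1193::5::978300760
```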

The main code (Scala):

package com.luo


import java.util.Random

import org.apache.log4j.Logger
import org.apache.log4j.Level

import scala.io.Source

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}



object Recomment {


  def main(args: Array[String]): Unit = {


    // Create the Spark environment
    val conf = new SparkConf().setAppName("movieRecomment")

    val sc = new SparkContext(conf)

    // Read the ratings file and preprocess it.
    // Each line of ratings.dat is "userId::movieId::rating::timestamp"; the
    // (timestamp % 10) value is kept as a key so the data can later be split
    // into training / validation / test sets.
    val ratings = sc.textFile("ratings.dat").map {
      line =>
        val fields = line.split("::")
        (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }



      val movies = sc.textFile("movies.dat").map { line =>
        val fields = line.split("::")
        // format: (movieId, movieName)
        (fields(0).toInt, fields(1))
      }.collect.toMap

    // Number of records, number of users, number of movies
    val numRatings = ratings.count
    val numUsers = ratings.map(_._2.user).distinct.count
    val numMovies = ratings.map(_._2.product).distinct.count
    println("From" + numRatings + "On the record" + "Analyzed." + numUsers + "People watched." + numMovies + "A movie")



    // Extract the movies that received the most ratings (the densest part of
    // the rating matrix) to use when eliciting ratings from the user

    val mostRatedMovieIds = ratings.map(_._2.product)
      .countByValue()
      .toSeq
      .sortBy(-_._2)
      .take(50) // keep the 50 most-rated movies
      .map(_._1) // Get their id


    val random = new Random(0)
    val selectedMovies = mostRatedMovieIds.filter(
      x => random.nextDouble() < 0.2).map(x => (x, movies(x))).toSeq

    // Elicit ratings interactively:
    // pick roughly ten movies at random from the most-rated ones
    // and ask the user to score them
    val myRatings = elicitateRatings(selectedMovies)
    val myRatingsRDD = sc.parallelize(myRatings)


    // Divide the scoring system into training set 60%, validation set 20%, and test set 20%
    val numPartitions = 20
    // training set: ratings whose key (timestamp % 10) is 0-5, plus the user's own ratings
    val training = ratings.filter(x => x._1 < 6).values
      .union(myRatingsRDD).repartition(numPartitions)
      .persist
    // validation set: keys 6-7
    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values
      .repartition(numPartitions).persist
    // test set: keys 8-9
    val test = ratings.filter(x => x._1 >= 8).values.persist
    val numTraining = training.count
    val numValidation = validation.count
    val numTest = test.count
    println("Number of training sets :" + numTraining + ", number of validation sets:" + numValidation + ", number of test sets :" + numTest)


    // Train the model and evaluate the model on the validation set
    val ranks = List(8, 12)
    val lambdas = List(0.1, 10.0)
    val numIters = List(10, 20)
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = 1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(training, rank, numIter, lambda)
      val validationRmse = computeRmse(model, validation, numValidation)
      println("RMSE (validation)=" + validationRmse + "for the model trained with rand =" + rank + ", lambda=" + lambda + ", and numIter= " + numIter + ".")
      if (validationRmse < bestValidationRmse) {
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }

    // Get the best model in the test set
    val testRmse = computeRmse(bestModel.get, test, numTest)
    println("The best model was trained with rank=" + bestRank + " and lambda =" + bestLambda + ", and numIter =" + bestNumIter + ", and itsRMSE on the test set is" + testRmse + ".")


    // Generate personalized recommendations for the interactive user (id 0)
    val myRatedMovieIds = myRatings.map(_.product).toSet
    // Candidates are all movies the user has not rated yet
    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
    val recommendations = bestModel.get.predict(candidates.map((0, _)))
      .collect()
      .sortBy(-_.rating)
      .take(50)
    var i = 1
    println("The following movies are recommended for you:")
    recommendations.foreach { r =>
      println("%2d".format(i) + ": " + movies(r.product))
      i += 1
    }
  }

  /** Compute RMSE (Root Mean Squared Error). */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long) = {
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
      .join(data.map(x => ((x.user, x.product), x.rating)))
      .values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
  }


  /** Elicitate ratings from command-line. */
  def elicitateRatings(movies: Seq[(Int, String)]) = {
    val prompt = "Rate the following movies on a scale of 1-5."
    println(prompt)
    val ratings = movies.flatMap { x =>
      var rating: Option[Rating] = None
      var valid = false
      while (!valid) {
        print(x._2 + ": ")
        try {
          val r = Console.readInt
          if (r < 0 || r > 5) {
            println(prompt)
          } else {
            valid = true
            if (r > 0) {
              rating = Some(Rating(0, x._1, r))
            }
          }
        } catch {
          case e: Exception => println(prompt)
        }
      }
      rating match {
        case Some(r) => Iterator(r)
        case None => Iterator.empty
      }
    }
    if (ratings.isEmpty) {
      error("No rating provided!")
    } else {
      ratings
    }

  }
}

Package the project as a JAR and upload it to the Spark cluster. When the job is submitted, Spark may fail with "Invalid signature file digest for Manifest main attributes" (see blog.csdn.net/dai45195470…). The fix is to strip the signature files from the JAR:

zip -d <jar file name>.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
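Alternatively (a build-time sketch, assuming the fat JAR is built with the sbt-assembly plugin), the signature files can be dropped when the JAR is assembled by adding a merge strategy to build.sbt:

```scala
// build.sbt (sketch): discard META-INF signature files when assembling the fat JAR
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```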

Then submit the job to Spark:

bin/spark-submit --master local --class com.luo.Recomment MoiveRecomment.jar

When the program runs, it first randomly picks about ten of the most frequently rated movies and asks the user to rate them:

The ALS matrix factorization model is then trained, and the 50 movies with the highest predicted ratings are recommended.
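As a side note (not part of the original code): depending on the Spark version, MatrixFactorizationModel also provides a recommendProducts(user, num) helper, so the top-50 list for the interactive user (id 0) could be produced without building the candidate RDD by hand, roughly like this:

```scala
// Sketch: top-50 recommendations for user 0, assuming bestModel is the
// MatrixFactorizationModel trained above (recommendProducts exists in Spark 1.1+)
val top50 = bestModel.get.recommendProducts(0, 50)
top50.zipWithIndex.foreach { case (r, idx) =>
  println("%2d".format(idx + 1) + ": " + movies(r.product))
}
```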