Using Spark MLlib to implement movie recommendation
Source code and data set: github.com/luo94852184…
Spark machine learning library MLlib
MLlib is Spark's machine learning library. It is designed to simplify the engineering practice of machine learning and to scale it to larger data sets. MLlib consists of a number of common learning algorithms and tools, covering classification, regression, clustering, collaborative filtering and dimensionality reduction, as well as lower-level optimization primitives and higher-level pipeline APIs. Specifically, it mainly includes the following:
- 1. Algorithm tools: common learning algorithms such as classification, regression, clustering and collaborative filtering;
- 2. Featurization: feature extraction, transformation, dimensionality reduction and feature selection;
- 3. Pipelines: tools for building, evaluating and tuning machine learning pipelines;
- 4. Persistence: saving and loading algorithms, models and pipelines;
- 5. Utilities: linear algebra, statistics, data handling and other tools.
The Spark machine learning library has been split into two packages since version 1.2:
• spark.mllib contains the original RDD-based algorithm APIs. Spark MLlib has a long history; it was already included before release 1.0, and the algorithms it provides are all based on the original RDD abstraction.
• spark.ml provides DataFrame-based APIs that can be used to build machine learning workflows. ML Pipeline makes up for the shortcomings of the original MLlib library by giving users a DataFrame-based pipeline API suite for machine learning.
Using the ML Pipeline API, it is easy to combine data processing, feature transformation, regularization and multiple machine learning algorithms into one complete machine learning pipeline. This approach is more flexible, fits the way machine learning workflows are actually organized, and is easier to port from other languages. Spark officially recommends using spark.ml. If a new algorithm fits the concept of a machine learning pipeline, it should be placed in the spark.ml package, for example feature extractors and transformers. Developers should note that starting with Spark 2.0, the RDD-based APIs entered maintenance mode (no new features are added) and are expected to be removed from MLlib in Spark 3.0.
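To make the DataFrame-based API concrete, here is a minimal spark.ml pipeline sketch (a toy text-classification setup; the column names, stages and save path are illustrative assumptions and are not part of the recommender built below):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("PipelineSketch").getOrCreate()

    // Toy training data: (id, text, label)
    val training = spark.createDataFrame(Seq(
      (0L, "spark mllib movie recommendation", 1.0),
      (1L, "completely unrelated text", 0.0)
    )).toDF("id", "text", "label")

    // Each stage consumes the DataFrame produced by the previous stage
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
    val lr = new LogisticRegression().setMaxIter(10)

    // The whole chain is fitted and persisted as a single unit
    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
    val model = pipeline.fit(training)
    model.write.overwrite().save("/tmp/pipeline-model")
    spark.stop()
  }
}

The point is that tokenization, feature hashing and the classifier are configured once and then fitted, saved and reused together, which is exactly the workflow-style API described above.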
Spark is developing rapidly in machine learning and now supports most mainstream statistical and machine learning algorithms. Among open source machine learning libraries built on a distributed architecture, MLlib is arguably the most computationally efficient. MLlib currently supports four common classes of machine learning problems: classification, regression, clustering and collaborative filtering.
A classic movie recommendation system describes users along different dimensions. In collaborative filtering, co-occurrence similarity can be used to find similar items or users. A simple definition of this similarity follows.
The co-occurrence similarity of item i and item j is defined as follows:
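Writing N(i) for the set of users who like item i, the formula is:

W(i, j) = |N(i) ∩ N(j)| / |N(i)|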
The denominator is the number of users who like item i, and the numerator is the number of users who like both item i and item j. The formula can therefore be read as: among the users who like item i, what fraction also like item j (similar to an association rule).
However, this formula has a problem. If item j is very popular and liked by many people, W(i, j) will be large and close to 1, so any item will look highly similar to popular items. We therefore use the following corrected formula:
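That is, the denominator becomes the geometric mean of the two popularity counts:

W(i, j) = |N(i) ∩ N(j)| / sqrt(|N(i)| × |N(j)|)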
This formula penalizes the weight of popular item j and thus reduces the chance that popular items appear similar to everything. It also makes the measure symmetric, i.e. W(i, j) = W(j, i).
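The ALS-based code below does not use this co-occurrence measure directly, but as an illustration, a minimal sketch of computing it from (userId, itemId) preference pairs in plain Scala could look like this (the function name and data layout are assumptions for the example):

// Corrected co-occurrence similarity:
// W(i, j) = |N(i) ∩ N(j)| / sqrt(|N(i)| * |N(j)|)
// `prefs` holds (userId, itemId) pairs meaning "this user likes this item".
def cooccurrenceSimilarity(prefs: Seq[(Int, Int)]): Map[(Int, Int), Double] = {
  // N(i): the set of users who like item i
  val usersByItem: Map[Int, Set[Int]] =
    prefs.groupBy(_._2).map { case (item, ps) => item -> ps.map(_._1).toSet }

  val items = usersByItem.keys.toSeq
  (for {
    i <- items
    j <- items if i != j
    common = (usersByItem(i) intersect usersByItem(j)).size
    if common > 0
  } yield (i, j) -> common / math.sqrt(usersByItem(i).size.toDouble * usersByItem(j).size)
  ).toMap
}

// Example: users 1 and 2 both like item 10, only user 1 likes item 20,
// so W(10, 20) = 1 / sqrt(2 * 1) ≈ 0.707:
// cooccurrenceSimilarity(Seq((1, 10), (2, 10), (1, 20)))((10, 20))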
The data set
movies.dat (Movie): movie ID, movie name, genre
ratings.dat (Rating): user ID, movie ID, rating, timestamp
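Both files use the "::"-delimited MovieLens layout that the parsing code below expects; the concrete values here are only examples:

1::Toy Story (1995)::Animation|Children's|Comedy        (movies.dat: MovieID::Title::Genres)
1::1193::5::978300760                                    (ratings.dat: UserID::MovieID::Rating::Timestamp)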
The full code (Scala):
package com.luo
import java.util.Random
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
object Recomment {
def main(args: Array[String]): Unit = {
// Create the Spark environment
val conf = new SparkConf().setAppName("movieRecomment")
val sc = new SparkContext(conf)
// Read the file and preprocess it
val ratings = sc.textFile("ratings.dat").map {
line =>
val fields = line.split("::")
// The .dat file has a fixed column order: UserID::MovieID::Rating::Timestamp
// Key: last digit of the timestamp; value: Rating(userId, movieId, rating)
(fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}
val movies = sc.textFile("movies.dat").map { line =>
val fields = line.split("::")
// format: (movieId, movieName)
(fields(0).toInt, fields(1))
}.collect.toMap
// Number of records, number of users, number of movies
val numRatings = ratings.count
val numUsers = ratings.map(_._2.user).distinct.count
val numMovies = ratings.map(_._2.product).distinct.count
println("From" + numRatings + "On the record" + "Analyzed." + numUsers + "People watched." + numMovies + "A movie")
// Take a subset of the most-rated movies to elicit ratings from the user
// (the densest part of the rating matrix)
val mostRatedMovieIds = ratings.map(_._2.product)
.countByValue()
.toSeq
.sortBy(-_._2)
.take(50) // keep the 50 most-rated movies
.map(_._1) // Get their id
val random = new Random(0)
val selectedMovies = mostRatedMovieIds.filter(
x => random.nextDouble() < 0.2).map(x => (x, movies(x))).toSeq
// Elicit ratings: pick about ten movies at random from the current
// most popular ones and ask the user to rate them
val myRatings = elicitateRatings(selectedMovies)
val myRatingsRDD = sc.parallelize(myRatings)
// Divide the scoring system into training set 60%, validation set 20%, and test set 20%
val numPartitions = 20
// training set
val training = ratings.filter(x => x._1 < 6).values
.union(myRatingsRDD).repartition(numPartitions)
.persist
// validation set
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values
.repartition(numPartitions).persist
// test set
val test = ratings.filter(x => x._1 >= 8).values.persist
val numTraining = training.count
val numValidation = validation.count
val numTest = test.count
println("Number of training sets :" + numTraining + ", number of validation sets:" + numValidation + ", number of test sets :" + numTest)
// Train the model and evaluate the model on the validation set
val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = 1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
val model = ALS.train(training, rank, numIter, lambda)
val validationRmse = computeRmse(model, validation, numValidation)
println("RMSE (validation)=" + validationRmse + "for the model trained with rand =" + rank + ", lambda=" + lambda + ", and numIter= " + numIter + ".")
if (validationRmse < bestValidationRmse) {
bestModel = Some(model)
bestValidationRmse = validationRmse
bestRank = rank
bestLambda = lambda
bestNumIter = numIter
}
}
// Get the best model in the test set
val testRmse = computeRmse(bestModel.get, test, numTest)
println("The best model was trained with rank=" + bestRank + " and lambda =" + bestLambda + ", and numIter =" + bestNumIter + ", and itsRMSE on the test set is" + testRmse + ".")
// Generate personalized recommendations
val myRateMoviesIds = myRatings.map(_.product).toSet
val candidates = sc.parallelize(movies.keys.filter(!myRateMoviesIds.contains(_)).toSeq)
val recommendations = bestModel.get.predict(candidates.map((0, _)))
.collect()
.sortBy((-_.rating))
.take(50)
var i = 1
println("The following movies are recommended for you.")
recommendations.foreach { r =>
println("%2d".format(i) + ":" + movies(r.product))
i += 1
}
}

/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long) = {
val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
.join(data.map(x => ((x.user, x.product), x.rating)))
.values
math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}
/** Elicitate ratings from command-line. */
def elicitateRatings(movies: Seq[(Int, String)]) = {
val prompt = "Rate the following movies on a scale of 1-5."
println(prompt)
val ratings = movies.flatMap { x =>
var rating: Option[Rating] = None
var valid = false
while (!valid) {
print(x._2 + ": ")
try {
val r = Console.readInt
if (r < 0 || r > 5) {
println(prompt)
} else {
valid = true
if (r > 0) {
rating = Some(Rating(0, x._1, r))
}
}
} catch {
case e: Exception => println(prompt)
}
}
rating match {
case Some(r) => Iterator(r)
case None => Iterator.empty
}
}
if (ratings.isEmpty) {
error("No rating provided!")}else {
ratings
}
}
}
Package the project as a JAR and upload it to the Spark cluster. When the task is submitted, Spark may report "Invalid signature file digest for Manifest main attributes". The fix (see blog.csdn.net/dai45195470…) is to strip the signature files from the JAR:
zip -d <jar file name>.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
Then submit the job to Spark:
bin/spark-submit --master local --class com.luo.Recomment MoiveRecomment.jar
First, about ten of the most frequently rated movies are randomly selected and presented to the user for rating:
The matrix factorization model is then trained on these ratings, and the 50 recommendations are printed in order of decreasing predicted rating.