Introduction

Hello everyone, I am ChinaManor, which literally translates to "Chinese code farmer". I hope to become a pathfinder on the road to national rejuvenation, a ploughman in the field of big data, and an ordinary person unwilling to be mediocre.

Spark Comprehensive Exercise – Analysis of Film Rating Data

In my last post I only walked through the demo implementation of this case. Unexpectedly, the very same case came up on the exam, which let me pass comfortably (the paper was hard: 60 points was enough for a top-10 rank in the class). Going over it again afterwards, though, I found my fatal weakness: my SQL is far too weak. So I redid the exercise and worked through my mentor's three requirements:

Requirement 1: find the top ten movies with at least 50 ratings and the highest average ratings, together with those average ratings.
Requirement 2: find each movie genre and its average rating.
Requirement 3: find the ten movies that have been rated the most times.

Data description: the files movies.csv and ratings.csv are used.

movies.csv is the movie dimension data, with comma-separated columns movieId, title and genres (movie id, movie title, movie genres). A sample row: 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy

ratings.csv is the rating data, with columns userId, movieId, rating and timestamp (user id, movie id, rating, rating timestamp).
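The genres column packs several genres into a single "|"-separated field, which requirement 2 later splits apart with split(genres, "\\|"). A tiny Scala sketch of that split on the Toy Story sample row, just to illustrate the format:

// Splitting the "|"-separated genres field of the sample row
val genres = "Adventure|Animation|Children|Comedy|Fantasy".split("\\|")
// genres: Array[String] = Array(Adventure, Animation, Children, Comedy, Fantasy)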

Table creation statements

CREATE DATABASE db_movies;
USE db_movies;

CREATE TABLE `ten_movies_avgrating` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
  `movieId` int(11) NOT NULL COMMENT 'Movie id',
  `ratingNum` int(11) NOT NULL COMMENT 'Number of ratings',
  `title` varchar(100) NOT NULL COMMENT 'Movie title',
  `avgRating` decimal(10,2) NOT NULL COMMENT 'Average rating',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE TABLE `genres_average_rating` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
  `genres` varchar(100) NOT NULL COMMENT 'Movie genre',
  `avgRating` decimal(10,2) NOT NULL COMMENT 'Average rating for the genre',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `genres_UNIQUE` (`genres`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

CREATE TABLE `ten_most_rated_films` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Auto-increment id',
  `movieId` int(11) NOT NULL COMMENT 'Movie id',
  `title` varchar(100) NOT NULL COMMENT 'Movie title',
  `ratingCnt` int(11) NOT NULL COMMENT 'Number of times the movie has been rated',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;




Overview of the project structure


Create the entity classes first; the fields come from the table creation statements.

Entry.scala

package cn.movies.Packet

/**
  * @author ChinaManor
  *         #Description Entry
  *         #Date: 6/6/2021 17:23
  */
object Entry {
  case class Movies(movieId: String, title: String, genres: String) // movie id, movie title, genres

  case class Ratings(userId: String, movieId: String, rating: String, timestamp: String) // user id, movie id, rating, timestamp

  // Result row written to the MySQL table ten_movies_avgrating
  case class tenGreatestMoviesByAverageRating(movieId: String, ratingNum: String, title: String, avgratingnum: String) // movie id, number of ratings, title, average rating

  // Result row written to the MySQL table genres_average_rating
  case class topGenresByAverageRating(genres: String, avgRating: String) // genre, average rating

  // Result row written to the MySQL table ten_most_rated_films
  case class tenMostRatedFilms(movieId: String, title: String, ratingCnt: String) // movie id, title, number of times rated
}

Next, define the schemas for the two datasets ~~!

Schema.scala

package cn.movies.Packet

import org.apache.spark.sql.types.{DataTypes, StructType}

/**
  * @author ChinaManor
  *         #Description Schema
  *         #Date: 6/6/2021 17:34
  */
object Schema {
  class SchemaLoader {

    // Schema of the movies dataset

    private val movieSchema = new StructType()
      .add("movieId", DataTypes.StringType, false)
      .add("title", DataTypes.StringType, false)
      .add("genres", DataTypes.StringType, false)
    // Schema of the ratings dataset

    private val ratingSchema = new StructType()

      .add("userId", DataTypes.StringType, false)
      .add("movieId", DataTypes.StringType, false)
      .add("rating", DataTypes.StringType, false)
      .add("timestamp", DataTypes.StringType, false)

    def getMovieSchema: StructType = movieSchema
    def getRatingSchema: StructType = ratingSchema
  }
}


Then I started writing the main method, which came to only about eighty lines of code…
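Before walking through the pieces one by one, here is a rough sketch of how they hang together in the main method. This is my own reconstruction for orientation, not the actual eighty lines: readCsvIntoDataSet and saveToMysql are the helpers shown later in this post, while topTenSql is just a placeholder name for the requirement-1 query text.

def main(args: Array[String]): Unit = {
  // 1. Create the SparkSession (shown next)
  val spark = SparkSession.builder
    .appName(this.getClass.getSimpleName.stripSuffix("$"))
    .master("local[4]")
    .getOrCreate()

  // 2. Load the two CSV files with their schemas
  val schemaLoader = new SchemaLoader
  val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
  val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)

  // 3. Register temporary views so the SQL can refer to "movies" and "ratings"
  movieDF.createOrReplaceTempView("movies")
  ratingDF.createOrReplaceTempView("ratings")

  // 4. Run each requirement's SQL and write the result to its MySQL table
  val topTenDF = spark.sql(topTenSql) // topTenSql: the requirement-1 SQL string shown below
  saveToMysql(topTenDF)
  // ... same pattern for requirements 2 and 3

  spark.stop()
}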

First, a Spark application needs a SparkSession instance:

// Create a SparkSession
val spark = SparkSession.builder
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .getOrCreate

Then instantiate the schema loader:

val schemaLoader = new SchemaLoader

Then read the two CSV files:

// Read the movies dataset
val movieDF: DataFrame = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
// Read the ratings dataset
val ratingDF: DataFrame = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)

At this point neither the helper method nor the file paths exist yet, so let's add them:

  private val MOVIES_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\movies.csv"
  private val RATINGS_CSV_FILE_PATH = "D:\\Users\\Administrator\\Desktop\\exam0601\\datas\\ratings.csv"

  /**
    * Read a data file and convert it to a DataFrame
    *
    * @param spark
    * @param path
    * @param schema
    * @return
    */
  def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {
    val dataDF: DataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .load(path)
    dataDF
  }


Then comes the main part: writing the SQL. In the big data industry, knowing how to write SQL gets you 80% of the way there.

WITH ratings_filter_cnt AS (
    SELECT
        movieId,
        count(*) AS rating_cnt,
        round(avg(rating), 2) AS avg_rating
    FROM ratings
    GROUP BY movieId
    HAVING count(*) >= 50
),
ratings_filter_score AS (
    SELECT
        movieId,     -- movie id
        rating_cnt,  -- number of ratings
        avg_rating   -- average rating
    FROM ratings_filter_cnt
    ORDER BY avg_rating DESC  -- order by average rating, descending
    LIMIT 10                  -- top ten movies with the highest average rating
)
SELECT
    m.movieId,
    r.rating_cnt AS ratingNum,
    m.title,
    r.avg_rating AS avgRating
FROM
    ratings_filter_score r
JOIN movies m ON m.movieId = r.movieId
ORDER BY r.avg_rating DESC

All three queries follow the same WITH xxx AS (SELECT ...) pattern, i.e. common table expressions.
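The post itself never shows the query being executed. Presumably the two DataFrames are registered as temporary views named movies and ratings (as in the sketch earlier) and the query text is passed to spark.sql, roughly like this, with topTenSql again a placeholder name for the SQL string above:

// Assumes movieDF and ratingDF were registered as temp views "movies" and "ratings"
val topTenDF: DataFrame = spark.sql(topTenSql)
topTenDF.show(10, truncate = false)  // quick sanity check before writing to MySQL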

Finally, write the result to the MySQL table:

def saveToMysql(reportDF: DataFrame) = {
    // TODO: save the report DataFrame with the JDBC data source built into Spark SQL
    reportDF.coalesce(1)
      .write
      // Append mode would fail on a second run because the primary keys already exist, so overwrite instead
      .mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://192.168.88.100:3306/db_movies?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "db_movies.ten_most_rated_films")
      .save()
  }
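Note that the dbtable option above is hard-coded to ten_most_rated_films, so as written the helper only fits requirement 3. For the other two results I would expect the table name to be passed in; a minimal parameterized variant (my own adaptation, not part of the original code):

def saveToMysql(reportDF: DataFrame, table: String): Unit = {
  reportDF.coalesce(1)
    .write
    .mode(SaveMode.Overwrite)  // overwrite so repeated runs don't hit duplicate primary keys
    .format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://192.168.88.100:3306/db_movies?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", table)  // e.g. "db_movies.genres_average_rating" for requirement 2
    .save()
}

The requirement-2 and requirement-3 results can then reuse this with their own table names; see the sketch after their SQL below.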

The SQL for the remaining two requirements:

-- Requirement 2: find each movie genre and its average rating
WITH explode_movies AS (
    SELECT
        movieId,
        title,
        category
    FROM movies
    LATERAL VIEW explode(split(genres, "\\|")) temp AS category  -- explode the "|"-separated genres into one row per genre
)
SELECT
    m.category AS genres,
    round(avg(r.rating), 2) AS avgRating
FROM
    explode_movies m
    JOIN ratings r ON m.movieId = r.movieId
GROUP BY
    m.category
ORDER BY avgRating DESC
-- Requirement 3: find the ten movies that have been rated the most times
WITH rating_group AS (
    SELECT
       movieId,
       count( * ) AS ratingCnt
    FROM ratings
    GROUP BY movieId
),
rating_filter AS (
    SELECT
       movieId,
       ratingCnt
    FROM rating_group
    ORDER BY ratingCnt DESC
    LIMIT 10
)
SELECT
    m.movieId,
    m.title,
    r.ratingCnt
FROM
    rating_filter r
JOIN movies m ON r.movieId = m.movieId
ORDER BY r.ratingCnt DESC
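Putting these two queries together with the parameterized save helper sketched earlier closes the loop. genresSql and mostRatedSql are placeholder names of mine for the two query strings above:

// Requirement 2: genre averages go into genres_average_rating
val genresAvgDF = spark.sql(genresSql)
saveToMysql(genresAvgDF, "db_movies.genres_average_rating")

// Requirement 3: the ten most-rated movies go into ten_most_rated_films
val tenMostRatedDF = spark.sql(mostRatedSql)
saveToMysql(tenMostRatedDF, "db_movies.ten_most_rated_films")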

Conclusion

That was my second pass at the Spark movie rating data analysis; the SQL is more complex than last time and there are more requirements. I hope tonight's exam goes smoothly @~@ If you need the complete code, send me a private message and I will share it.

May you take something away from this post; if you did, a "one-click triple" (like, favourite, follow) would be much appreciated ~