Tutorial directory
- 0x00 Tutorial content
- 0x01 Data Preparation
  - 1. Download data
  - 2. Upload data to the HDFS
  - 3. Data field description
- 0x02 Project implementation
  - 1. Dependency preparation
  - 2. Upload a copy of data to the local PC
  - 3. Data processing code implementation
  - 4. Display of execution effect
- 0x03 Project description
  - 1. Overall introduction of the project
  - 2. Use Hadoop and Spark for preprocessing
  - 3. Use Spark and ML-lib for modeling
- 0x04 Package to server for execution
- 0x05 Project Upgrade
- 0xFF Summary
0x00 Tutorial content
- Data preparation
- Project implementation
- Project description
- Project upgrade
PS: To be added later: 1. upgrade Spark and Scala; 2. continue optimizing the data to improve prediction performance; 3. …
0x01 Data Preparation
1. Download data
A. wget reference commands:
wget http://stat-computing.org/dataexpo/2009/2007.csv.bz2 -O /tmp/flights_2007.csv.bz2
wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2 -O /tmp/flights_2008.csv.bz2
B. If you do not download with wget, please rename the files yourself to:
flights_2007.csv.bz2
flights_2008.csv.bz2
2. Upload data to the HDFS
A. Upload the data files to the /tmp/airflightsdelays/ path on HDFS
3. Data field description
A. The data has been uploaded to the HDFS path /tmp/airflightsdelays/.
Note: you can decompress the data and inspect the first few rows (the field descriptions still need to be completed and verified).
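As a quick sanity check, here is a minimal sketch (assuming the files sit under the HDFS path above and that a SparkContext named sc is available, for example in spark-shell) that prints each column name with its index, so the column indexes used later in prepFlightDelays (0, 1, 2, 3, 5, 15, 16, 18, 21) can be matched against the data:
// Minimal sketch: inspect the header of the compressed CSV directly from HDFS.
// Assumes a running SparkContext `sc`, e.g. inside spark-shell.
val raw = sc.textFile("/tmp/airflightsdelays/flights_2007.csv.bz2")
val header = raw.first().split(",")
header.zipWithIndex.foreach { case (name, idx) => println(s"$idx -> $name") }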
0x02 Project implementation
1. Dependency preparation
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.shaonaiyi</groupId>
<artifactId>sparkMLlib</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>1.6.3</spark.version>
<scala.version>2.10.5</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<!-- Scala binary suffix _2.10 to match scala.version 2.10.5 above -->
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<!--<plugin>-->
<!--<groupId>org.scala-tools</groupId>-->
<!--<artifactId>maven-scala-plugin</artifactId>-->
<!--<executions>-->
<!--<execution>-->
<!--<goals>-->
<!--<goal>compile</goal>-->
<!--<goal>testCompile</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--<configuration>-->
<!--<scalaVersion>${scala.version}</scalaVersion>-->
<!--<args>-->
<!--<arg>-target:jvm-1.7</arg>-->
<!--</args>-->
<!--</configuration>-->
<!--</plugin>-->
</plugins>
</build>
</project>
2. Upload a copy of data to the local PC
A. Place a copy of the two data files under the project root at tmp/airflightsdelays/ (this relative path is used by the local-test code):
3. Data processing code implementation
package com.shaonaiyi
import org.apache.spark.rdd._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.{DecisionTree, RandomForest}
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.format.DateTimeFormat
import org.joda.time.DateTime
import org.joda.time.Days
/**
 * @Author: Shao Naiyi
 * @Date: 2019/05/06 3:08 PM
 * @Description: Flight delay prediction project
 */
object DelayRecProject {
def main(args: Array[String]): Unit = {
// When packaging for the cluster, comment out the local-test line below and use the plain SparkConf instead
val conf = new SparkConf().setMaster("local[5]").setAppName("DelayRecProject")
// val conf = new SparkConf()
val sc = new SparkContext(conf)
// Set the log print level
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// Phase 1: data preprocessing
// When packaging for the cluster, comment out the local relative path and use the HDFS path instead
val data_2007tmp = prepFlightDelays("tmp/airflightsdelays/flights_2007.csv.bz2",sc)
// val data_2007tmp = prepFlightDelays("/tmp/airflightsdelays/flights_2007.csv.bz2",sc)
val data_2007 = data_2007tmp.map(rec => rec.gen_features._2)
// Same as above: switch to the HDFS path when packaging for the cluster
val data_2008 = prepFlightDelays("tmp/airflightsdelays/flights_2008.csv.bz2",sc).map(rec => rec.gen_features._2)
// val data_2008 = prepFlightDelays("/tmp/airflightsdelays/flights_2008.csv.bz2",sc).map(rec => rec.gen_features._2)
data_2007tmp.toDF().registerTempTable("data_2007tmp")
data_2007.take(5).map(x => x mkString ",").foreach(println)
// Phase 2: Modeling using Spark and ML-lib
// Prepare training set
val parsedTrainData = data_2007.map(parseData)
parsedTrainData.cache
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTrainData.cache
// Prepare test/validation set
val parsedTestData = data_2008.map(parseData)
parsedTestData.cache
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTestData.cache
scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)
// Stage 3: Evaluate the classification indicators
// Function to compute evaluation metrics
def eval_metrics(labelsAndPreds: RDD[(Double, Double)]) : Tuple2[Array[Double], Array[Double]] = {
val tp = labelsAndPreds.filter(r => r._1==1 && r._2==1).count.toDouble
val tn = labelsAndPreds.filter(r => r._1==0 && r._2==0).count.toDouble
val fp = labelsAndPreds.filter(r => r._1==1 && r._2==0).count.toDouble
val fn = labelsAndPreds.filter(r => r._1==0 && r._2==1).count.toDouble
val precision = tp / (tp+fp)
val recall = tp / (tp+fn)
val F_measure = 2*precision*recall / (precision+recall)
val accuracy = (tp+tn) / (tp+tn+fp+fn)
new Tuple2(Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))
}
class Metrics(labelsAndPreds: RDD[(Double, Double)]) extends java.io.Serializable {
private def filterCount(lftBnd:Int,rtBnd:Int):Double = labelsAndPreds
.map(x => (x._1.toInt, x._2.toInt))
.filter(_ == (lftBnd,rtBnd)).count()
lazy val tp = filterCount(1, 1) // true positives
lazy val tn = filterCount(0, 0) // true negatives
lazy val fp = filterCount(0, 1) // false positives
lazy val fn = filterCount(1, 0) // false negatives
lazy val precision = tp / (tp+fp)
lazy val recall = tp / (tp+fn)
lazy val F1 = 2*precision*recall / (precision+recall)
lazy val accuracy = (tp+tn) / (tp+tn+fp+fn)
}
// Stage 4: Build regression model
// Build the Logistic Regression model
val model_lr = LogisticRegressionWithSGD.train(scaledTrainData, numIterations=100)
// Predict
val labelsAndPreds_lr = scaledTestData.map { point =>
val pred = model_lr.predict(point.features)
(pred, point.label)
}
val m_lr = eval_metrics(labelsAndPreds_lr)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_lr(0).m_lr(1).m_lr(2).m_lr(3)))
println(model_lr.weights)
// Stage 5: Construct vector machine algorithm model
// Build the SVM model
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(100)
.setRegParam(1.0)
.setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)
// Predict
val labelsAndPreds_svm = scaledTestData.map { point =>
val pred = model_svm.predict(point.features)
(pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0).m_svm(1).m_svm(2).m_svm(3)))
// Stage 6: Build decision tree algorithm model
// Build the Decision Tree model
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(parsedTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
// Predict
val labelsAndPreds_dt = parsedTestData.map { point =>
val pred = model_dt.predict(point.features)
(pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0).m_dt(1).m_dt(2).m_dt(3)))
// Stage 7: Build the stochastic forest algorithm model
val treeStrategy = Strategy.defaultStrategy("Classification")
val numTrees = 100
val featureSubsetStrategy = "auto" // Let the algorithm choose
val model_rf = RandomForest.trainClassifier(parsedTrainData, treeStrategy, numTrees, featureSubsetStrategy, seed = 123)
// Predict
val labelsAndPreds_rf = parsedTestData.map { point =>
val pred = model_rf.predict(point.features)
(point.label, pred)
}
val m_rf = new Metrics(labelsAndPreds_rf)
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f"
.format(m_rf.precision, m_rf.recall, m_rf.F1, m_rf.accuracy))
}
case class DelayRec(year: String, month: String, dayOfMonth: String, dayOfWeek: String, crsDepTime: String, depDelay: String, origin: String, distance: String, cancelled: String) {
val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007", "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007", "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008", "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")
def gen_features: (String, Array[Double]) = {
val values = Array(
depDelay.toDouble,
month.toDouble,
dayOfMonth.toDouble,
dayOfWeek.toDouble,
get_hour(crsDepTime).toDouble,
distance.toDouble,
days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)
)
new Tuple2(to_date(year.toInt, month.toInt, dayOfMonth.toInt), values)
}
def get_hour(depTime: String) : String = "%04d".format(depTime.toInt).take(2)
def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)
def days_from_nearest_holiday(year:Int, month:Int, day:Int): Int = {
val sampleDate = new DateTime(year, month, day, 0, 0)
holidays.foldLeft(3000) { (r, c) =>
val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
math.min(r, distance)
}
}
}
def prepFlightDelays(infile: String, sc: SparkContext): RDD[DelayRec] = {
val data = sc.textFile(infile)
data.map { line =>
val reader = new CSVReader(new StringReader(line))
reader.readAll().asScala.toList.map(rec => DelayRec(rec(0),rec(1),rec(2),rec(3),rec(5),rec(15),rec(16),rec(18),rec(21)))
}.map(list => list(0))
.filter(rec => rec.year != "Year")
.filter(rec => rec.cancelled == "0")
.filter(rec => rec.origin == "ORD")
}
def parseData(vals: Array[Double]): LabeledPoint = {
LabeledPoint(if (vals(0) >= 15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}
}
4. Display of execution effect
A. Run the program: the console prints the first few feature vectors, followed by a line of the form "precision = ..., recall = ..., F1 = ..., accuracy = ..." for each model (logistic regression, SVM, decision tree, random forest).
0x03 Project description
1. Overall introduction of the project
In this project, we demonstrate how to build a prediction model on Hadoop, this time using Apache Spark and ML-lib.
The tutorial uses Apache Spark's Scala API to generate the feature matrix, and ML-lib (Spark's machine learning library) to build and evaluate the classification model.
The source data set for the flight-delay prediction model is the data we downloaded above, which contains detailed information on flights in the United States between 1987 and 2008. Richer weather information (daily minimum/maximum temperature, wind speed, snow conditions, and precipitation) will be added later.
We build a supervised learning model to predict delays for flights departing O'Hare International Airport (ORD). We train the model on the 2007 data and test its validity on the 2008 data.
2. Use Hadoop and Spark for preprocessing
The basic data abstraction in Apache Spark is the RDD (Resilient Distributed Dataset), a fault-tolerant collection of elements that can be operated on in parallel across a Hadoop cluster.
Spark's API (available in Scala, Python, and Java) supports various transformations, such as map(), flatMap(), filter(), and join(), to create and manipulate RDDs. For a complete description of the API, see the Spark programming guide: spark.apache.org/docs/1.6.3/…
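As a minimal illustration (with hypothetical rows, and assuming the SparkContext sc from the code above), here is the kind of map/filter chain used throughout this project:
// Hypothetical rows: year, month, day, day-of-week, origin.
val lines = sc.parallelize(Seq("2007,1,1,1,ORD", "2007,1,2,2,SFO"))
val ordOnly = lines.map(_.split(",")).filter(fields => fields(4) == "ORD") // parse, then keep ORD rows
println(ordOnly.count()) // prints 1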
As in the scikit-learn demo, in our first iteration we generate the following features for each flight:
- Day of month: this may not be a very predictive variable, but we use it anyway.
- Distance: it will be interesting to see whether this variable is a good predictor of delay.
- Number of days to the nearest US holiday.
(The full feature vector built by gen_features also includes the month, the day of week, and the scheduled departure hour.)
We perform the same preprocessing with Spark RDDs, converting the raw flight-delay data set into two feature matrices: data_2007 (our training set) and data_2008 (our test set). The case class DelayRec, which encapsulates a flight-delay record and represents the feature vector, does most of the heavy lifting through its methods:
- to_date() is a helper method that converts year/month/day into a date string.
- gen_features takes a single input record and produces a key/value tuple: the key is the date string (the output of to_date) and the value is the array of feature values. The key will be used in the second iteration to join with the weather data.
- days_from_nearest_holiday() computes the minimum distance, in days, between the given year/month/day and any holiday in the holidays list.
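For intuition, here is a small sketch of what these helpers produce for one hypothetical record (run inside the object's scope, or in a REPL with the class pasted in; the field values are made up):
// Hypothetical record: a flight on 2007-01-03, scheduled departure 09:15,
// 20 minutes of departure delay, 800 miles, not cancelled.
val rec = DelayRec("2007", "1", "3", "3", "0915", "20", "ORD", "800", "0")
println(rec.to_date(2007, 1, 3))                   // "20070103"
println(rec.get_hour("0915"))                      // "09"
println(rec.days_from_nearest_holiday(2007, 1, 3)) // 2 (nearest holiday in the list is 01/01/2007)
val (key, features) = rec.gen_features             // key = "20070103"
println(features.mkString(","))                    // 20.0,1.0,3.0,3.0,9.0,800.0,2.0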
With DelayRec in hand, prepFlightDelays performs the following steps:
1. We read the raw input file with Spark's SparkContext.textFile method to create an RDD.
2. Each row is parsed into fields with CSVReader and populated into a DelayRec object.
3. A series of RDD transformations then keeps only the rows corresponding to flights that were not cancelled and that originate from ORD.
4. Finally, the gen_features method turns each row into its final feature vector, an array of doubles.
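Putting the steps together, the same calls that appear in main above can be used to spot-check the result:
// Build the 2007 feature matrix and peek at the first five feature vectors.
val data_2007tmp = prepFlightDelays("/tmp/airflightsdelays/flights_2007.csv.bz2", sc)
val data_2007 = data_2007tmp.map(rec => rec.gen_features._2) // RDD[Array[Double]]
data_2007.take(5).map(_.mkString(",")).foreach(println)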
3. Use Spark and ML-lib for modeling
With the data_2007 data set (for training) and the data_2008 data set (for validation) available as RDDs, we now use Spark's ML-lib machine learning library to build the prediction model.
ML-lib is Spark's scalable machine learning library and includes a variety of learning algorithms and utilities, covering classification, regression, clustering, collaborative filtering, dimensionality reduction, and more.
To use ML-lib's machine learning algorithms, we first parse our feature matrices into RDDs of LabeledPoint objects, one for the training set and one for the test set. LabeledPoint is ML-lib's abstraction for a labeled feature vector.
We treat flight delays of 15 minutes or more as "delayed" and label them 1.0, and delays of less than 15 minutes as "not delayed" and label them 0.0.
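This rule is exactly what the parseData function above encodes: the first element of each feature array is the departure delay and becomes the label, while the remaining elements become the feature vector. A tiny sketch with a made-up feature array:
// Hypothetical gen_features output: delay=20, month=1, day=3, weekday=3, hour=9, distance=800, days-to-holiday=2.
val sample = Array(20.0, 1.0, 3.0, 3.0, 9.0, 800.0, 2.0)
val point = parseData(sample)
println(point.label)    // 1.0, because 20.0 >= 15
println(point.features) // [1.0,3.0,3.0,9.0,800.0,2.0]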
We also use ML-lib's StandardScaler class to standardize the feature values of the training and validation sets. This matters because ML-lib uses stochastic gradient descent, which performs best when the feature vectors are scaled.
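One detail of the code above worth calling out: the scaler is fit on the training features only, and the same fitted scaler is then applied to both the training and the test sets, so the test data never influences the scaling parameters:
// Fit on the training features only, then transform both sets with the same scaler.
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(_.features))
val scaledTrain = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
val scaledTest  = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))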
0x04 Package to server for execution
Comment out the corresponding local-test code, repackage the project, and submit it with spark-submit, for example:
spark-submit --master yarn --class com.shaonaiyi.DelayRecProject --name DelayRecProject /home/hadoop-sny/sparkMLlib-1.0-SNAPSHOT.jar
0x05 Project Upgrade
To be added later!
0xFF Summary
- In this experiment we brought together much of what we learned earlier: building a Maven project in IDEA, writing the Scala code, packaging it to the server, testing the code locally, running the machine learning models, and checking the results. There is a lot to gain from working through it; please read beyond the lines and practice more.
- If execution turns out to be very slow, consider reducing the amount of data (accuracy will drop correspondingly). Also, when running a job, you can comment out code that does not need to be executed, which speeds the job up.
About the author: big data lecturer, university market insight, column editor. Weibo / CSDN: Shao Naiyi