MLlib is Apache Spark’s scalable machine learning library, designed to simplify the engineering practice of machine learning and make it easier to scale to larger data sets.
Introduction to Machine Learning
Before diving into Spark MLlib, it helps to review what machine learning is. According to Wikipedia, machine learning has the following definitions:
Machine learning is a branch of artificial intelligence. Its main research object is artificial intelligence, especially how to improve the performance of specific algorithms by learning from experience;
Machine learning is the study of computer algorithms that improve automatically through experience;
Machine learning is the use of data or past experience to optimize the performance criteria of a computer program;
An oft-cited English definition is “A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E.”
In a narrow sense, machine learning refers to statistical machine learning. By task type, statistical learning can be divided into supervised learning, semi-supervised learning, unsupervised learning, reinforcement learning, and so on.
Commonly used algorithms for machine learning can be divided into the following categories:
1. Constructing theoretical interval distributions: artificial neural networks, decision trees, perceptrons, support vector machines, ensemble learning (AdaBoost), dimensionality reduction and metric learning, clustering, Bayesian classifiers;
2. Constructing conditional probabilities: Gaussian process regression, linear discriminant analysis, the nearest neighbor method, radial basis function kernels;
3. Constructing probability density functions through generative models: the expectation-maximization algorithm, probabilistic graphical models (Bayesian networks and Markov random fields), Generative Topographic Mapping;
4. Approximate inference techniques: Markov chains, Monte Carlo methods, variational methods;
5. Optimization algorithms.
Spark MLlib
As mentioned above, one of the key points of machine learning is “experience”. For a computer, gaining experience usually means several rounds of iterative computation. Spark is good at iterative computation, which matches this characteristic of machine learning. Spark’s official website shows a performance comparison of the logistic regression algorithm running on Spark and on Hadoop. The following figure shows that MLlib is up to 100 times faster than MapReduce.
Spark MLlib includes the following:
ML algorithms: classification, regression, clustering, and collaborative filtering;
Feature processing: feature extraction, transformation, dimensionality reduction, and selection;
Pipelines: tools for building, evaluating, and tuning machine learning pipelines;
Persistence: saving and loading algorithms, models, and pipelines;
Utilities: linear algebra, statistics, optimization, tuning, and other tools.
The above table summarizes the functionality that Spark MLlib supports. As you can see, its feature set is broad, but the selection of algorithms is small and dated. As a result, Spark MLlib’s algorithm support is somewhat out of step with the kylin project, and its main strengths lie in feature-related functionality.
ML Pipelines
As of Spark 2.0, the RDD-based APIs have entered maintenance mode. Spark’s primary machine learning API is now the DataFrame-based API, which provides a Pipeline suite modeled on the design of scikit-learn for building machine learning workflows. ML Pipelines provides a uniform set of high-level APIs built on top of DataFrames that helps users create and tune practical machine learning pipelines.
* “Spark ML” is not an official name; it is occasionally used to refer to the MLlib DataFrame-based API.
First, let us look at several important components of ML Pipelines.
DataFrame gives Spark the ability to handle large-scale structured data.
An RDD is a distributed collection of Java objects whose internal structure is opaque to the RDD itself. A DataFrame is a distributed data set built on top of RDDs and stores Row objects. The schema attached to those Row objects describes their structure in detail, which is what allows a DataFrame to handle structured data.
A Transformer is usually a data or feature transformation class, or a trained model.
Every Transformer has a transform function that converts one DataFrame into another. Transformer.transform is also executed lazily: it only produces a new DataFrame variable and does not submit a job to compute the contents of that DataFrame.
An Estimator abstracts the process of learning a model from input data. Each Estimator implements a fit method which, given a DataFrame and Params, produces a Transformer (the trained model). Every time Estimator.fit() is called, a job is generated to train the model and obtain the model parameters.
Model parameters can be set either by calling setter methods on a Transformer or Estimator instance, or by passing in a ParamMap object.
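The relationship between Estimator, Transformer, and ParamMap described above can be sketched as follows. This is a minimal illustration, not a production setup: the local SparkSession, the toy rows, and the parameter values are assumptions made for the example, and it is written in script style (spark-shell or a Scala 2 script with Spark on the classpath).

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell the existing session is reused.
val spark = SparkSession.builder().master("local[*]").appName("ParamSketch").getOrCreate()

// Toy (label, features) rows, invented for this sketch.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// LogisticRegression is an Estimator. Option 1: set parameters on the instance.
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

// fit() runs a training job and returns a Transformer (a LogisticRegressionModel).
val model1 = lr.fit(training)

// Option 2: pass a ParamMap to fit(); its values override the ones set above.
val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.1)
val model2 = lr.fit(training, paramMap)

// transform() is lazy: it only defines a new DataFrame with the prediction columns.
val predictions = model2.transform(training)

// An action such as show() is what actually triggers the computation.
predictions.select("label", "prediction").show()
```

Passing a ParamMap is convenient when the same Estimator instance has to be fitted with several parameter combinations, which is what the tuning tools discussed later do internally.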
A Pipeline defines a sequence of data processing stages, each of which can be a Transformer, an Estimator, or another Pipeline. Pipeline inherits from Estimator, and calling its fit method returns a Transformer, namely a PipelineModel. PipelineModel inherits from Transformer and passes the input through each Transformer in the Pipeline in turn to produce the final output.
The typical Spark MLlib process is as follows:
Construct the training data set
Build each Stage
Assemble the Stages into a Pipeline
Start model training
Evaluate the model
Compute the predictions
To deepen your understanding, consider an example of Pipeline text classification:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
Model selection and parameter tuning
Spark MLlib provides CrossValidator and TrainValidationSplit for model selection and parameter tuning. Both rely on three basic components: an Estimator, a ParamGrid, and an Evaluator. The Estimator is the algorithm or Pipeline to tune. The ParamGrid is a set of ParamMaps that defines the parameter search space. The Evaluator is the metric used to score each fitted model.
CrossValidator divides the data set into n folds, where n is the configured number of folds. In each round, n-1 folds are used as the training set and the remaining fold is used as the validation set, so the model is trained and evaluated n times and yields n evaluation results. This process is repeated for every candidate ParamMap. The ParamMap with the best average result is then selected, and the model is retrained on the whole data set with those parameters to produce the final model.
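The fold logic described above can be sketched in plain Scala, independent of Spark. This is a simplified illustration rather than Spark's implementation: the split here is deterministic (Spark assigns rows to folds randomly), and trainAndScore is a hypothetical callback that trains with one parameter candidate and returns a larger-is-better score.

```scala
object KFoldSketch {
  /** Splits `data` into `n` folds and returns one (training, validation) pair per fold. */
  def kFold[A](data: Seq[A], n: Int): Seq[(Seq[A], Seq[A])] = {
    require(n >= 2 && n <= data.size, "need 2 <= n <= data.size")
    // Element i goes to fold i % n (simple deterministic assignment for the sketch).
    val indexed = data.zipWithIndex
    (0 until n).map { fold =>
      val (validation, training) = indexed.partition { case (_, i) => i % n == fold }
      (training.map(_._1), validation.map(_._1))
    }
  }

  /**
   * Returns the candidate with the best average score over all folds.
   * `trainAndScore(params, train, validation)` is a hypothetical callback.
   */
  def selectBest[A, P](data: Seq[A], n: Int, candidates: Seq[P])(
      trainAndScore: (P, Seq[A], Seq[A]) => Double): P = {
    val folds = kFold(data, n)
    candidates.maxBy { params =>
      folds.map { case (train, validation) => trainAndScore(params, train, validation) }.sum / n
    }
  }
}
```

With k candidates and n folds this performs k x n training runs, plus one final retraining on the full data set with the winning candidate, which is why cross-validation becomes expensive as the grid grows.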
For example:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row

// This snippet reuses training, hashingTF, lr, and pipeline from the previous example.

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)  // Use 3+ in practice
  .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
TrainValidationSplit uses the trainRatio parameter to split the data set into a training set and a validation set: a trainRatio fraction of the samples is used for training and the remaining samples are used for validation.
Unlike CrossValidator, TrainValidationSplit evaluates each parameter combination only once. It can loosely be viewed as a special version of CrossValidator with n = 2 that runs a single round. It is therefore cheaper, but its results are less reliable when the training data set is small.
For example:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Prepare training and test data.
val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val lr = new LinearRegression()
  .setMaxIter(10)

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.
  .setTrainRatio(0.8)
  // Evaluate up to 2 parameter settings in parallel
  .setParallelism(2)

// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)

// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
  .select("features", "label", "prediction")
  .show()
Implement custom Transformer
To implement a custom Transformer, inherit from the Transformer class and implement the transform method, which typically adds one or more columns to the input DataFrame.
A Transformer with a single input column and a single output column can instead inherit from the UnaryTransformer class and implement the createTransformFunc method, which processes each value of the input column and returns the corresponding output.
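A single-column Transformer of this kind might look like the sketch below. UpperCaser, its column names, and the toy data are hypothetical and exist only for illustration; the overridden members (createTransformFunc, outputDataType, validateInputType) are the ones UnaryTransformer expects. The sketch is written in script style and assumes Spark is on the classpath and that the input column contains no nulls.

```scala
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical example class: upper-cases the values of a string column.
class UpperCaser(override val uid: String)
    extends UnaryTransformer[String, String, UpperCaser] {

  def this() = this(Identifiable.randomUID("upperCaser"))

  // Builds the function that is applied to every value of the input column.
  override protected def createTransformFunc: String => String = _.toUpperCase

  // Reject non-string input columns when the schema is validated.
  override protected def validateInputType(inputType: DataType): Unit =
    require(inputType == StringType, s"Input type must be StringType but got $inputType.")

  // Data type of the column that transform() appends.
  override protected def outputDataType: DataType = StringType
}

// Assumption: a local session for illustration; in spark-shell the existing session is reused.
val spark = SparkSession.builder().master("local[*]").appName("UpperCaserSketch").getOrCreate()
val df = spark.createDataFrame(Seq((0L, "spark"), (1L, "mllib"))).toDF("id", "text")

// setInputCol and setOutputCol are inherited from UnaryTransformer.
val upper = new UpperCaser().setInputCol("text").setOutputCol("textUpper")
val result = upper.transform(df)
result.show()
```

Because UpperCaser is a Transformer, it can be placed in a Pipeline's stages like any built-in feature transformer.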
Self-developed machine learning framework
Machine learning evolves rapidly, yet there is a lack of efficient and flexible frameworks that reduce the cost of researching new techniques. Experience and techniques usually need to be captured in frameworks and tools. Moreover, algorithm engineers are often limited by computing power, so models that prove effective offline cannot be put online because their inference time is too high.
To address these problems, the Meitu Data Technology team built a tailor-made machine learning framework with the goal of “developing a simple and flexible machine learning workflow, reducing the cost for algorithm engineers of researching new algorithms and the maintenance cost for engineering staff, and providing solutions for common domains so that experience is retained”. It targets in particular the problems encountered in tasks related to recommendation algorithms. The framework consists of three components: Spark Feature, Bamboo and Online Scorer.
Spark Feature: Training sample production
This component is mainly used to produce training samples, and it implements flexible and efficient feature coding for them. Any set of features can be encoded in the same space, and different feature sets can share a coding space. Two concepts were introduced for this purpose: the first is “domain”, which defines a set of features that share the same modeling process; the second is “space”, which defines a set of domains that share the same coding space.
“Old” in the example above shows sample feature coding without the concepts of “domain” and “space”: all features are numbered in a single sequence starting from 1. “New” shows that once age and gender are placed in the age domain and the gender domain respectively, the codes of the two domains each start from 1 and do not affect each other.
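The “domain” and “space” idea can be illustrated with a small plain-Scala sketch. This is only one reading of the description above, not the actual Spark Feature implementation: SpaceEncoder, the domain names, and the feature values are all hypothetical, and the sketch assumes that domains mapped to the same space share one code table.

```scala
import scala.collection.mutable

object FeatureCodingSketch {
  /** Hypothetical encoder: each space keeps its own code table, numbered from 1. */
  final class SpaceEncoder(domainToSpace: Map[String, String]) {
    private val tables =
      mutable.Map.empty[String, mutable.LinkedHashMap[String, Int]]

    /** Returns the code of `value`, assigning the next free code in the domain's space. */
    def encode(domain: String, value: String): Int = {
      val space = domainToSpace(domain)
      val table = tables.getOrElseUpdate(space, mutable.LinkedHashMap.empty[String, Int])
      table.getOrElseUpdate(value, table.size + 1)
    }
  }

  // Hypothetical configuration: age and gender get separate spaces, while two
  // item-related domains share one space so the same item always gets the same code.
  val encoder = new SpaceEncoder(Map(
    "age" -> "ageSpace",
    "gender" -> "genderSpace",
    "clickedItem" -> "itemSpace",
    "candidateItem" -> "itemSpace"
  ))
}
```

In this sketch, the first age value and the first gender value both receive code 1 because their spaces are independent, whereas a value seen in clickedItem keeps the same code when it later appears in candidateItem.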
Spark Feature ultimately uses TFRecords as the storage format for training samples.
Bamboo: Model definition and training
The main purpose of this component is to make model definition and training scalable, efficient, simple, and fast. To this end, we followed these principles when designing Bamboo:
1. Layers interact with each other through tensors: the input of a layer is a tensor, and so is its output;
2. To maximize offline and online efficiency, we avoid relying heavily on high-level APIs such as Keras. Most models and components are developed on the low-level TensorFlow API, and the code is optimized according to TensorFlow’s official performance optimization guide;
3. An online-offline modeling framework is provided, which allows complex calculations to be done offline and only lightweight calculations to be carried out online, making it easier to put complex models online;
4. Data loading, model training and export, evaluation, and various auxiliary tools are encapsulated and provided, so users only need to define the forward inference network. A large number of common layers are also encapsulated, which makes model definition faster.
Online Scorer: Online prediction service
The goal of Online Scorer is to provide a unified, efficient online inference service that supports models exported from TensorFlow, PyTorch, XGBoost and other mainstream modeling frameworks. This work is still in progress, and the details of the implementation will be covered in a later dedicated article.
The above is a brief introduction to Meitu’s self-developed machine learning framework. Follow the “Meitu Data Technology team” for a detailed introduction to the platform in later articles.