Spark Machine Learning

  • PCA (Principal Component Analysis)
  • SVD (Singular Value Decomposition)

Dataset: LFW (Labeled Faces in the Wild), vis-www.cs.umass.edu/lfw/lfw-a.t…

0. Operating environment

export SPARK_HOME=/Users/erichan/Garden/spark-1.5.1-bin-hadoop2.6
cd $SPARK_HOME
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2

1. Extracting features

1.1 Loading face data

val PATH = "/Users/erichan/sourcecode/book/Spark machine learning"
val path = PATH + "/lfw/*"
val rdd = sc.wholeTextFiles(path)
// wholeTextFiles yields (path, content) pairs; keep only the file names
val files = rdd.map { case (fileName, content) => fileName.replace("file:", "") }
println(files.count)

1054

1.2 Visualizing face data (Python)

ipython --pylab

PATH = "/Users/erichan/sourcecode/book/Spark machine learning"
path = PATH + "/lfw/Aaron_Eckhart/Aaron_Eckhart_0001.jpg"
ae = imread(path)
imshow(ae)

(The grayscale file /tmp/aeGray.jpg is written out in section 1.3.2 below.)

tmpPath = "/tmp/aeGray.jpg"
aeGray = imread(tmpPath)
imshow(aeGray, cmap=plt.cm.gray)

1.3 Extracting face images as vectors

1.3.1 Loading images
import java.awt.image.BufferedImage
def loadImageFromFile(path: String): BufferedImage = {
    import javax.imageio.ImageIO
    import java.io.File
    ImageIO.read(new File(path))
}

val aePath = PATH+"/lfw/Aaron_Eckhart/Aaron_Eckhart_0001.jpg"
val aeImage = loadImageFromFile(aePath)
1.3.2 Converting to grayscale and resizing
def processImage(image: BufferedImage, width: Int, height: Int): BufferedImage = {
    val bwImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY)
    val g = bwImage.getGraphics()
    g.drawImage(image, 0, 0, width, height, null)
    g.dispose()
    bwImage
}

val grayImage = processImage(aeImage, 100, 100)

import javax.imageio.ImageIO
import java.io.File
ImageIO.write(grayImage, "jpg", new File("/tmp/aeGray.jpg"))

1.3.3 Extracting feature vectors
def getPixelsFromImage(image: BufferedImage): Array[Double] = {
    val width = image.getWidth
    val height = image.getHeight
    val pixels = Array.ofDim[Double](width * height)
    image.getData.getPixels(0, 0, width, height, pixels)
    // pixels.map(p => p / 255.0) // optionally scale to [0, 1] domain
}

// put all the functions together
def extractPixels(path: String, width: Int, height: Int): Array[Double] = {
    val raw = loadImageFromFile(path)
    val processed = processImage(raw, width, height)
    getPixelsFromImage(processed)
}

val pixels = files.map(f => extractPixels(f, 50, 50))
println(pixels.take(10).map(_.take(10).mkString("", ",", ", ...")).mkString("\n"))

1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, …
247.0, 173.0, 159.0, 144.0, 139.0, 155.0, 32.0, 7.0, 4.0, 5.0, …
253.0, 254.0, 253.0, 253.0, 253.0, 253.0, 253.0, 253.0, 253.0, 253.0, …
242.0, 242.0, 246.0, 239.0, 238.0, 239.0, 225.0, 165.0, 140.0, 167.0, …
47.0, 221.0, 205.0, 46.0, 41.0, 154.0, 127.0, 214.0, 232.0, 232.0, …
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, …
75.0, 76.0, 72.0, 72.0, 72.0, 74.0, 71.0, 78.0, 54.0, 26.0, …
25.0, 27.0, 24.0, 22.0, 26.0, 27.0, 19.0, 16.0, 22.0, 25.0, …
240.0, 240.0, 240.0, 240.0, 240.0, 240.0, 240.0, 240.0, 240.0, 240.0, …
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, …

import org.apache.spark.mllib.linalg.Vectors
val vectors = pixels.map(p => Vectors.dense(p))
vectors.setName("image-vectors")
vectors.cache
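A quick sanity check (not in the original listing): each image vector should have 50 × 50 = 2500 entries, and there should be one vector per file loaded above.

println(vectors.first.size)  // expect 2500 (50 x 50 pixels)
println(vectors.count)       // expect 1054, one vector per image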

1.4 Normalization (mean subtraction)

import org.apache.spark.mllib.feature.StandardScaler
val scaler = new StandardScaler(withMean = true, withStd = false).fit(vectors)

val scaledVectors = vectors.map(v => scaler.transform(v))
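To confirm the centring worked, MLlib's column statistics can be used to inspect the per-pixel means after scaling (a minimal sketch, not part of the original listing; it assumes scaledVectors from above):

import org.apache.spark.mllib.stat.Statistics
// After subtracting the column means, every dimension's mean should be ~0.
val summary = Statistics.colStats(scaledVectors)
println(summary.mean.toArray.take(10).mkString(", "))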

2. Training the dimensionality reduction model

2.1 Computing the top K principal components

import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val matrix = new RowMatrix(scaledVectors)
val K = 10
val pc = matrix.computePrincipalComponents(K)
val rows = pc.numRows
val cols = pc.numCols
println(rows, cols)

(2500, 10)

Each of the 10 principal components is a column of 2500 values, one per pixel of a 50×50 image.

2.2 Visualizing the eigenfaces

import breeze.linalg.DenseMatrix
val pcBreeze = new DenseMatrix(rows, cols, pc.toArray)
import breeze.linalg.csvwrite
import java.io.File
csvwrite(new File("/tmp/pc.csv"), pcBreeze)
pc = np.loadtxt("/tmp/pc.csv", delimiter=",")
print(pc.shape)

def plot_gallery(images, h, w, n_row=2, n_col=5):
    """Helper function to plot a gallery of portraits"""
    plt.figure(figsize=(1.8 * n_col, 2.4 * n_row))
    plt.subplots_adjust(bottom=0, left=.01, right=.99, top=.90, hspace=.35)
    for i in range(n_row * n_col):
        plt.subplot(n_row, n_col, i + 1)
        plt.imshow(images[:, i].reshape((h, w)), cmap=plt.cm.gray)
        plt.title("Eigenface %d" % (i + 1), size=12)
        plt.xticks(())
        plt.yticks(())

plot_gallery(pc, 50, 50)

3. Using the dimensionality reduction model

3.1 PCA projection (image matrix × principal component matrix)

val projected = matrix.multiply(pc)
println(projected.numRows, projected.numCols)
println(projected.rows.take(5).mkString("\n"))

3.2 PCA and SVD

val svd = matrix.computeSVD(10, computeU = true)
println(s"U dimension: (${svd.U.numRows}, ${svd.U.numCols})")
println(s"S dimension: (${svd.s.size}, )")
println(s"V dimension: (${svd.V.numRows}, ${svd.V.numCols})")

U dimension: (1054, 10)

S dimension: (10, )

V dimension: (2500, 10)
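The two factorizations are directly related. For the mean-centred image matrix $X$, the truncated SVD gives

$$ X \approx U S V^{T} $$

so the columns of $V$ are the principal components (which is why V has the same 2500×10 shape as pc), and the PCA projection satisfies $X V = U S$. The checks below verify this numerically, up to the sign of each component.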

def approxEqual(array1: Array[Double], array2: Array[Double], tolerance: Double = 1e-6): Boolean = {
    // note we ignore sign of the principal component / singular vector elements
    val bools = array1.zip(array2).map { case (v1, v2) =>
        if (math.abs(math.abs(v1) - math.abs(v2)) > tolerance) false else true
    }
    bools.fold(true)(_ & _)
}

println(approxEqual(Array(1.0, 2.0, 3.0), Array(1.0, 2.0, 3.0)))
println(approxEqual(Array(1.0, 2.0, 3.0), Array(3.0, 2.0, 1.0)))
println(approxEqual(svd.V.toArray, pc.toArray))

true

false

true

// compare projections
val breezeS = breeze.linalg.DenseVector(svd.s.toArray)
val projectedSVD = svd.U.rows.map { v =>
    val breezeV = breeze.linalg.DenseVector(v.toArray)
    val multV = breezeV :* breezeS
    Vectors.dense(multV.data)
}
// count how many row pairs agree (up to sign); this should equal the total number of images
projected.rows.zip(projectedSVD).map { case (v1, v2) => approxEqual(v1.toArray, v2.toArray) }.filter(b => b).count

4. Evaluating the dimensionality reduction model

4.1 Evaluating K for SVD

val sValues = (1 to 5).map { i => matrix.computeSVD(i, computeU = false).s }
val svd300 = matrix.computeSVD(300, computeU = false)
val sMatrix = new DenseMatrix(1, 300, svd300.s.toArray)
csvwrite(new File("/tmp/s.csv"), sMatrix)
s = np.loadtxt("/tmp/s.csv", delimiter=",")
print(s.shape)
plot(s)

plot(cumsum(s))
plt.yscale('log')
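As a numeric complement to the plots, the cumulative variance captured by the top k components can be computed from the 300 singular values directly in Scala (a sketch, not in the original; it reuses svd300 from above, and treats the 300-value total as an approximation of the full spectrum):

// Squared singular values are proportional to the variance captured per component.
val squared = svd300.s.toArray.map(s => s * s)
val total = squared.sum
val cumulative = squared.scanLeft(0.0)(_ + _).tail.map(_ / total)
Seq(10, 50, 100, 300).foreach { k =>
  println(f"top $k%3d components: ${cumulative(k - 1) * 100}%.1f%% of captured variance")
}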