A developer who doesn't understand data algorithms is not a good algorithm engineer. I remember that in graduate school a teacher covered some data mining algorithms and I found them interesting, but I have had little contact with them since starting work. In the data engineer "contempt chain" (model > real-time > offline data warehouse > ETL engineer > BI engineer — just a joke, please don't flame), my current work is mainly the offline data warehouse, and of course I also did some ETL work early on. To develop my career over the long term and broaden my technical boundaries, I need to gradually go deeper into real-time processing and models. So, starting with this article, I am planting a flag: study the real-time and model parts further.

To change yourself, start by improving the things you are not good at.

1. Introduction to KMeans algorithm

K-means is an unsupervised clustering algorithm. It is relatively simple to implement and clusters well, so it is widely used.

  • K-means, also known as the k-means or k-averages algorithm, is generally the first algorithm to learn when mastering clustering.

  • K here is a constant that must be set in advance. Put simply, the algorithm iteratively aggregates M unlabeled samples into K clusters.

  • During clustering, the distance between samples is typically used as the criterion for assigning points to clusters, as shown below.
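The most common choice is the Euclidean distance between two n-dimensional samples $x$ and $y$ (this is also what the getDistance function in the implementation below computes):

$$d(x, y) = \sqrt{\sum_{i=1}^{n} (x_i - y_i)^2}$$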

The core: K-means is an iterative cluster analysis algorithm. It randomly selects K objects as the initial cluster centers, then computes the distance between each object and each cluster center and assigns each object to the nearest center. A center and the objects assigned to it represent one cluster. Each time all samples have been assigned, every cluster center is recalculated from the objects currently in its cluster. This process repeats until some termination condition is met: no (or only a minimal number of) objects are reassigned to different clusters, no (or only a minimal number of) cluster centers change again, or the sum of squared errors reaches a local minimum.
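In other words, each sweep of assignment and re-centering drives down the within-cluster sum of squared errors, where $\mu_j$ denotes the centroid of cluster $C_j$:

$$J = \sum_{j=1}^{K} \sum_{x \in C_j} \lVert x - \mu_j \rVert^2$$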

2. KMeans algorithm process

2.1 Read the file, prepare the data, and preprocess it

2.2 Randomly select K points as the initial centers

2.3 Traverse the data set, compute the distance from each point to each of the K centers, and assign each point to its nearest center

2.4 Recompute each center as the mean of the points now assigned to it

2.5 Start the next iteration with the new centers (repeat from step 2.3); a compact sketch of the whole loop appears after the exit conditions below

Conditions for exiting the loop:

1. The specified maximum number of iterations is reached

2. The centers hardly move any more (i.e., the total distance the centers move in one iteration is less than a given threshold, say 0.00001)
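Before the Spark version in section 4, here is a minimal single-machine sketch of steps 2.2 through 2.5 and both exit conditions, assuming each point is an Array[Double]; the names (distance, kmeans) are illustrative and not part of the implementation below.

import scala.util.Random

def distance(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (u, v) => (u - v) * (u - v) }.sum)

def kmeans(points: Seq[Array[Double]], k: Int, maxIter: Int = 40, tol: Double = 1e-5): Array[Array[Double]] = {
  var centers = Random.shuffle(points).take(k).toArray // 2.2 random initial centers
  var moved = Double.MaxValue
  var iter = 0
  while (iter < maxIter && moved > tol) {              // both exit conditions
    // 2.3 assign every point to its nearest center
    val clusters = points.groupBy(p => centers.indices.minBy(i => distance(p, centers(i))))
    // 2.4 recompute each center as the mean of its points (keep the old center if a cluster went empty)
    val newCenters = centers.indices.map { i =>
      clusters.get(i)
        .map(ps => ps.transpose.map(dim => dim.sum / ps.size).toArray)
        .getOrElse(centers(i))
    }.toArray
    moved = centers.zip(newCenters).map { case (c, n) => distance(c, n) }.sum // total center movement
    centers = newCenters                               // 2.5 next iteration with the new centers
    iter += 1
  }
  centers
}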

3. Advantages and disadvantages of KMeans algorithm

The choice of K: the value of K is crucial to the final result, yet it must be given in advance. Choosing an appropriate K requires prior knowledge; estimating it out of thin air is difficult, and a poor choice may lead to poor results.

Existence of outliers: during iteration, K-means uses the mean of all points in a cluster as the new centroid (center point). If a cluster contains outliers, the mean deviates badly. For example, if a cluster holds the five values 2, 4, 6, 8 and 100, the new centroid is 24, which is clearly far from most of the points. In such cases the median, 6, is a better center than the mean; this idea leads to K-medoids (K-median) clustering.
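A quick check of that arithmetic (a throwaway snippet, not part of the implementation in section 4):

val cluster = Seq(2.0, 4.0, 6.0, 8.0, 100.0)
val mean = cluster.sum / cluster.size               // 24.0: dragged away from most of the points by the outlier
val median = cluster.sorted.apply(cluster.size / 2) // 6.0: robust to the outlier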

Sensitivity to initial values: K-means is sensitive to the initial centers, and different initial values may lead to different clusterings. To avoid anomalies in the final result caused by this sensitivity, several sets of initial centers can be used to build different clusterings, from which the best one is selected. From this idea derive the bisecting K-means, K-means++, K-means||, and Canopy algorithms, among others.
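As an illustration of one of those variants, here is a compact sketch of the K-means++ seeding idea: each new center is sampled with probability proportional to its squared distance from the nearest already-chosen center. The helper names (sqDist, kmeansPlusPlusInit) are mine, not from the implementation below.

import scala.util.Random

// squared Euclidean distance, used as the sampling weight
def sqDist(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (u, v) => (u - v) * (u - v) }.sum

// pick k initial centers, K-means++ style
def kmeansPlusPlusInit(points: IndexedSeq[Array[Double]], k: Int, rnd: Random = new Random): Seq[Array[Double]] = {
  val first = points(rnd.nextInt(points.size))                   // first center: uniformly at random
  (1 until k).foldLeft(Seq(first)) { (centers, _) =>
    val d2 = points.map(p => centers.map(c => sqDist(p, c)).min) // squared distance to the nearest chosen center
    val r = rnd.nextDouble() * d2.sum                            // sample an index with probability proportional to d2
    val cum = d2.scanLeft(0.0)(_ + _).tail                       // cumulative weights
    val idx = cum.indexWhere(_ >= r)
    centers :+ points(if (idx >= 0) idx else points.size - 1)
  }
}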

Despite these drawbacks, it is one of the most commonly used clustering algorithms thanks to its simple implementation, good portability, and scalability.

4. Spark implementation of KMeans algorithm

4.1 Data Download and Description

Link: pan.baidu.com/s/1FmFxSrPI… Extraction code: hell

Iris data set: the data set contains 150 records in 3 classes, with 50 records per class; each record has 4 features: sepal length, sepal width, petal length, and petal width.

Based on these four features, cluster the flowers with K = 3 and compare the result with the actual classes.
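For reference, each line of the common Iris CSV format looks like the two samples below; the non-numeric label column is filtered out by the parsing code in 4.2. The exact layout of the downloaded file may differ.

5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa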

4.2 Implementation

Instead of using the MLlib library, this is a native Scala implementation.

package com.hoult.work

import org.apache.commons.lang3.math.NumberUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.math.{pow, sqrt}
import scala.util.Random

object KmeansDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()

    val sc = spark.sparkContext
    val dataset: RDD[Seq[Double]] = spark.read.textFile("data/iris.csv")
      .rdd
      .map(_.split(",").filter(NumberUtils.isNumber _).map(_.toDouble)) // keep only the numeric feature columns
      .filter(_.nonEmpty)
      .map(_.toSeq)

    val res: RDD[(Seq[Double], Int)] = train(dataset, 3)

    res.sample(false, 0.1, 1234L)
      .map(tp => (tp._1.mkString(","), tp._2))
      .foreach(println)
  }

  // Parameters: the data set, K, the maximum number of iterations, and the cost-function change threshold.
  // The maximum number of iterations and the threshold have default values and can be changed as needed.
  def train(data: RDD[Seq[Double]], k: Int, maxIter: Int = 40, tol: Double = 1e-4): RDD[(Seq[Double], Int)] = {

    val sc: SparkContext = data.sparkContext

    var i = 0               // number of iterations
    var cost = 0D           // initial cost function
    var convergence = false // convergence flag: true once the cost function changes by less than tol

    // Step 1: randomly select k initial cluster centers
    var initk: Array[(Seq[Double], Int)] = data.takeSample(false, k, Random.nextLong()).zip(Range(0, k))

    var res: RDD[(Seq[Double], Int)] = null

    while (i < maxIter && !convergence) {
      val bcCenters = sc.broadcast(initk)
      val centers: Array[(Seq[Double], Int)] = bcCenters.value

      // Step 2: compute the distance between each sample point and every cluster center
      val clustered: RDD[(Int, (Double, Seq[Double], Int))] = data.mapPartitions(points => {
        val listBuffer = new ListBuffer[(Int, (Double, Seq[Double], Int))]()
        points.foreach { point =>
          // cluster id -> (distance to the nearest center, sample point, count of 1)
          val cost: (Int, (Double, Seq[Double], Int)) = centers.map(ct => {
            ct._2 -> (getDistance(ct._1.toArray, point.toArray), point, 1)
          }).minBy(_._2._1) // assign the sample to the nearest cluster center
          listBuffer.append(cost)
        }
        listBuffer.toIterator
      })

      // Step 3: recompute the center of each cluster
      val mpartition: Array[(Int, (Double, Seq[Double]))] = clustered
        .reduceByKey((a, b) => {
          val cost = a._1 + b._1                                   // accumulate the cost function
          val count = a._3 + b._3                                  // accumulate the number of samples per cluster
          val newCenters = a._2.zip(b._2).map(tp => tp._1 + tp._2) // element-wise sum of the sample points
          (cost, newCenters, count)
        })
        .map {
          case (clusterId, (costs, point, count)) =>
            clusterId -> (costs, point.map(_ / count)) // new cluster center = mean of the assigned points
        }
        .collect()

      val newCost = mpartition.map(_._2._1).sum       // new cost function
      convergence = math.abs(newCost - cost) <= tol   // converged once the cost change is no more than tol
      cost = newCost                                  // carry the new cost into the next iteration
      initk = mpartition.map(tp => (tp._2._2, tp._1)) // carry the new cluster centers into the next iteration
      res = clustered.map(tp => (tp._2._2, tp._1))    // clustering result: (sample point, assigned cluster id)
      i += 1
    }
    // return the clustering result
    res
  }

  def getDistance(x: Array[Double], y: Array[Double]): Double = {
    sqrt(x.zip(y).map(z => pow(z._1 - z._2, 2)).sum)
  }
}


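Running this locally prints a roughly 10% sample of (point, cluster id) pairs. The output looks something like the lines below; the values are illustrative, the cluster ids are arbitrary, and runs differ because the initial centers are chosen randomly.

(5.1,3.5,1.4,0.2,0)
(6.3,3.3,6.0,2.5,2)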

Full code: github.com/hulichao/bi…

Result screenshot: