This article was published by the NetEase Cloud Community with the authorization of its author, Ye Linbao.



Solution 4: Sort on Cell Values

Description:

In Solution 3 above, OOM can still occur when the number of data rows is large: rows with different field_index values may be shuffled into the same partition, which raises the probability of OOM. One mitigation is to increase the number of partitions when field_index itself has many distinct values; however, when the number of distinct field_index values is smaller than the number of partitions, adding partitions does nothing to relieve OOM. If field_value is more dispersed and has more distinct values than field_index, we can change our approach and partition the data by field_value instead. The specific algorithm is as follows:

Algorithm:

(1) Convert df to (field_value, field_index) pairs.

(2) Sort the data by field_value with sortByKey (which uses a RangePartitioner).

(3) Use mapPartitions to determine how many rows each field_index has in each partition (field_value is range-ordered across partitions; for example, every field_value in partition 1 is smaller than any field_value in partition 2).

(4) Using the data from step (3), determine for each target rank of each field_index which partition it falls in and its position within that partition. For example, to output the 13th value of field_index 6, if the first partition already contains 10 such rows, the target is the 3rd matching row of the second partition.

(5) Convert the results of step (4) into the standard output format.

Code:

(1)

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, Row}

  /**
    * Convert the source DataFrame df into an RDD.
    * @param dataFrame
    * @return RDD in (field_value, field_index) format
    */
  def getValueColumnPairs(dataFrame : DataFrame): RDD[(Double, Int)] =
  {
    dataFrame.rdd.flatMap{
      row: Row => row.toSeq.zipWithIndex
        .map{case (v, index) => (v.toString.toDouble, index)}
    }
  }
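(2)

The code for step (2) is not reproduced in this article, since it is a single transformation. Here is a minimal sketch, assuming the pairs produced in (1); the helper name sortPairs and the explicit numPartitions parameter are illustrative additions, not part of the original code:

  // Step (2): sort by field_value. sortByKey uses a RangePartitioner, so the
  // resulting partitions are range-ordered: every field_value in partition i
  // is <= every field_value in partition i + 1.
  def sortPairs(valueColumnPairs: RDD[(Double, Int)],
                numPartitions: Int): RDD[(Double, Int)] =
    valueColumnPairs.sortByKey(ascending = true, numPartitions = numPartitions)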

(3)

  /**
    * For each partition of sortedValueColumnPairs, count how many rows each
    * field_index has.
    * @param sortedValueColumnPairs
    * @param numOfColumns
    * @return
    */
  def getColumnsFreqPerPartition(sortedValueColumnPairs: RDD[(Double, Int)],
                                 numOfColumns : Int): Array[(Int, Array[Long])] = {
    val zero = Array.fill[Long](numOfColumns)(0)

    def aggregateColumnFrequencies(partitionIndex : Int,
                                   valueColumnPairs : Iterator[(Double, Int)]) = {
      val columnsFreq : Array[Long] = valueColumnPairs.aggregate(zero)(
        (a : Array[Long], v : (Double, Int)) => {
          val (value, colIndex) = v
          // increment the cell in the array corresponding to this column
          a(colIndex) = a(colIndex) + 1L
          a
        },
        (a : Array[Long], b : Array[Long]) => {
          a.zip(b).map{ case(aVal, bVal) => aVal + bVal }
        })
      Iterator((partitionIndex, columnsFreq))
    }

    sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()
  }

For example:

Suppose that, for the data transformed in (1), the rows in each partition, sorted by field_value, are as follows:

Partition 1: (1.5, 0), (1.75, 1), (2.0, 2), (5.25, 0)

Partition 2: (7.5, 1), (9.5, 2)

Then the output of (3) is:

[(0, [2, 1, 1]), (1, [0, 1, 1])]
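Because the step (3) counting is per-partition, the arrays above can be re-derived locally without Spark; the snippet below is a small sanity check, with counts being a hypothetical helper:

  // Re-derive the step (3) counts for the example above, one partition at a time:
  val partition1 = Seq((1.5, 0), (1.75, 1), (2.0, 2), (5.25, 0))
  val partition2 = Seq((7.5, 1), (9.5, 2))

  def counts(part: Seq[(Double, Int)], numOfColumns: Int): Array[Long] = {
    val freq = Array.fill[Long](numOfColumns)(0L)
    part.foreach { case (_, colIndex) => freq(colIndex) += 1L }
    freq
  }

  counts(partition1, 3)  // Array(2, 1, 1) -> (0, [2, 1, 1])
  counts(partition2, 3)  // Array(0, 1, 1) -> (1, [0, 1, 1])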

(4)

  import scala.collection.mutable

  /**
    * Compute, for every target rank of every field_index, which partition the
    * rank falls into and its position within that partition.
    * @param targetRanks the list of target ranks
    * @param partitionColumnsFreq how many rows each field_index has in each partition
    * @param numOfColumns the number of fields
    * @return
    */
  def getRanksLocationsWithinEachPart(targetRanks : List[Long],
      partitionColumnsFreq : Array[(Int, Array[Long])],
      numOfColumns : Int) : Array[(Int, List[(Int, Long)])] = {
    val runningTotal = Array.fill[Long](numOfColumns)(0)
    // The partition indices are not necessarily in sorted order, so we need
    // to sort the partitionColumnsFreq array by the partition index (the
    // first value in the tuple).
    partitionColumnsFreq.sortBy(_._1).map { case (partitionIndex, columnsFreq) =>
      // relevantIndexList stores, for this partition, a (field_index, position)
      // pair for every target rank that lands in this partition
      val relevantIndexList = new mutable.MutableList[(Int, Long)]()
      columnsFreq.zipWithIndex.foreach{ case (colCount, colIndex) =>
        val runningTotalCol = runningTotal(colIndex)
        // the target ranks of the current field_index (colIndex) that fall in this partition
        val ranksHere: List[Long] = targetRanks.filter(rank =>
          runningTotalCol < rank && runningTotalCol + colCount >= rank)
        // record each such rank's relative position within this partition
        relevantIndexList ++= ranksHere.map(rank => (colIndex, rank - runningTotalCol))
        runningTotal(colIndex) += colCount
      }
      (partitionIndex, relevantIndexList.toList)
    }
  }

Here’s an example:

If the target ranking is: targetRanks: [5]

Per-partition field_index counts: partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]

Number of columns: numOfColumns: 2

Output results: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]
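Since getRanksLocationsWithinEachPart runs entirely on the driver and involves no RDD, the example above can be replayed directly; this call is a sketch assuming the method as defined in (4):

  val locations = getRanksLocationsWithinEachPart(
    targetRanks = List(5L),
    partitionColumnsFreq = Array(
      (0, Array(2L, 3L)), (1, Array(4L, 1L)), (2, Array(5L, 2L))),
    numOfColumns = 2)
  // locations: Array((0, List()), (1, List((0, 3))), (2, List((1, 1))))
  // i.e. rank 5 of field_index 0 is the 3rd row of that column in partition 1,
  // and rank 5 of field_index 1 is the 1st row of that column in partition 2.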

(5)

  /**
    * @param sortedValueColumnPairs the sorted (field_value, field_index) pairs
    * @param ranksLocations output of (4): for each partition, the positions within
    *                       that partition at which each field_index hits a target rank
    * @return RDD of (field_index, field_value) rank statistics
    */
  def findTargetRanksIteratively(
      sortedValueColumnPairs : RDD[(Double, Int)],
      ranksLocations : Array[(Int, List[(Int, Long)])]): RDD[(Int, Double)] = {
    sortedValueColumnPairs.mapPartitionsWithIndex(
      (partitionIndex : Int, valueColumnPairs : Iterator[(Double, Int)]) => {
        // the (field_index, position-in-partition) targets for the current partition
        val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2
        if (targetsInThisPart.nonEmpty) {
          // key: field_index; value: the positions within this partition whose rows
          // satisfy the rank requirements
          val columnsRelativeIndex: Map[Int, List[Long]] =
            targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
          val columnsInThisPart = targetsInThisPart.map(_._1).distinct
          // for each field_index, how many of its rows have been seen so far
          // in this partition
          val runningTotals : mutable.HashMap[Int, Long] = new mutable.HashMap()
          runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap
          // traverse this partition's (field_value, field_index) pairs and keep only
          // the rows that land exactly on a target rank
          valueColumnPairs.filter { case (value, colIndex) =>
            lazy val thisPairIsTheRankStatistic: Boolean = {
              // every time a row of this column is seen, bump its running count
              val total = runningTotals(colIndex) + 1L
              runningTotals.update(colIndex, total)
              columnsRelativeIndex(colIndex).contains(total)
            }
            (runningTotals contains colIndex) && thisPairIsTheRankStatistic
          }.map(_.swap)
        } else {
          Iterator.empty
        }
      })
  }
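Putting the five steps together, the driver routine would look roughly as follows. This is a sketch: findRankStatistics is a hypothetical wrapper name, and the persist call (which needs org.apache.spark.storage.StorageLevel) reflects the fact that the sorted RDD is consumed twice, in steps (3) and (5):

  import org.apache.spark.storage.StorageLevel

  def findRankStatistics(df: DataFrame, targetRanks: List[Long]): RDD[(Int, Double)] = {
    val numOfColumns = df.schema.length
    val valueColumnPairs = getValueColumnPairs(df)               // step (1)
    val sorted = valueColumnPairs.sortByKey()                    // step (2)
    sorted.persist(StorageLevel.MEMORY_AND_DISK)                 // reused by (3) and (5)
    val partitionColumnsFreq =
      getColumnsFreqPerPartition(sorted, numOfColumns)           // step (3)
    val ranksLocations = getRanksLocationsWithinEachPart(
      targetRanks, partitionColumnsFreq, numOfColumns)           // step (4)
    findTargetRanksIteratively(sorted, ranksLocations)           // step (5)
  }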


Analysis:

(1) The readability of this method's code is poor.

(2) The original data needs to be traversed twice.

(3) It avoids OOM on the executors more effectively than Solution 3.

(4) When the field_value distribution is widely dispersed, this scheme is more efficient than the previous three.

(5) The algorithm above has two potential problems. When field_value is skewed (that is, a large number of values fall within a certain range), its efficiency depends heavily on whether step (2) of the algorithm description can distribute all field_value data evenly across the partitions. The other question is whether, when field_value contains many duplicates, the duplicates could be combined into a count rather than iterated one by one within a partition (see the sketch below).
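For the second question in (5), one possible direction, sketched here as an assumption rather than as code from the book: aggregate duplicate (field_value, field_index) pairs into counts before sorting, so that each distinct pair is shuffled and scanned only once. The helper name getAggregatedValueColumnPairs is hypothetical:

  // Hypothetical variant of step (1): reduce duplicates to
  // ((field_value, field_index), count) so that repeated values are not
  // iterated one by one inside a partition; later steps would then need
  // to advance running totals by `count` instead of by 1.
  def getAggregatedValueColumnPairs(dataFrame: DataFrame): RDD[((Double, Int), Long)] =
    dataFrame.rdd.flatMap { row =>
      row.toSeq.zipWithIndex.map { case (v, index) => ((v.toString.toDouble, index), 1L) }
    }.reduceByKey(_ + _)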

Note: The above content (problem background and solution algorithms) is taken from High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark (Holden Karau and Rachel Warren).

