This article is published by the NetEase Cloud community with the authorization of its author, Ye Linbao.
Problem description
(1) Input: the first row holds the column names. Every column is of type double except the first, which is a string. From the second line on, each line represents one record.
(2) Requirement: given an array such as [2, 4] (hereinafter the rank array), sort each column (except the first) in ascending order and output the values at the second and fourth positions. The result maps each column index to the values found at those ranks.
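To make the requirement concrete, here is a minimal sketch on plain Scala collections, without Spark. The sample rows and the names RankRequirementDemo and expectedRanks are made up for illustration, and the sketch assumes every column is already numeric.

```scala
object RankRequirementDemo {
  // Hypothetical sample data: each inner Seq is one record with numeric columns only.
  val rows: Seq[Seq[Double]] = Seq(
    Seq(3.0, 10.0),
    Seq(1.0, 40.0),
    Seq(4.0, 20.0),
    Seq(2.0, 30.0)
  )

  // For every column, sort ascending and keep the values at the given 1-based ranks.
  def expectedRanks(rows: Seq[Seq[Double]], ranks: Seq[Int]): Map[Int, Seq[Double]] = {
    require(ranks.forall(_ > 0))
    val numberOfColumns = rows.headOption.map(_.length).getOrElse(0)
    (0 until numberOfColumns).map { col =>
      val sorted = rows.map(_(col)).sortWith(_ < _)
      // Ranks beyond the number of rows are ignored.
      val wanted = ranks.sortWith(_ < _).filter(_ <= sorted.length)
      col -> wanted.map(r => sorted(r - 1))
    }.toMap
  }

  def main(args: Array[String]): Unit =
    println(expectedRanks(rows, Seq(2, 4)))
}
```

For this sample and the rank array [2, 4], column 0 yields 2.0 and 4.0, and column 1 yields 20.0 and 40.0.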
Solution algorithm
I. Iterative calculation
Description:
Put simply, iterate over the columns one at a time; for each column, sort its values across all rows and pick out the values whose rank appears in the rank array.
Code:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/**
 * Filter out the values of each column that match the given rank array.
 *
 * @param dataFrame data source
 * @param ranks     rank array
 * @return map from column index to the values at the requested ranks
 */
def findRankStatistics(dataFrame: DataFrame, ranks: List[Long]): Map[Int, Iterable[Double]] = {
  require(ranks.forall(_ > 0))
  // Obtain the number of columns
  val numberOfColumns = dataFrame.schema.length
  var i = 0
  var result = Map[Int, Iterable[Double]]()
  // Process the columns one by one
  while (i < numberOfColumns) {
    val col = dataFrame.rdd.map(row => row.getDouble(i))
    // Sort the column and attach a 0-based index to every value
    val sortedCol: RDD[(Double, Long)] = col.sortBy(v => v).zipWithIndex()
    // Keep only the values whose 1-based rank is requested
    val ranksOnly = sortedCol.filter {
      case (colValue, index) => ranks.contains(index + 1)
    }.keys
    val list = ranksOnly.collect()
    result += (i -> list)
    i += 1
  }
  result
}
Analysis:
Disadvantage: low efficiency. The columns are processed serially, and each column runs its own sort.
II. groupByKey
Description:
The computations for different columns are independent of one another. Scheme II therefore uses the column index as the key and each cell value as the value, shuffles the pairs to different partitions by the hash of the key, and computes the partitions in parallel.
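The keying step can be sketched on plain Scala collections as follows. This is only a local stand-in for what Spark does across partitions; the names GroupByColumnDemo, toKeyValuePairs and groupByColumn are hypothetical and not part of the article's code.

```scala
object GroupByColumnDemo {
  // Expand each record into (field_index, field_value) pairs.
  def toKeyValuePairs(rows: Seq[Seq[Double]]): Seq[(Int, Double)] =
    rows.flatMap(row => row.indices.map(i => (i, row(i))))

  // Local stand-in for groupByKey: gather all values of a column under its index.
  // Note that every value of a column ends up in one in-memory collection.
  def groupByColumn(pairs: Seq[(Int, Double)]): Map[Int, Seq[Double]] =
    pairs.groupBy(_._1).map { case (index, kvs) => index -> kvs.map(_._2) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Seq(3.0, 10.0), Seq(1.0, 40.0))
    println(groupByColumn(toKeyValuePairs(rows)))
  }
}
```

Once the values are grouped per column, each group can be sorted and filtered by rank independently of the others.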
Code:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

/**
 * Filter out the values of each column at the positions given by the rank array, groupByKey scheme.
 *
 * @param dataFrame data source
 * @param ranks     rank array
 * @return map from column index to the values at the requested ranks
 */
def findRankStatistics(dataFrame: DataFrame, ranks: List[Long]): Map[Int, Iterable[Double]] = {
  require(ranks.forall(_ > 0))
  // Convert the source data into a pair RDD of (field_index, field_value);
  // mapToKeyValuePairs is defined in the code of scheme III
  val pairRDD: RDD[(Int, Double)] = mapToKeyValuePairs(dataFrame)
  // Group all field_values that share a field_index
  val groupColumns: RDD[(Int, Iterable[Double])] = pairRDD.groupByKey()
  // For each field_index, sort its field_values and keep those at the requested ranks
  groupColumns.mapValues(iter => {
    // Sort field_values
    val sortedIter = iter.toArray.sorted
    sortedIter.toIterable.zipWithIndex.flatMap({
      case (colValue, index) =>
        if (ranks.contains(index + 1)) {
          Iterator(colValue)
        } else {
          Iterator.empty
        }
    })
  }).collectAsMap().toMap // collectAsMap returns scala.collection.Map; toMap matches the declared return type
}
Analysis:
This scheme only suits small data sets (that is, a small number of rows). With large data, the groupByKey operation easily runs out of memory (OOM).
groupByKey temporarily holds all <key, values> pairs in memory, so if one key has many values, the executor will OOM.
III. Secondary sort
Description:
Besides the OOM risk on the executor side, scheme II has another problem: the sort runs on the executor only after the shuffle has finished. Spark's sort-based shuffle supports sorting by key during the shuffle phase, so efficiency can be improved with a secondary sort.
Algorithm idea:
(1) Expand each row of the DataFrame into (field_index, field_value) pairs and convert each pair to ((field_index, field_value), 1)
(2) Define a custom partitioner that partitions ((field_index, field_value), 1) by field_index
(3) Call repartitionAndSortWithinPartitions to partition by field_index and sort by (field_index, field_value)
(4) Filter out the values of each column that match the requested ranks
(5) Convert to the desired output format
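The five steps can be simulated on plain Scala collections as follows. This is a hedged sketch, not the Spark implementation: SecondarySortDemo, partitionOf and findRanks are hypothetical names, and an in-memory filter plus sort stands in for repartitionAndSortWithinPartitions.

```scala
object SecondarySortDemo {
  // Step (2): choose the partition from field_index only.
  def partitionOf(key: (Int, Double), numPartitions: Int): Int =
    math.abs(key._1) % numPartitions

  def findRanks(rows: Seq[Seq[Double]], targetRanks: Seq[Int], numPartitions: Int): Map[Int, Seq[Double]] = {
    // Step (1): expand each record into (field_index, field_value) keys.
    val keys: Seq[(Int, Double)] = rows.flatMap(row => row.indices.map(i => (i, row(i))))
    // Step (3): local stand-in for repartitionAndSortWithinPartitions,
    // ordering by field_index first and field_value second.
    val partitions: Seq[Seq[(Int, Double)]] = (0 until numPartitions).map { p =>
      keys.filter(k => partitionOf(k, numPartitions) == p)
        .sortWith((a, b) => a._1 < b._1 || (a._1 == b._1 && a._2 < b._2))
    }
    // Step (4): stream through each sorted partition with a running counter per column.
    val kept: Seq[(Int, Double)] = partitions.flatMap { part =>
      var currentColumn = -1
      var runningTotal = 0
      part.iterator.filter { case (col, _) =>
        // One partition may hold several columns, so reset the counter on a new column.
        if (col != currentColumn) { currentColumn = col; runningTotal = 1 }
        else runningTotal += 1
        targetRanks.contains(runningTotal)
      }.toList
    }
    // Step (5): group into column index -> values.
    kept.groupBy(_._1).map { case (col, kvs) => col -> kvs.map(_._2) }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Seq(3.0, 10.0, 7.0), Seq(1.0, 40.0, 5.0), Seq(4.0, 20.0, 8.0), Seq(2.0, 30.0, 6.0))
    println(findRanks(rows, Seq(2, 4), numPartitions = 2))
  }
}
```

Because the values arrive already sorted, step (4) needs only two counters per partition rather than a whole column held in memory, which is the point of the secondary sort.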
Code:
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import scala.collection.mutable.ArrayBuffer

/**
 * Filter out the values of each column that match the given rank array, secondary sort scheme.
 *
 * @param dataFrame   data source
 * @param targetRanks rank array
 * @param partitions  number of partitions
 * @return map from column index to the values at the requested ranks
 */
def findRankStatistics(dataFrame: DataFrame, targetRanks: List[Long], partitions: Int) = {
  // Expand each row of df into (field_index, field_value) and convert to ((field_index, field_value), 1);
  // the "1" is only a placeholder value in the pair RDD and does not affect the final result
  val pairRDD: RDD[((Int, Double), Int)] = mapToKeyValuePairs(dataFrame).map((_, 1))
  // Custom partitioner: partition ((field_index, field_value), 1) by field_index
  val partitioner = new ColumnIndexPartition(partitions)
  // Partition by field_index and sort within each partition by (field_index, field_value)
  val sorted = pairRDD.repartitionAndSortWithinPartitions(partitioner)
  // Filter out the required data
  val filterForTargetIndex: RDD[(Int, Double)] = sorted.mapPartitions(iter => {
    var currentColumnIndex = -1
    var runningTotal = 0
    iter.filter({
      case (((colIndex, value), _)) =>
        // Data in the same partition may contain several field_index values
        if (colIndex != currentColumnIndex) {
          currentColumnIndex = colIndex
          runningTotal = 1
        } else {
          runningTotal += 1
        }
        // Keep the values whose rank is requested
        targetRanks.contains(runningTotal)
    })
  }.map(_._1), preservesPartitioning = true)
  // Convert to the required output format
  groupSorted(filterForTargetIndex.collect())
}

// Implicit ordering for the tuple key: sort by the first element, then by the second
implicit val ordering: Ordering[(Int, Double)] = Ordering.Tuple2

/**
 * Expand each row into (field_index, field_value) pairs.
 *
 * @param dataFrame data source
 * @return pair RDD of (field_index, field_value)
 */
def mapToKeyValuePairs(dataFrame: DataFrame): RDD[(Int, Double)] = {
  val rowLength = dataFrame.schema.length
  dataFrame.rdd.flatMap(row => Range(0, rowLength).map(i => (i, row.getDouble(i))))
}

/**
 * Custom partitioner: partition ((field_index, field_value), 1) by field_index.
 */
class ColumnIndexPartition(override val numPartitions: Int) extends Partitioner {
  require(numPartitions >= 0, s"Number of partitions " + s"($numPartitions) cannot be negative.")

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[(Int, Double)]
    math.abs(k._1) % numPartitions // hashcode of column index
  }
}

/**
 * Convert to the desired output format: aggregate into a map whose key is field_index
 * and whose value is the array of field_values with that field_index.
 *
 * @param it array of (field_index, field_value) elements
 * @return map from field_index to its field_values
 */
private def groupSorted(it: Array[(Int, Double)]): Map[Int, Iterable[Double]] = {
  val res = List[(Int, ArrayBuffer[Double])]()
  it.foldLeft(res)((list, next) => list match {
    case Nil =>
      val (firstKey, value) = next
      List((firstKey, ArrayBuffer(value)))
    case head :: rest =>
      val (curKey, valueBuf) = head
      val (firstKey, value) = next
      if (!firstKey.equals(curKey)) {
        (firstKey, ArrayBuffer(value)) :: list
      } else {
        valueBuf.append(value)
        list
      }
  }).map { case (key, buf) => (key, buf.toIterable) }.toMap
}
Analysis:
Compared with scheme II, this scheme pushes the sorting down into the shuffle stage and then filters out the required data by iterating through each partition, so the executor does not have to load all the data into memory. However, when the data volume is large (many rows, with several keys routed to the same executor), and especially when there are many duplicate values, the secondary sort during the shuffle phase can still cause OOM.