To get the top N values of each group, first group the records by key, then sort the values within each group and keep the first N.

The test data (each line contains a subject name and a score, separated by a space):

hadoop 23
spark 45
java 90
spark 57
spark 90
hadoop 99
hadoop 76
spark 45
spark 88
spark 89
hadoop 45
hadoop 90
java 78
java 70
Copy the code

1.1. Step 1: convert each line of the source data into a (key, value) pair so that the records can be grouped by key

SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount"); JavaSparkContext JSC = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("C:\ Users\ 12285\ Desktop\ test"); //hadoopRDD JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String , Integer>() { private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String line) throws Exception { return new Tuple2<String, Integer>(line.split(" ")[0], Integer.valueOf(line.split(" ")[1])); }});Copy the code

1.2. Step 2: Group by key

JavaPairRDD<String, Iterable<Integer>> groupPairs = pairs. GroupByKey ();Copy the code

1.3. Step 3: sort within each group

1.3.1 After grouping, the RDD is a JavaPairRDD<String, Iterable<Integer>>: the key is the subject name and the value is the collection of that subject's scores. mapToPair is therefore used to sort the scores within each group.

JavaPairRDD<String, Iterable<Integer>> top5Pairs = groupPairs.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() { private static final long serialVersionUID = 1L; public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedPair) throws Exception { Integer[] top5 = new Integer[5]; String groupedKey = groupedPair._1; Iterator<Integer> groupedValue = groupedPair._2.iterator(); while(groupedValue.hasNext()) { Integer value = groupedValue.next(); for (int i = 0; i < top5.length; i++) { if (top5[i] == null) { top5[i] = value; break; }else if(top5[I] > value){for (int j = 4; j > i; j--) { top5[j] = top5[j-1]; } top5[i] = value; break; } return new Tuple2<String, Iterable<Integer>>(groupedKey, arrays.asList (top5)); }});Copy the code

1.4. Complete code

package com.chb.sparkDemo.TopNGroup; import java.util.Arrays; import java.util.Iterator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; public class TopNGroupTest { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount"); JavaSparkContext JSC = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("C:\ Users\ 12285\ Desktop\ test"); //hadoopRDD JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String , Integer>() { private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String line) throws Exception { return new Tuple2<String, Integer>(line.split(" ")[0], Integer.valueOf(line.split(" ")[1])); }}); JavaPairRDD<String, Iterable<Integer>> groupPairs = pairs. 
GroupByKey (); JavaPairRDD<String, Iterable<Integer>> top5Pairs = groupPairs.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() { private static final long serialVersionUID = 1L; public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedPair) throws Exception { Integer[] top5 = new Integer[5]; String groupedKey = groupedPair._1; Iterator<Integer> groupedValue = groupedPair._2.iterator(); while(groupedValue.hasNext()) { Integer value = groupedValue.next(); for (int i = 0; i < top5.length; i++) { if (top5[i] == null) { top5[i] = value; break; }else if(top5[I] > value){for (int j = 4; j > i; j--) { top5[j] = top5[j-1]; } top5[i] = value; break; } return new Tuple2<String, Iterable<Integer>>(groupedKey, arrays.asList (top5)); }}); top5Pairs.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() { private static final long serialVersionUID = 1L; public void call(Tuple2<String, Iterable<Integer>> pair) throws Exception { String groupedKey = pair._1; System.out.println("Grouped Key : " + groupedKey); Iterator<Integer> groupedValue = pair._2.iterator(); while(groupedValue.hasNext()) { Integer value = groupedValue.next(); System.out.println(value); } System.out.println("=================================="); }}); }}Copy the code