This series of blog posts summarizes examples drawn from real business environments and offers practical guidance on Spark business applications. Stay tuned for the rest of the series. Copyright: this Spark business application series belongs to the author (Qin Kaixin).

  • Qin Kaixin technology community – complete catalogue of the big data business practice series
  • Spark business applications: in-depth analysis of Spark data skew case tests and tuning guidelines

Preface

This article focuses on the most practical techniques for handling data skew; the methods below are for reference only.

  • Solution 1: Preprocess the data with Hive ETL
  • Solution 2: Filter out the few keys that cause the skew (a quick sketch follows this list)
  • Solution 3: Increase the parallelism of shuffle operations
  • Solution 4: Two-stage aggregation (local aggregation + global aggregation)
  • Solution 5: Convert the reduce join into a map join
  • Solution 6: Sample the skewed keys and split the join operation
  • Solution 7: Join with random prefixes and an expanded RDD
  • Solution 8: Customize the Partitioner
  • Solution 9: Combine several of the above solutions
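Solutions 1 and 2 are not demonstrated later in this article. As a quick illustration of Solution 2, the minimal Scala sketch below samples the keys, picks the heaviest one and simply filters it out before aggregating; the kvRdd pair RDD and the idea that the hot key may be dropped are assumptions borrowed from the later sections, not code from the original.

    // Solution 2 sketch: find the heaviest key via sampling and filter it out
    // (only valid when the business can tolerate dropping that key).
    val sampled = kvRdd.sample(false, 0.1)
    val topKey  = sampled.map(kv => (kv._1, 1L)).reduceByKey(_ + _)
                         .map(_.swap).sortByKey(ascending = false)
                         .first()._2
    val filteredRdd = kvRdd.filter(_._1 != topKey)
    filteredRdd.groupByKey().count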

1 The data set

Each line of the raw data is a tab-separated search log record: time, uid, keyword, url_rank, click_num, click_url (the same fields as the brower case class in section 5.1). A few sample records:

    20111230000005    57375476989eea12893c0c3811607bcf    奇艺高清    1    1    http://www.qiyi.com/
    20111230000005    66c5bb7774e31d0a22278249b26bc83a    凡人修仙传    3    1    http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
    20111230000007    b97920521c78de70ac38e3713f524b50    laptop alliance    1    1    http://www.bblianmeng.com/
    20111230000008    6961d0c97fe93701fc9c0d861d096cd9    Library of South China Normal University    1    1    http://lib.scnu.edu.cn/

2. Scala pre-knowledge

For details, please refer to this blog post (it is very good): https://blog.csdn.net/zyp13781913772/article/details/81428862
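The preprocessing code in the next section relies on two mutable ArrayBuffer operators covered in that post: ++= appends a whole collection, while +=: prepends a single element. A tiny standalone sketch (plain Scala, independent of Spark):

    import scala.collection.mutable.ArrayBuffer

    val buf = ArrayBuffer[String]()
    buf ++= Array("a", "b", "c")   // append a collection -> ArrayBuffer(a, b, c)
    "0" +=: buf                    // prepend one element -> ArrayBuffer(0, a, b, c)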

3. Data set preprocessing

val sourceRdd = sc.textFile("hdfs://bd-master:9000/opendir/source.txt") sourceRdd.zipWithIndex.take(1) Array( (20111230000005, 57375476989 eea12893c0c3811607bcf "ecomax" qing 1 1 http://www.qiyi.com/, 0). (20111230000005, 66 c5bb7774e31d0a22278249b26bc83a mortal cultivate immortality 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1, 1)) + + + + = = > array add + + = = = > : In front of an array of additional elements val sourceWithIndexRdd = sourceRdd. ZipWithIndex. The map (tuple = > {val array = scala.collection.mutable.ArrayBuffer[String](); array++=(tuple._1.split("\t")); tuple._2.toString +=: array; Array. The toArray}) array (array (0, 20111230000005, 57375476989 eea12893c0c3811607bcf, "ecomax", 1, 1, http://www.qiyi.com/), Array (1, 20111230000005, 66 c5bb7774e31d0a22278249b26bc83a, mortal cultivate immortality, 3, 1, http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1) ) sourceWithIndexRdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/source_index")Copy the code

4 MapSide Join Performance test

  • Applicable scenario: this solution works when join operations are used on RDDs (or join statements in Spark SQL) and one of the two data sets is small (for example, a few hundred MB).
  • Implementation principle: an ordinary join goes through the shuffle process, in which records with the same key are pulled into the same shuffle read task and joined there; this is a reduce join. If one RDD is small, however, you can broadcast its full content and perform the join inside a map operator, which achieves the same result; this is a map join. No shuffle happens, so no data skew can occur.
  • Advantages: very effective for data skew caused by join operations, because the shuffle is eliminated entirely and with it the skew.
  • Disadvantages: limited applicability, since it only works for joins of one large table with one small table.

4.1 Data Preparation

Source_index:

    Array[String] = Array(
      0  20111230000005  57375476989eea12893c0c3811607bcf  奇艺高清  1  1  http://www.qiyi.com/,
      1  20111230000005  66c5bb7774e31d0a22278249b26bc83a  凡人修仙传  3  1  http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1)

Data simulation:

val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*") val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t"); (parm(0).trim().toInt,parm(1).trim()) }) (Int, String) = (479936,20111230000005) //1 million data sets val kvRdd2 = kvrdd.map (x=>{if(x._1 < 900001) (900001,x._2) else x}) Kvrdd2. map(x=>x._1 +","+x._2). SaveAsTextFile (" HDFS :// bD-master :9000/big_data/") // 10 thousand data sets val joinRdd2 = kvRdd.filter(_._1 > 900000) joinRdd2.map(x=>x._1 +","+x._2).saveAsTextFile("hdfs://bd-master:9000/small_data/")Copy the code

4.2 Data Skew Occurs in Direct Join

Reduce join (ordinary shuffle join):

    val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
    val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
    val joinRdd = sourceRdd.map(x => {
      val parm = x.split(",")
      (parm(0).trim().toInt, parm(1).trim)
    })
    val joinRdd2 = sourceRdd2.map(x => {
      val parm = x.split(",")
      (parm(0).trim().toInt, parm(1).trim)
    })
    // the shuffle join itself (added for completeness): all records with the skewed
    // key 900001 are pulled into a single task, which is where the skew shows up
    joinRdd.join(joinRdd2).count

4.3 Solve data Skew based on MapSide Join

Map-side join (broadcast the small table):

    val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
    val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
    // 1 million-row skewed data set
    val joinRdd = sourceRdd.map(x => {
      val parm = x.split(",")
      (parm(0).trim().toInt, parm(1).trim)
    })
    // small data set
    val joinRdd2 = sourceRdd2.map(x => {
      val parm = x.split(",")
      (parm(0).trim().toInt, parm(1).trim)
    })
    // broadcast the small side and join inside a map operator: no shuffle, no skew
    val broadcastVar = sc.broadcast(joinRdd2.collectAsMap)
    joinRdd.map(x => (x._1, (x._2, broadcastVar.value.getOrElse(x._1, "")))).count

5 Parallelism enhancement test

  • Implementation principle: increasing the number of shuffle read tasks lets keys that were originally assigned to a single task be spread over several tasks, so each task processes less data than before.
  • Advantages: simple to implement and effective at mitigating data skew.
  • Disadvantages: it only alleviates the skew, it does not eliminate the root cause, and practical experience shows its effect is limited.
  • Rule of thumb: this solution rarely solves the skew completely; in extreme cases, such as a single key with one million records, no number of tasks helps, because that key still lands in exactly one task.

5.1 Data Preparation

Data simulation: the ids below 900,000 are remapped to keys of the form 8 + 12k, so with a shuffle parallelism of 12 all of them hash to the task whose partition id is 8:

val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index") case class brower(id:Int, time:Long, uid:String, keyword:String, url_rank:Int, click_num:Int, click_url:String) extends Serializable val ds = sourceRdd.map(_.split("\t")).map(attr => brower(attr(0).toInt, attr(1).toLong, attr(2), attr(3), attr(4).toInt, attr(5).toInt, attr(6))).toDS ds.createOrReplaceTempView("sourceTable") val newSource = spark.sql("SELECT CASE WHEN id < 900000 THEN (8  + (CAST (RAND() * 50000 AS bigint)) * 12 ) ELSE id END, time, uid, keyword, url_rank, click_num, click_url FROM sourceTable") newSource.rdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/test_data")Copy the code

5.2 Data Skew:

val sourceRdd = sc.textFile("hdfs://bd-master:9000/test_data/p*") val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t"); (parm(0).trim().toInt,parm(1).trim()) }) kvRdd.groupByKey(12).countCopy the code

5.3 Solve data skew based on parallelism enhancement

kvRdd.groupByKey(17).count
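Besides passing an explicit partition count to the shuffle operator, as above, the shuffle parallelism can also be raised for a single operator or globally; a minimal sketch using standard Spark settings (these lines are not from the original article):

    // per-operator: pass the partition count directly to the shuffle operator
    kvRdd.reduceByKey((a, b) => a + b, 100)

    // global defaults, e.g. on spark-submit:
    //   --conf spark.default.parallelism=200      (RDD API)
    //   --conf spark.sql.shuffle.partitions=200   (Spark SQL)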

6 Spark random prefix improvement test

6.1 Data Preparation

val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*",13) val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t"); (parm(0).trim().toint,parm(4).trim().toint)}) val kvRdd2 = kvRdd.map(x=>{if(x._1 > 20000) (20001,x._2) else x})Copy the code

6.2 Data Skew

kvRdd2.groupByKey().collect

6.3 Resolving Data Skew

val kvRdd3 = kvRdd2.map(x=>{if (x._1 ==20001) (x._1 + scala.util.Random.nextInt(100),x._2) else x})
kvRdd3.sortByKey(false).collect
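With the former hot key 20001 now scattered across up to 100 distinct key values, the aggregation that skewed in 6.2 spreads over many tasks; a small follow-up check (not in the original article):

    // same aggregation as in 6.2, but on the scattered keys
    kvRdd3.groupByKey().count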

7 Two-stage aggregation (local aggregation + global aggregation) test

7.1 Theory of two-stage aggregation (local aggregation + global aggregation)

  • Applicable scenario: this solution fits aggregation shuffle operators such as reduceByKey on an RDD, or group by statements used for grouped aggregation in Spark SQL.
  • Implementation principle: by attaching a random prefix, records sharing one key are turned into records with several different keys, so data originally processed by a single task is spread over multiple tasks for local aggregation; this solves the problem of one task handling too much data. The random prefix is then removed and a global aggregation is performed to obtain the final result (a compact sketch follows this list).
  • Advantages: very effective for skew caused by aggregation-type shuffle operations; it usually eliminates the skew, or at least substantially alleviates it, improving Spark job performance several times over.
  • Disadvantages: it only applies to aggregation-type shuffle operations; for join-type shuffles another solution is needed.
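Before the full Java implementation in 7.2, here is a compact Scala sketch of the same two-stage idea; the counts RDD name and the prefix range of 10 are illustrative assumptions, not code from the original:

    // counts: RDD[(Long, Long)] of (key, 1L) pairs built from the source data
    // stage 1: attach a random prefix (0..9) and aggregate locally
    val prefixed  = counts.map { case (k, v) => (s"${scala.util.Random.nextInt(10)}_$k", v) }
    val localAgg  = prefixed.reduceByKey(_ + _)
    // stage 2: strip the prefix and aggregate globally
    val globalAgg = localAgg
      .map { case (pk, v) => (pk.split("_")(1).toLong, v) }
      .reduceByKey(_ + _)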

7.2 Two-stage aggregation (local aggregation + global aggregation) code implementation:

    package skewTuring;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    import java.util.Random;

    /**
     * Applicable scenario: aggregation shuffle operators such as reduceByKey on an RDD,
     * or group by statements used for grouped aggregation in Spark SQL.
     * Two-stage aggregation (local aggregation + global aggregation).
     */
    public class SkewTuring11 {
        public static void main(String[] args) throws Exception {
            // Build the Spark context
            SparkConf conf = new SparkConf().setAppName("SkewTuring11");
            conf.setMaster("local[8]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // 0  20111230000005  57375476989eea12893c0c3811607bcf  奇艺高清  1  1  http://www.qiyi.com/
            JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");

            // Step 1: give every key in the RDD a random prefix.
            JavaPairRDD<String, Long> randomPrefixRdd = sourceRdd.mapToPair(new PairFunction<String, String, Long>() {
                @Override
                public Tuple2<String, Long> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Random random = new Random();
                    int prefix = random.nextInt(10);
                    Long key = Long.valueOf(splits[0]);
                    if (key > 10000) {
                        return new Tuple2<String, Long>(prefix + "_" + 10001L, 1L);
                    } else {
                        return new Tuple2<String, Long>(prefix + "_" + key, 1L);
                    }
                }
            });

            // Step 2: perform local aggregation on the randomly prefixed keys.
            JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1 + v2;
                }
            });

            // Step 3: remove the random prefix from every key in the RDD.
            JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
                    new PairFunction<Tuple2<String, Long>, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception {
                            long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Long>(originalKey, tuple._2);
                        }
                    });

            // Step 4: perform global aggregation on the RDD whose random prefixes have been removed.
            JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
                    new Function2<Long, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Long call(Long v1, Long v2) throws Exception {
                            return v1 + v2;
                        }
                    });

            System.out.println("*********************************************");
            System.out.println(globalAggrRdd.first());
        }
    }

8 Sample the skewed keys and split the join operation test

8.1 Theoretical basis of sampling the skewed keys and splitting the join operation

  • Sampling: sample the RDD that contains the few oversized keys, count how often each key appears in the sample, and pick out the key(s) with the largest counts (a compact sketch follows this list).
  • On the skewed side of the join, attach random prefixes to those heavy keys so that records sharing one key become records with different keys; the skewed data is thus scattered over different tasks, which resolves the skew.
  • On the other side of the join, the rows matching the skewed keys are expanded N-fold by taking the Cartesian product with the set of random prefixes, so that however a skewed key is prefixed it can still join correctly.
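Before the full Java implementation in 8.2, a compact Scala sketch of the whole flow; the rdd1/rdd2 names, the 10% sample rate and the expansion factor of 10 mirror that implementation but are otherwise illustrative assumptions:

    // rdd1 is skewed, rdd2 is evenly distributed; both are RDD[(Long, String)]
    val skewedKey = rdd1.sample(false, 0.1)
      .map(kv => (kv._1, 1L)).reduceByKey(_ + _)
      .map(_.swap).sortByKey(ascending = false)
      .first()._2                                      // heaviest key in the sample

    val skewedRdd = rdd1.filter(_._1 == skewedKey)     // the hot key only
    val commonRdd = rdd1.filter(_._1 != skewedKey)     // everything else

    // skewed side: random prefix 0..9; other side: expand its matching rows 10 times
    val prefixedSkewed = skewedRdd.map { case (k, v) => (s"${scala.util.Random.nextInt(10)}_$k", v) }
    val expandedOther  = rdd2.filter(_._1 == skewedKey)
      .flatMap { case (k, v) => (0 until 10).map(i => (s"${i}_$k", v)) }

    // join each part separately, strip the prefix, and union the results
    val joinedSkewed = prefixedSkewed.join(expandedOther)
      .map { case (pk, vs) => (pk.split("_")(1).toLong, vs) }
    val result = joinedSkewed.union(commonRdd.join(rdd2))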

8.2 Code implementation of sampling the skewed keys and splitting the join operation

    package skewTuring;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import java.util.Random;
    
    /**
     * Applicable scenario: the data skew is caused by a few keys with oversized record counts in one RDD/Hive table,
     * while the keys of the other RDD/Hive table are distributed fairly evenly.
     * Sample the skewed keys and split the join operation.
     */
    public class SkewTuring22 {
        public static void main(String[] args) throws Exception{
            // Build the Spark context
            SparkConf conf = new SparkConf().setAppName("SkewTuring22");
            conf.setMaster("local[8]");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // Primary source data -- a few skewed keys
            //0	20111230000005	57375476989eea12893c0c3811607bcf	奇艺高清	1	1	http://www.qiyi.com/
            JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
            JavaPairRDD<Long, String> mapdSourceRdd= sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
                @Override
                public Tuple2<Long,String> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Long key = Long.valueOf(splits[0]);
                    String value = splits[6];
                    if(key > 10000) {
                        return new Tuple2<Long,String>(10001L, value);
                    } else {
                        return new Tuple2<Long,String>(key, value);
                    }
                }
            });
    
            // Secondary source data -- evenly distributed keys
            JavaPairRDD<Long,String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
                @Override
                public Tuple2<Long,String> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Long key = Long.valueOf(splits[0]);
                    String value = splits[6];
                    return new Tuple2<Long,String>(key, value);
                }
            });
    
            // First, sample 10% of the data from mapdSourceRdd, which contains the few skew-causing keys.
            JavaPairRDD<Long,String> sampledRDD = mapdSourceRdd.sample(false, 0.1);
    
            System.out.println(" 随机采样:"+sampledRDD.first());
    
            // Count how many times each key appears in the sample RDD and sort by count in descending order.
            // From the sorted result take the top 1 (or top n) entries, i.e. the keys with the most records.
            // How many of the heaviest keys to take is up to you; here we take one as a demonstration.
            JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
                    new PairFunction<Tuple2<Long,String>, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                                throws Exception {
                            return new Tuple2<Long, Long>(tuple._1, 1L);
                        }
                    });
    
            JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
                    new Function2<Long, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Long call(Long v1, Long v2) throws Exception {
                            return v1 + v2;
                        }
                    });
    
            JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
                    new PairFunction<Tuple2<Long,Long>, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                                throws Exception {
                            return new Tuple2<Long, Long>(tuple._2, tuple._1);
                        }
                    });
    
            final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
    
            System.out.println("数据倾斜id"+skewedUserid);
    
    
            /**
             * Primary source data: filter the skewed key out into its own RDD.
             */
            JavaPairRDD<Long, String> skewedRDD = mapdSourceRdd.filter(
                    new Function<Tuple2<Long,String>, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                            return tuple._1.equals(skewedUserid);
                        }
                    });
            System.out.println("主源数据 倾斜数据  rdd:"+ skewedRDD.take(100));
    
            // Split the ordinary keys that do not cause skew out of mapdSourceRdd into a separate RDD.
            JavaPairRDD<Long, String> commonRDD = mapdSourceRdd.filter(
                    new Function<Tuple2<Long,String>, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                            return !tuple._1.equals(skewedUserid);
                        }
                    });
            System.out.println("主源数据 常规数据  rdd:"+ commonRDD.take(100));
    
            /**
             * sourceRdd2: secondary source data -- filter the skewed key and expand it N times with random prefixes.
             */
            // rdd2 is the RDD whose keys are distributed relatively evenly.
            // The rows of rdd2 matching the key obtained above are filtered into a separate RDD,
            // and flatMap expands each of those rows N times (10 in this code), prefixing each copy with 0..9.
            JavaPairRDD<String, String> skewedRandomRDD2 = mapdSourceRdd2.filter(
                    new Function<Tuple2<Long,String>, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                            return tuple._1.equals(skewedUserid);
                        }
                    }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
                        @Override
                        public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
                            List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
                            for (int i = 0; i < 10; i++) {
                                list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
                            }
                            return list.iterator();
                        }
            });
    
            System.out.println("副源数据 扩容表处理:" + skewedRandomRDD2.take(100));
    
            /**
             * Primary skewed data: key + random prefix.
             */
            // Prefix every row of the skewed-key RDD split out of rdd1 with a random number (below 10 in this code).
            // Then join this RDD with the expanded RDD split out of rdd2 above.
            final JavaPairRDD<String, String> skewedRandomRDD = skewedRDD.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                        throws Exception {
                    Random random = new Random();
                    int prefix = random.nextInt(10);
                    return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
                }
            });
    
            System.out.println("主源数据 随机数处理:" + skewedRandomRDD.take(100));
    
            JavaPairRDD<Long, Tuple2<String, String>> joinedRDD1 = skewedRandomRDD
                    .join(skewedRandomRDD2)
                    .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,String>>, Long, Tuple2<String, String>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, String>> call(Tuple2<String, Tuple2<String, String>> tuple) throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, String>>(key, tuple._2);
                        }
                    });
            System.out.println("主 副源数据 倾斜数据 join 处理:" + joinedRDD1.take(100));
    
            // Join the ordinary-key RDD split out of rdd1 directly with rdd2.
            JavaPairRDD<Long, Tuple2<String, String>> joinedRDD2 = commonRDD.join(mapdSourceRdd2);
    
            System.out.println("主 副源数据 常规数据 join 处理:" + joinedRDD2.take(100));
            // 将倾斜key join后的结果与普通key join后的结果,uinon起来。
            // 就是最终的join结果。
            JavaPairRDD<Long, Tuple2<String, String>> resultRDD = joinedRDD1.union(joinedRDD2);
    
            System.out.println("最终join结果:"+ resultRDD.sample(false, 0.1).take(100));
        }
    }

9 Join using random prefixes and an expanded RDD (prefix every row of the skewed table with one of N random values and expand the other table N times)

  • On the skewed side of the join: if a large number of keys are skewed, splitting them off one by one as in the previous solution is no longer meaningful; in that case it is more suitable to attach a random prefix directly to every record of the skewed data set.

  • On the other side of the join: the entire other data set, which has no serious skew, is expanded by taking the Cartesian product with the set of random prefixes, i.e. its volume grows N-fold.

    package skewTuring;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Random;

    /**
     * Applicable scenario: the join operation involves a large number of skewed keys in the RDD.
     * Use random prefixes and an expanded RDD to perform the join.
     */
    public class SkewTuring33 {
        public static void main(String[] args) throws Exception {
            // Build the Spark context
            SparkConf conf = new SparkConf().setAppName("SkewTuring33");
            conf.setMaster("local[8]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Source data 1 -- many skewed keys
            JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");

            // Source data -- a large number of skewed keys
            // 0  20111230000005  57375476989eea12893c0c3811607bcf  奇艺高清  1  1  http://www.qiyi.com/
            JavaRDD<String> sourceRdd1 = sc.textFile("hdfs://master:9000/skewtestdata/source_index");

            JavaPairRDD<Long, String> mapdSourceRdd = sourceRdd.mapToPair(new PairFunction<String, Long, String>() {
                @Override
                public Tuple2<Long, String> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Long key = Long.valueOf(splits[0]);
                    String value = splits[6];
                    if (key > 10000) {
                        return new Tuple2<Long, String>(10001L, value);
                    } else {
                        return new Tuple2<Long, String>(key, value);
                    }
                }
            });

            JavaPairRDD<Long, String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String, Long, String>() {
                @Override
                public Tuple2<Long, String> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Long key = Long.valueOf(splits[0]);
                    String value = splits[6];
                    return new Tuple2<Long, String>(key, value);
                }
            });

            /**
             * Primary skewed data: key + random number.
             */
            // Prefix every record of the skewed RDD with a random number below 100,
            // then join it with the expanded rdd2 below.
            final JavaPairRDD<String, String> skewedRandomRDD = mapdSourceRdd.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception {
                    Random random = new Random();
                    int prefix = random.nextInt(100);
                    return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
                }
            });

            System.out.println("Primary source, random-prefixed rows: " + skewedRandomRDD.take(100));

            /**
             * Secondary source data sourceRdd2: expand the whole data set with random prefixes.
             */
            // rdd2 is the RDD whose keys are distributed evenly.
            // Expand every row of rdd2 100 times with flatMap, prefixing each copy with 0..99.
            JavaPairRDD<String, String> expandedRDD = mapdSourceRdd2.flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
                @Override
                public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
                    List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
                    for (int i = 0; i < 100; i++) {
                        list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
                    }
                    return list.iterator();
                }
            });

            System.out.println("Secondary source, expanded rows: " + expandedRDD.take(100));

            // Join the two processed RDDs.
            JavaPairRDD<String, Tuple2<String, String>> joinedRDD = skewedRandomRDD.join(expandedRDD);

            System.out.println("Final join result: " + joinedRDD.take(100));
        }
    }

10 Customize the Partitioner

  • Applicable scenario: a large number of different keys are assigned to the same task, giving that task far too much data.
  • Solution: use a custom Partitioner implementation class instead of the default HashPartitioner and try to distribute the different keys evenly across tasks (see the sketch after this list).
  • Advantage: it does not affect the original parallelism design, whereas changing the parallelism would by default also change the parallelism of subsequent stages and might affect them.
  • Disadvantages: limited applicability, since it can only scatter different keys; it does not help when the data set of a single key is itself very large. Its effect is similar to adjusting the parallelism: it can only alleviate the skew, not eliminate it. In addition, the Partitioner has to be customized to the characteristics of the data, which is not very flexible.
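The article shows no code for this option; the minimal Scala sketch below is an illustrative assumption built on the test data of section 5, where every skewed key has the form 8 + 12k and therefore collides under hashCode % 12. Dividing the key by 12 before partitioning spreads those keys across all partitions again; the SkewAwarePartitioner name is hypothetical:

    import org.apache.spark.Partitioner

    // Illustrative custom Partitioner: undo the 8 + 12k collision pattern of section 5
    class SkewAwarePartitioner(numParts: Int) extends Partitioner {
      override def numPartitions: Int = numParts
      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Int]
        math.abs((k / 12) % numParts)
      }
    }

    // usage against the kvRdd built in section 5.2 (an assumption):
    // kvRdd.groupByKey(new SkewAwarePartitioner(12)).count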

11 Summary

This article approached tuning from the angle of data skew, summarizing and refining the techniques through hands-on test cases. Writing these posts is not easy, and I hope you find them valuable!

Qin Kaixin in Shenzhen