1 Summary of RDD

1.1 What is RDD?

Resilient Distributed Dataset (RDD) is the most basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be computed on in parallel.

RDD has the characteristics of a data flow model: automatic fault tolerance, location-aware scheduling, and scalability.

RDD lets users explicitly cache a working set in memory across multiple queries and reuse it in later queries, which greatly improves query efficiency.

We can think of an RDD as a proxy for a distributed dataset: we operate on it as if it were a local collection, without having to worry about task scheduling, fault tolerance, and so on.
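For intuition, here is a minimal sketch (assuming a Spark shell or an existing SparkContext named sc): the RDD is used just like a local collection, while Spark takes care of partitioning, scheduling and fault tolerance behind the scenes.

val nums = sc.parallelize(1 to 100)          // a distributed "collection"
val squares = nums.map(n => n * n)           // looks like a local map
val evenSquares = squares.filter(_ % 2 == 0) // looks like a local filter
println(evenSquares.count())                 // 50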

1.2 Attributes of RDD

An RDD is described this way in the RDD source code:

*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

  1. A list of partitions. The Partition is the basic unit of the dataset; each partition is processed by one computation task, which determines the granularity of parallel computation. The user can specify the number of partitions when creating the RDD; if not specified, the default value is used, which is the number of CPU cores allocated to the program.
  2. A function for computing each partition. RDD computation is performed partition by partition; each RDD implements a compute function for this purpose.
  3. A list of dependencies on other RDDs. Each transformation of an RDD generates a new RDD, so a pipeline-like lineage forms between RDDs. When data in some partitions is lost, Spark recomputes only the lost partitions using this lineage instead of recomputing all partitions of the RDD.
  4. A Partitioner, the partitioning function of the RDD. Spark currently implements two kinds of partitioners: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for a non-key-value RDD the Partitioner is None. The Partitioner determines the number of partitions of the RDD itself as well as the number of partitions in the shuffle output of the parent RDD.
  5. A list storing the preferred location for each Partition. For an HDFS file, this list stores the block locations of each partition. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule computing tasks onto the nodes that store the data blocks to be processed. (A short sketch inspecting these attributes follows this list.)
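As a small illustration (assuming a Spark shell with sc available), the sketch below inspects these attributes on a concrete key-value RDD; the exact printed values depend on your environment.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 3).reduceByKey(_ + _)
println(pairs.partitions.length)   // 1. the list of partitions
                                   // 2. the compute function is called internally for each partition
println(pairs.dependencies)        // 3. dependencies on the parent RDD (a ShuffleDependency here)
println(pairs.partitioner)         // 4. Some(HashPartitioner) because reduceByKey shuffles by key
println(pairs.preferredLocations(pairs.partitions(0))) // 5. preferred locations (empty for a parallelized collection)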

2 Creating an RDD

2.1 Creating an RDD from an existing Scala collection

// Create an RDD by parallelizing a Scala collection; usually used during testing
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2.2 Creating an RDD from an external storage system, including the local file system and any dataset supported by Hadoop, such as HDFS, Cassandra, and HBase

var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs://192.168.80.131:9000/words.txt")

2.3 Calling a transformation on an existing RDD generates a new RDD.
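For example (a sketch reusing rdd1 from 2.2; rdd1 itself is immutable and unchanged):

val rdd3 = rdd1.flatMap(_.split(" "))   // rdd3 is a new RDD derived from rdd1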

3 Programming API of RDD

3.1 Transformations

All transformations on an RDD are lazy; that is, they do not compute their results immediately. Instead, they just remember the transformations applied to the underlying dataset (for example a file). The transformations are only actually computed when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently.
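A minimal sketch of this laziness (assuming sc is available and the file path from 2.2): nothing is read or computed while the transformations are declared; the whole pipeline only runs when the action is called.

val lines  = sc.textFile("/root/words.txt")   // transformation: nothing happens yet
val words  = lines.flatMap(_.split(" "))      // transformation: still nothing
val pairs  = words.map((_, 1))                // transformation: still nothing
val counts = pairs.reduceByKey(_ + _)         // transformation: still nothing
println(counts.count())                       // action: the whole pipeline runs now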

Common Transformation operations (a short usage sketch follows the table):

| Transformation | Meaning |
| --- | --- |
| map(func) | Returns a new RDD consisting of each input element transformed by func |
| filter(func) | Returns a new RDD consisting of the input elements for which func returns true |
| flatMap(func) | Similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element) |
| mapPartitions(func) | Similar to map, but runs separately on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U] |
| mapPartitionsWithIndex(func) | Similar to mapPartitions, but func takes an extra integer argument representing the partition index, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U] |
| sample(withReplacement, fraction, seed) | Samples the data according to the ratio specified by fraction, with or without replacement; seed specifies the random number generator seed |
| union(otherDataset) | Returns a new RDD containing the union of the source RDD and the argument RDD |
| intersection(otherDataset) | Returns a new RDD containing the intersection of the source RDD and the argument RDD |
| distinct([numTasks]) | Returns a new RDD containing the distinct elements of the source RDD |
| groupByKey([numTasks]) | Called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs |
| reduceByKey(func, [numTasks]) | Called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs where the values of each key are aggregated using the given reduce function; like groupByKey, the number of reduce tasks can be set with the second optional parameter |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | Called on an RDD of (K, V) pairs, aggregates the values of each key using the given combine functions and a neutral zero value, e.g. aggregateByKey(0)(_ + _, _ + _) |
| sortByKey([ascending], [numTasks]) | Called on an RDD of (K, V) pairs where K implements Ordered, returns an RDD of (K, V) pairs sorted by key |
| sortBy(func, [ascending], [numTasks]) | Similar to sortByKey, but more flexible |
| join(otherDataset, [numTasks]) | Called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (V, W)) pairs with all pairs of elements for each key |
| cogroup(otherDataset, [numTasks]) | Called on RDDs of type (K, V) and (K, W), returns an RDD of type (K, (Iterable[V], Iterable[W])) |
| cartesian(otherDataset) | Returns the Cartesian product of the two RDDs |
| pipe(command, [envVars]) | Pipes each partition of the RDD through an external program |
| coalesce(numPartitions) | Changes the number of partitions; the first parameter is the target number of partitions, and the second parameter, shuffle, defaults to false. Increasing the number of partitions requires shuffle = true; decreasing it works with shuffle = false |
| repartition(numPartitions) | Repartitions the RDD into numPartitions partitions and always performs a shuffle |
| repartitionAndSortWithinPartitions(partitioner) | Repartitions and sorts within each partition in one step, which is more efficient than repartitioning and then sorting; called on an RDD of (K, V) pairs |
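The short sketch below (with made-up values) exercises a few of the key-value transformations from the table:

val scores = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val grouped    = scores.groupByKey()                          // (a, [1, 3]), (b, [2, 4])
val summed     = scores.reduceByKey(_ + _)                    // (a, 4), (b, 6)
val aggregated = scores.aggregateByKey(0)(_ + _, _ + _)       // same result as reduceByKey here
val sortedByK  = summed.sortByKey()                           // ordered by key
val joined     = summed.join(sc.parallelize(Seq(("a", "x")))) // (a, (4, x))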

3.2 Actions

Actions trigger the actual computation; a Spark application needs at least one action. A short usage sketch follows the table.

| Action | Meaning |
| --- | --- |
| reduce(func) | Aggregates all elements of the RDD using func; the function must be commutative and associative so that it can be computed correctly in parallel |
| collect() | Returns all elements of the dataset to the driver program as an array |
| count() | Returns the number of elements in the RDD |
| first() | Returns the first element of the RDD (similar to take(1)) |
| take(n) | Returns an array containing the first n elements of the dataset |
| takeSample(withReplacement, num, [seed]) | Returns an array of num elements sampled randomly from the dataset, with or without replacement; seed specifies the random number generator seed |
| takeOrdered(n, [ordering]) | Returns the first n elements of the RDD using either their natural order or a custom comparator |
| saveAsTextFile(path) | Saves the elements of the dataset as text files to HDFS or another supported file system; Spark calls toString on each element to convert it to a line of text |
| saveAsSequenceFile(path) | Saves the elements of the dataset as a Hadoop SequenceFile in the given directory on HDFS or another file system supported by Hadoop |
| saveAsObjectFile(path) | Saves the elements of the dataset as serialized Java objects, which can later be loaded with SparkContext.objectFile() |
| countByKey() | Only available on RDDs of type (K, V); returns a map of (K, Int) with the count of elements for each key |
| foreach(func) | Runs the function func on each element of the dataset |
| foreachPartition(func) | Runs the function func on each partition of the dataset |
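A small sketch of the common actions (assuming sc; the printed values follow from the sample data):

val nums = sc.parallelize(List(5, 3, 1, 4, 2))
println(nums.reduce(_ + _))         // 15 -- the function is commutative and associative
println(nums.count())               // 5
println(nums.first())               // 5
println(nums.take(3).toList)        // List(5, 3, 1)
println(nums.takeOrdered(3).toList) // List(1, 2, 3)
nums.foreach(println)               // runs on the executors, not on the driver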

3.3 Spark WordCount code example

Execution flow chart:

pom.xml dependencies:

<!-- scala -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <!-- Scala version matching spark-core_2.11 -->
    <version>2.11.8</version>
</dependency>
<!-- spark core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- hadoop client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <!-- use the Hadoop version of your cluster -->
    <version>YOUR-HADOOP-VERSION</version>
</dependency>

Scala version code implementation:

package com.zhouq.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // Needed because I run this directly on Windows and read files from Hadoop,
    // so the user name has to be set. Not required when running on Linux.
    System.setProperty("HADOOP_USER_NAME", "root")

    val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")
//    conf.set("spark.testing.memory", "102457600")

    // Create the SparkContext
    val sc = new SparkContext(conf)

    // Tell Spark where to read the data from to create the RDD (resilient distributed dataset)
    // Read the data line by line
    val lines: RDD[String] = sc.textFile(args(0))
    // Split each line into words
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Map each word to (word, 1)
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Aggregate the counts for each word
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by count in descending order
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // Save the result
    sorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}

Java 7 version:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        // Create the SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Read the data line by line
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Map each word to (word, 1)
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String tp) throws Exception {
                return new Tuple2<>(tp, 1);
            }
        });
        // Aggregate the counts for each word
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        // Swap (word, count) to (count, word) before sorting, because sorting is only available by key
        JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                //return new Tuple2<>(tp._2, tp._1);
                return tp.swap();
            }
        });
        // Sort by count in descending order
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        // Swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });
        // Save the output to HDFS
        result.saveAsTextFile(args[1]);
        jsc.stop();
    }
}

Java 8 version:

package com.zhouq.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        // Create the SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Read the data line by line
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split each line into words
        JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
        // Map each word to (word, 1)
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));
        // Aggregate the counts for each word
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        // Swap (word, count) to (count, word) before sorting, because sorting is only available by key
        JavaPairRDD<Integer, String> swaped = reduced.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> tp.swap());
        // Sort by count in descending order
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        // Swap back to (word, count)
        JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());
        // Save the output to HDFS
        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}

4 Dependencies of RDD

There are two different types of relationships between an RDD and its parent RDD(s) (an RDD may have more than one parent): narrow dependencies and wide dependencies.

Narrow dependency: each Partition of the parent RDD is used by at most one Partition of the child RDD; it can be compared to an only child. Wide dependency: Partitions of multiple child RDDs depend on the same Partition of the parent RDD.
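A sketch that makes the two dependency types visible (assuming sc): map only needs its own parent partition (narrow), while reduceByKey shuffles data across partitions (wide).

val words   = sc.parallelize(Seq("a", "b", "a"))
val pairs   = words.map((_, 1))          // narrow dependency: OneToOneDependency
val reduced = pairs.reduceByKey(_ + _)   // wide dependency: ShuffleDependency
println(pairs.dependencies)              // List(org.apache.spark.OneToOneDependency@...)
println(reduced.dependencies)            // List(org.apache.spark.ShuffleDependency@...)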

5 Persistence of RDD

5.1 RDD Cache (Persistence)

One of the most important features in Spark is the ability to persist (or cache) datasets in memory across operations. When you persist an RDD, each node stores any partitions it evaluates in memory and reuses them in other operations on that dataset (or datasets derived from it). This makes future actions faster (often more than 10 times). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods. The first time it is computed in an action, it is kept in memory on the nodes. Spark's cache is fault-tolerant: if any partition of the RDD is lost, it is automatically recomputed using the transformations that originally created it.

5.2 When do we need persistence?

  1. Fast computation is required
  2. The resources in the cluster must be large enough
  3. Important: the cached data is used by multiple actions
  4. It is recommended to filter the data first and then cache the reduced dataset in memory.

5.3 How to Use it

Use rdd.persist() or rdd.cache()

val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
// Use the cache method to cache the data in memory
val cached = lines.cache()
// The first count triggers the computation and fills the cache
cached.count()
// The second count reads from the cache
cached.count()

5.4 Data cache storage levels (StorageLevel)

In the StorageLevel.scala source code you can see:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

Explain what each parameter means:

The StorageLevel constructor takes five parameters. The first indicates whether to use disk; the second indicates whether to use memory; the third indicates whether to use off-heap memory; the fourth indicates whether data in memory is stored as deserialized Java objects (true) or in serialized form (false); the fifth indicates how many copies of the data to store (so that if an executor fails and partition data is lost, the backup on another machine can be read instead of recomputing when the task is reassigned).

OFF_HEAP: stores the RDD in serialized format in off-heap memory, i.e. Tachyon (a distributed memory storage system).

5.5 How to Select a Storage Level

Spark's multiple storage levels represent different trade-offs between memory usage and CPU efficiency. We recommend the following process for selecting an appropriate storage level (a short persist() sketch follows the list):

  1. Use the default storage level (MEMORY_ONLY) if your RDD fits in memory with it. This is the most CPU-efficient option and makes operations on the RDD as fast as possible.
  2. If the default level does not fit, use MEMORY_ONLY_SER and choose a fast serialization library; this makes objects much more space-efficient while still being reasonably fast to access.
  3. Do not spill the RDD to disk unless the functions that computed it are expensive or they filter a large amount of data; otherwise, recomputing a partition can be as fast as reading it from disk.
  4. If you want fast fault recovery, use the replicated storage levels. All storage levels provide full fault tolerance by recomputing lost data, but replication lets you keep running tasks on the RDD without waiting for lost partitions to be recomputed.
  5. In environments with lots of memory or multiple applications, OFF_HEAP has several advantages:
    1. It allows multiple executors to share the same memory pool in Tachyon
    2. It significantly reduces garbage-collection costs
    3. Cached data is not lost if an individual executor crashes
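As a sketch of choosing a level explicitly (file path reused from 2.2 for illustration): persist() takes a StorageLevel, while cache() is shorthand for MEMORY_ONLY.

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("/root/words.txt")
// lines.persist(StorageLevel.MEMORY_ONLY)                // same as lines.cache(), fastest if it fits in memory
val cached = lines.persist(StorageLevel.MEMORY_ONLY_SER)  // more compact, costs CPU for (de)serialization
cached.count()                                            // the first action materializes the cache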

5.6 Removing the cache

Spark automatically monitors cache usage on each node and evicts old data in a least-recently-used (LRU) fashion. If you want to remove an RDD from the cache manually, use the rdd.unpersist() method.

5.7 Checkpoint mechanism of RDD

In addition to caching data in memory, we can also cache data in HDFS to ensure that intermediate data is not lost.

When do we need to checkpoint?

  1. The job performs complex iterative calculations and the data must not be lost
  2. Speed is not critical (compared with caching in memory)
  3. The intermediate results need to be saved to HDFS

How do you checkpoint?

First set the checkpoint directory on the SparkContext, then build the computation logic, and finally call checkpoint() on the RDD you want to save; the checkpoint data is written when an action runs.
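A minimal sketch of these three steps (directory and file path taken from the examples in this article):

sc.setCheckpointDir("hdfs://hadoop1:8020/user/root/ck20190215") // 1. set the checkpoint directory first
val reduced = sc.textFile("/root/words.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)                                            // 2. the computation logic
reduced.cache()                                                  //    cache it so checkpointing does not recompute it
reduced.checkpoint()                                             // 3. mark the RDD for checkpointing
reduced.count()                                                  //    the checkpoint is written when an action runs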

The following code uses the cache and checkpoint methods to calculate the top N most popular teachers for each course.

package com.zhouq.spark

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Top N most popular teachers per course -- version 2
  * -- cache
  * -- checkpoint
  */
object GroupFavTeacher2_cache_checkpoint {
  def main(args: Array[String]): Unit = {
    val topN = args(1).toInt
    val subjects = Array("bigdata", "javaee", "php")

    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Set the checkpoint directory first
    sc.setCheckpointDir("hdfs://hadoop1:8020/user/root/ck20190215")

    // Read the data line by line
    val lines: RDD[String] = sc.textFile(args(0))

    // Parse each line into ((subject, teacher), 1)
    val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      val teacher = line.substring(index + 1)
      val httpHost = line.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })

    // Aggregate the counts
    val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)

    // Cache the RDD in memory; only worthwhile when the cached RDD is reused several times
    val cached: RDD[((String, String), Int)] = reduced.cache()

    // Mark the RDD for checkpointing; the data is written to the checkpoint directory when an action runs
    cached.checkpoint()

    /**
      * Sort each subject with RDD's sortBy (memory + disk); every take submits a job,
      * check the log output for details.
      */
    for (sub <- subjects) {
      // Keep only the current subject
      val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
      // Use RDD sortBy (memory + disk) to avoid the memory problems a Scala collection sort could cause
      val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
      println(favTeacher.toBuffer)
    }

    // The cached data has been used; release it.
    // Passing true makes unpersist block until the data is actually removed.
    cached.unpersist(true)

    sc.stop()
  }
}

6 Generation of DAG

DAG (Directed Acyclic Graph) is formed from the original RDDs through a series of transformations. The DAG is divided into different stages according to the dependencies between RDDs. For narrow dependencies, the partition transformations are computed within a single stage. For wide dependencies, because of the shuffle, the downstream computation can only start after the parent RDD has been fully processed, so wide dependencies are the basis for dividing stages.
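A sketch that makes the stage boundary visible (assuming sc): toDebugString prints the lineage, and each shuffle introduced by a wide dependency starts a new stage.

val counts = sc.textFile("/root/words.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
println(counts.toDebugString)
// The printed lineage shows one shuffle boundary at reduceByKey:
// textFile -> flatMap -> map are narrow dependencies and are pipelined into one stage,
// while the ShuffledRDD produced by reduceByKey belongs to the next stage.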

Link to the article on wechat: Spark RDD

If you are interested, feel free to follow me so we can learn from each other.