0, preface

This article builds on the previous Spark articles, connecting the knowledge from the resource layer through to the computing layer

1, WordCount

WordCount is the basic application for getting started with big data, so we won’t explain it much here

During development, have we thought about the following questions:

  • Where did the first RDD data come from?
  • Is it true that data is passed from RDD to RDD in code?
  • What role do dependencies play between RDDs?
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")

// hello world
// hello spark
val rdd: RDD[String] = sc.textFile("bigdata-spark/data/wc.txt")

// hello
// world
// hello
// spark
val word: RDD[String] = rdd.flatMap(_.split(" "))

// (hello,1)
// (world,1)
// (hello,1)
// (spark,1)
val words: RDD[(String, Int)] = word.map((_, 1))

// (hello,2)
// (world,1)
// (spark,1)
val wordCount: RDD[(String, Int)] = words.reduceByKey(_ + _)

wordCount.foreach(println)

2, source code analysis

2.1, RDD

First, let’s look at RDD. You can see that RDD is an abstract class with two properties

  • _sc : SparkContext
  • deps : Seq[Dependency[_]], which describes the dependencies between RDDs

Those of you familiar with Scala will know that, in addition to defining attributes for RDD, this also defines a constructor for RDD, which takes two parameters

Further down, we can see another constructor, which takes only one parameter: it wraps the parent RDD in a OneToOneDependency (Dependency was covered in Spark Basics 01 - RDD and wide and narrow dependencies, so I won’t go into it here) and then calls the two-parameter constructor

// Two-parameter constructor
abstract class RDD[T: ClassTag](var _sc: SparkContext, var deps: Seq[Dependency[_]])

// One-parameter constructor
def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

Next, we’ll look at some of the methods defined by RDD that are important for this article. Apart from getDependencies(), which already has a default implementation, and the final iterator(), these methods need to be implemented by the subclasses themselves

// Returns an Iterator over the RDD's data
def compute(split: Partition, context: TaskContext): Iterator[T]
// Returns the partitions of this RDD (their count is the RDD's number of partitions)
protected def getPartitions: Array[Partition]
// Returns the RDD's dependencies
protected def getDependencies: Seq[Dependency[_]] = deps
// The template method implemented by RDD itself, which eventually calls the subclass's compute()
final def iterator(split: Partition, context: TaskContext): Iterator[T]

Let’s take a look at RDD’s own implementation of the template method iterator(). Here we see two branches; we’ll look at getOrCompute() first
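
Roughly, the body of iterator() looks like the following (simplified from the Spark source; details vary slightly between versions):

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // persist()/cache() was called: go through the block manager first
    getOrCompute(split, context)
  } else {
    // no storage level set: compute directly, or read from a checkpoint if one exists
    computeOrReadCheckpoint(split, context)
  }
}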

Entering getOrCompute() and ignoring some of the details, we can see at line 337 that it eventually calls the computeOrReadCheckpoint() method

Back in the computeOrReadCheckpoint() method, this piece of logic needs a little explanation

if (isCheckpointedAndMaterialized) {
  // Get the parent RDD according to Dependency
  // Then call its iterator()
  firstParent[T].iterator(split, context)
} else {
  // Call compute() of subclass RDD
  // This method is implemented by its subclasses themselves
  compute(split, context)
}

conclusion:
  • RDD is an abstract class with two attributes and two constructors, one taking a single parameter and the other taking two
  • The iterator() method of RDD is a template method that eventually calls the compute() method of a subclass; a subclass only needs to implement compute()

2.2, textFile

Going back to the code, the first thing to figure out is where the data of the first RDD comes from, and what type it is

val rdd: RDD[String] = sc.textFile("bigdata-spark/data/wc.txt")

Entering textFile(), we can see that it calls hadoopFile() and passes in TextInputFormat, LongWritable, and Text; those familiar with MapReduce will recognize these (a simplified sketch of textFile() follows the list below):

  1. TextInputFormat is a class that defines the type of text file to read
  2. LongWritable and Text are two data types defined by MapReduce itself
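
Here is a simplified sketch of textFile() based on the Spark source (trimmed; the exact body may differ slightly between versions):

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  // Delegate to hadoopFile() with the MapReduce classes above, then keep only the line
  // content (the Text value) and drop the LongWritable byte-offset key
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
    .map(pair => pair._2.toString)
}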

Entering the hadoopFile() method, we see that the first RDD returned is of type HadoopRDD, which inherits from RDD

Note that HadoopRDD passes Nil as the Dependency list to RDD's constructor:

class HadoopRDD[K, V](...) extends RDD[(K, V)](sc, Nil)

Let’s take a look at how HadoopRDD implements some of RDD’s methods

2.2.1, getPartitions

Looking at the following line of code, it may feel a little familiar: it is similar to MapReduce code, and is used to obtain the input splits

val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)

The implementation is in line 310 of the FileInputFormat class, which will not be explained here, but can be explored by interested readers

Summary: The number of partitions in HadoopRDD is related to the file’s block size, the total file size, and the minPartitions value we pass in
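
As a rough illustration (the actual numbers depend on the file and block size), passing a larger minPartitions asks the InputFormat to cut the file into more splits:

// wc.txt is a tiny single-block file; with the default minPartitions it usually ends up with 2 partitions
println(sc.textFile("bigdata-spark/data/wc.txt").getNumPartitions)
// Asking for at least 4 partitions makes getSplits() cut the (splittable) text file further
println(sc.textFile("bigdata-spark/data/wc.txt", minPartitions = 4).getNumPartitions)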

2.2.2, compute()

Remember the template method iterator() of RDD above? It eventually calls the subclass’s compute(), and compute() returns an Iterator

The method is too long to explain here, but at line 249 of the HadoopRDD class, we see two familiar properties

private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)

inputFormat is used to obtain the corresponding RecordReader, for example LineRecordReader, Hadoop’s default RecordReader. Through the RecordReader the corresponding data can be read, and compute() returns it wrapped in a NextIterator

Summary: compute() of HadoopRDD reads a file on the file system and converts it into an Iterator for subsequent operations
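
To make the idea concrete, here is a small, self-contained analogy in plain Scala (not Spark code): a text file plays the role of the split, and a lazily consumed getLines() iterator plays the role of the RecordReader.

import scala.io.Source

// Expose a file's records lazily as an Iterator, the way HadoopRDD.compute() does with a split:
// nothing is read into memory until the iterator is actually consumed downstream
def computeLikeIterator(path: String): Iterator[(Long, String)] = {
  var offset = 0L
  Source.fromFile(path).getLines().map { line =>
    val record = (offset, line)   // (byte-offset key, line value), like (LongWritable, Text)
    offset += line.length + 1     // advance past the line and its newline
    record
  }
}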

So:
  • As we can see from the above, HadoopRDD has no Dependency on a preceding RDD: since it sits right next to the data source, it does not need to depend on any other RDD
  • HadoopRDD is a special kind of RDD: it is adjacent to the data source, and its compute() converts that data source into an Iterator for subsequent use, so we can call it a source-adjacent RDD
  • textFile() can accordingly be called a create operator

Let’s use a graph to string together what we know

2.3, flatMap

Going back to the second line of the WordCount code, which calls the flatMap operator, we can see that the RDD type returned is MapPartitionsRDD

val word: RDD[String] = rdd.flatMap(_.split(" "))

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  // Clean the closure of the function we passed in, here _.split(" ")
  val cleanF = sc.clean(f)
  // this: the RDD being called, namely HadoopRDD
  // (context, pid, iter) => iter.flatMap(cleanF): a function with three inputs (TaskContext, partition index, iterator) that applies the cleaned function cleanF via the iterator's flatMap
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

Entering MapPartitionsRDD, we can see that:

  • The reference to HadoopRDD passed in becomes the attribute prev of MapPartitionsRDD, which records which RDD comes before it
  • At the same time, prev is passed into RDD(prev), i.e. the one-parameter constructor; as we know from above, a OneToOneDependency will be constructed and recorded on the RDD (see the sketch below)
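
A simplified sketch of the MapPartitionsRDD class header, based on the Spark source (some modifiers and parameters omitted):

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],                                   // the preceding RDD, e.g. HadoopRDD
    f: (TaskContext, Int, Iterator[T]) => Iterator[U])  // (context, partition index, parent iterator) => new iterator
  extends RDD[U](prev)                                  // one-parameter constructor => OneToOneDependency(prev)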

2.3.1, getPartitions

As the following code shows, MapPartitionsRDD simply takes the number of partitions of its preceding RDD

override def getPartitions: Array[Partition] = firstParent[T].partitions

Summary: Unlike HadoopRDD, MapPartitionsRDD has a preceding RDD, so the number of partitions can be obtained directly from that preceding RDD

2.3.2, compute()

compute() in MapPartitionsRDD is simpler: it calls iterator() on the preceding RDD and applies f to the result

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

Recall that iterator() calls compute(), and MapPartitionsRDD’s compute() in turn calls the preceding RDD’s iterator(); this is how the iterators end up nested.
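
The effect of this nesting can be reproduced with plain Scala iterators (a self-contained illustration, not Spark code): each step lazily wraps the previous iterator, and nothing is computed until the outermost iterator is consumed.

val source: Iterator[String] = Iterator("hello world", "hello spark")  // plays the role of HadoopRDD's iterator
val flat: Iterator[String] = source.flatMap(_.split(" "))              // flatMap's MapPartitionsRDD
val pairs: Iterator[(String, Int)] = flat.map((_, 1))                  // map's MapPartitionsRDD
pairs.foreach(println)  // only now does the data actually flow through the nested iterators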

2.3.3, conclusion
  • The flatMap operator is different from textFile: flatMap converts one RDD into another RDD, for example HadoopRDD into MapPartitionsRDD, so flatMap is a transform operator.

Now we can supplement the RDD dependency diagram

2.4, map

Continuing with the third line of WordCount, which uses the map operator, we can see that the RDD type returned is also MapPartitionsRDD

val words: RDD[(String, Int)] = word.map((_, 1))

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  // Clean the closure of the function we passed in, here (_, 1)
  val cleanF = sc.clean(f)
  // this: the RDD being called
  // (context, pid, iter) => iter.map(cleanF): a function with three inputs (TaskContext, partition index, iterator) that applies the cleaned function cleanF via the iterator's map
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

Since MapPartitionsRDD has already been introduced above, it will not be elaborated on here; the difference between map and flatMap lies only in the function passed in

// map
(context, pid, iter) => iter.map(cleanF)
// flatMap
(context, pid, iter) => iter.flatMap(cleanF)
conclusion
  • Apart from the function passed to MapPartitionsRDD, map is the same as flatMap; both are transform operators.

Now we can supplement the RDD dependency diagram

2.5, reduceByKey

Coming back to the code, the main thing is the reduceByKey operator. As you can see, defaultPartitioner() is called first, then the two-parameter reduceByKey(), which finally calls combineByKeyWithClassTag(). (PS: this is why some articles on the Internet explain that reduceByKey ultimately calls combineByKey; reduceByKey is really just a thin wrapper around combineByKey)
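
A simplified sketch of this call chain from PairRDDFunctions in the Spark source (signatures trimmed):

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // reduceByKey is just combineByKey with the same function used to merge values within and across partitions
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}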

Going into defaultPartitioner: since we did not specify a partitioner, the default HashPartitioner will be used
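
What HashPartitioner does can be illustrated with a hypothetical helper that mirrors its nonNegativeMod(key.hashCode, numPartitions) logic:

def hashPartition(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod  // keep the partition id non-negative
}

println(hashPartition("hello", 2))  // every ("hello", 1) pair lands in the same partition
println(hashPartition("spark", 2))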

After looking at the partitioner, we return to combineByKeyWithClassTag and can see that the RDD type returned is ShuffledRDD, a new type of RDD

Entering ShuffledRDD, you will notice that the first attribute, prev, is marked @transient, and the second attribute is the partitioner part. ShuffledRDD also inherits from RDD

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],  // @transient: the link to the parent is cut off during serialization
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil)

The @transient annotation can be understood simply as:

// a_rdd : MapPartitionsRDD
// b_rdd : ShuffledRDD
// Case 1: a_rdd <--@transient-- b_rdd. If b_rdd is serialized, a_rdd does not need to be serialized and the connection between a_rdd and b_rdd is severed

// a_rdd : MapPartitionsRDD
// b_rdd : MapPartitionsRDD
// Case 2: a_rdd <---- b_rdd. If b_rdd is serialized, a_rdd should also be serialized

2.5.1, getPartitions

We can see that ShuffledRDD no longer obtains the number of partitions from the preceding RDD the way MapPartitionsRDD does, but instead determines it from the partitioner’s number of partitions

Summary: ShuffledRDD’s getPartitions method is based on the number of partitions of the partitioner
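
Simplified from the Spark source, ShuffledRDD's getPartitions looks roughly like this:

override def getPartitions: Array[Partition] =
  Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))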

2.5.2, getDependencies

ShuffledRDD also overrides the getDependencies method of RDD, as shown in the following line

List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))

ShuffleDependency is not expanded on here; like NarrowDependency (covered in the previous article, Spark Basics 01 - RDD and wide and narrow dependencies), it is a subclass of Dependency. For now we just need to know that ShuffledRDD overrides this method

Summary: ShuffledRDD overrides the getDependencies method and returns a ShuffleDependency; it is the ShuffleDependency that registers the shuffle with the ShuffleManager

2.5.3, compute()

The compute() method of ShuffledRDD involves the ShuffleManager, so I won’t follow the source code in depth here

Those of you who are familiar with Spark will know that operators such as reduceByKey generate a shuffle. In other words, the code is divided into two Stages at this operator, with the ShuffleManager in between acting as the dividing line

  • After the preceding Stage finishes its computation, the data is shuffled for the subsequent Stage; scattering the data involves spilling and writing files. In other words, the writer.write() method of the ShuffleManager is called to write the computed data into files
  • The subsequent Stage then needs to read, i.e. call the reader.read() method of the ShuffleManager to pull the data back for computation. Passing data across Stages is wrapped by Spark into the ShuffleManager service (a simplified sketch of this read side follows this list)
  • Writing and reading these files is transparent to users, who do not need to care about the details. As this shows, the claim that Spark is a purely memory-based computing framework is not accurate, because Spark does in fact write to and read from files during a shuffle
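
For reference, a simplified sketch of the read side, i.e. ShuffledRDD.compute(), based on the Spark source (the exact getReader signature differs across Spark versions):

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // Ask the ShuffleManager for a reader that pulls this partition's data
  // from the files written by the previous stage's tasks
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}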

conclusion
  • reduceByKey is a special transform operator: it causes a shuffle. Operators such as combineByKey and sortByKey behave the same way
  • ShuffledRDD does not keep a narrow Dependency on the preceding RDD through its constructor (it passes Nil); instead it builds a ShuffleDependency in getDependencies()

3, summary

Now we can complete the RDD dependency diagram by filling in the missing parts

  • SparkContext: as you can see from the figure, it corresponds to the Application level of Spark

  • Stage

    • Within a Stage, data is passed between RDDs using the iterator nesting pattern (see Spark Basics 03 - working with external data sources), so a single pass over the data can serve multiple nested iterators
    • Data transfer between Stages is handled by the ShuffleManager, one of Spark’s core services, which involves spilling files to disk, the use of on-heap and off-heap buffers, etc. (there is too much to cover here; it will be analyzed in a later source code article)
  • RDD

    • HadoopRDD: a source-adjacent RDD. Its compute() is special because, being right next to the data source, it has to turn that data source into an iterator for subsequent RDDs to use
    • MapPartitionsRDD: a shuffle-free transform RDD, used to record the dependencies between operators
    • ShuffledRDD: a transform RDD that produces a shuffle and records the shuffle relationship between operators; to do so it implements compute(), getPartitions, and getDependencies itself
  • Where did the first RDD data come from?

    • From compute() implemented by the source-adjacent RDD, for example HadoopRDD
  • Is it true that data is passed from RDD to RDD in code?

    • Within a Stage, the so-called data transfer is really the iterator nesting pattern; the RDDs only record the dependencies between each other. From the perspective of a Stage, data transfer looks like a pipeline, and inside the pipeline the iterator nesting is built on top of the RDDs
  • What role do dependencies play between RDDs?

    • Dependency relationships identify whether a shuffle occurs between RDDs and can be used to divide Stages
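
Finally, the dependency chain and the stage division described above can be checked directly from the WordCount code with the public RDD API:

println(wordCount.toDebugString)  // prints the whole lineage; the indentation shift marks the shuffle (stage) boundary
println(wordCount.dependencies)   // List(ShuffleDependency)  - wide dependency, divides the stages
println(words.dependencies)       // List(OneToOneDependency) - narrow dependency, stays inside a stage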