1. Spark computation depends on memory. If only 10 GB of memory is available but a 500 GB file needs to be sorted and written out, how do you do it?

① Divide the 500 GB of data on disk into 100 chunks of 5 GB each. (Note: leave some memory for the system!)

② Read each 5 GB chunk into memory in order and sort it with quicksort.

③ Write the sorted data (still 5 GB) back to disk.

④ Loop 100 times. Now all 100 chunks are sorted. (All that is left is merging them together!)

⑤ Read 5 GB / 100 = 0.05 GB from each of the 100 chunks into memory (100 input buffers).

⑥ Perform a 100-way merge. The merge result is temporarily stored in a 5 GB in-memory output buffer; when the buffer is full, it is appended to the final file on disk and emptied. Whenever one of the 100 input buffers is exhausted, the next 0.05 GB of the corresponding chunk is read into it, until everything is processed (a sketch of this merge step follows below).
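The merge in step ⑥ is the core of this external sort. Below is a minimal Scala sketch of a k-way merge, assuming each sorted chunk is already exposed as an Iterator[Int]; file I/O and the 0.05 GB buffer management are omitted, and kWayMerge and write are illustrative names, not part of any library.

import scala.collection.mutable

// k-way merge over k already-sorted chunks (here, in-memory iterators).
// In the real algorithm each iterator would be backed by a 0.05 GB input
// buffer that refills from its chunk on disk, and `write` would append to
// the 5 GB output buffer / final file.
def kWayMerge(chunks: Seq[Iterator[Int]], write: Int => Unit): Unit = {
  // PriorityQueue is a max-heap, so reverse the ordering to pop the smallest head first.
  val pq = mutable.PriorityQueue.empty[(Int, Int)](Ordering.by[(Int, Int), Int](_._1).reverse)
  val its = chunks.toArray
  its.indices.foreach(i => if (its(i).hasNext) pq.enqueue((its(i).next(), i)))

  while (pq.nonEmpty) {
    val (value, i) = pq.dequeue()
    write(value)                                        // goes to the output buffer
    if (its(i).hasNext) pq.enqueue((its(i).next(), i))  // refill from the same chunk
  }
}

// Tiny usage example with three pre-sorted "chunks": prints 1 2 3 4 5 6 7 8 9
kWayMerge(Seq(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9)), v => print(s"$v "))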

2. The difference between countByValue and countByKey

First, from the source code perspective:

// PairRDDFunctions.scala
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

// RDD.scala
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
  map(value => (value, null)).countByKey()
}

countByValue (RDD.scala)

  • Works on an ordinary RDD

  • The implementation calls countByKey

countByKey (PairRDDFunctions.scala)

  • Acts on a PairRDD

  • Counts the number of occurrences of each key

  • The result must be collected back to the Driver, so it is not suitable when the result set is large

Questions:

  • Does countByKey work on a regular RDD?

  • Does countByValue work on a PairRDD?

val rdd1: RDD[Int] = sc.makeRDD(1 to 10)
val rdd2: RDD[(Int, Int)] = sc.makeRDD((1 to 10).toList.zipWithIndex)

val result1 = rdd1.countByValue() // OK
val result2 = rdd1.countByKey()   // compile error: countByKey needs a PairRDD

val result3 = rdd2.countByValue() // OK (counts each (key, value) pair)
val result4 = rdd2.countByKey()   // OK

3. When does a join of two RDDs shuffle, and when does it not?

The join operation is an important performance indicator for any database or data-processing engine. For Spark, the cost of a join lies in the Shuffle, which moves data over disk and the network. Sometimes a program can be written to avoid the Shuffle entirely, so when does a join shuffle and when does it not?

3.1 Broadcast join

spark.sql.autoBroadcastJoinThreshold defaults to 10 MB; tables smaller than this threshold are automatically joined with a broadcast join.
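A minimal sketch of how this looks in Spark SQL, assuming an existing SparkSession named spark; the DataFrames and sizes are purely illustrative. The broadcast hint forces a broadcast join even when the table is above the threshold.

import org.apache.spark.sql.functions.broadcast

// Illustrative DataFrames: one large, one small enough to broadcast.
val largeDf = spark.range(1000000L).withColumnRenamed("id", "key")
val smallDf = spark.range(100L).withColumnRenamed("id", "key")

// The automatic threshold (10 MB by default) can be tuned:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// Explicit hint: broadcast smallDf regardless of the threshold.
val joined = largeDf.join(broadcast(smallDf), "key")
joined.explain() // the physical plan should show a BroadcastHashJoin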

3.2 Bucket join

The RDD approach is essentially the same as the table approach, except that the latter requires writing the data out as a bucketed table. The principle is: if two RDDs are partitioned in advance with the same partitioner, their partition layouts line up, and the join can then be performed bucket to bucket without a shuffle. There is no ready-made operator for this at the RDD level; you have to set it up yourself. For the table version of this join, see ByteDance's core optimization practices on Spark SQL. Consider the following examples.

rdd1 and rdd2 are Pair RDDs

rdd1 and rdd2 hold the same data

Cases where there must be a shuffle:

Case 1: different numbers of partitions

rdd1 => 5 partitions
rdd2 => 6 partitions

Case 2: same number of partitions, but the same keys are scattered across partitions (no common partitioner)

rdd1 => 5 partitions => (1, 0), (2, 0) | (1, 0), (2, 0) | (1, 0), (2, 0) | (1, 0), (2, 0), (1, 0) | (2, 0), (1, 0), (2, 0)
rdd2 => 5 partitions => (1, 0), (2, 0) | (1, 0), (2, 0) | (1, 0), (2, 0) | (1, 0), (2, 0), (1, 0) | (2, 0), (1, 0), (2, 0)

Case where there is no shuffle (both RDDs pre-partitioned with the same partitioner):

rdd1 => 5 partitions => (1, 0), (1, 0), (1, 0), (1, 0), (1, 0), (1, 3) | (2, 0), (2, 0), (2, 0), (2, 0), (2, 0), (2, 0), (2, 0) | empty | empty | empty

rdd2 => 5 partitions => (1, 0), (1, 0), (1, 0), (1, 0), (1, 0), (1, 3) | (2, 0), (2, 0), (2, 0), (2, 0), (2, 0), (2, 0), (2, 0) | empty | empty | empty

So, for any shuffle operator, if the data has already been partitioned with the same partitioner in advance, in many cases there is no shuffle at all.
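A minimal sketch of avoiding the join-time shuffle at the RDD level, assuming an existing SparkContext named sc; the data is illustrative. Both RDDs are pre-partitioned with the same HashPartitioner, so the join can reuse that partitioning.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(5)

// partitionBy itself shuffles once, but the result can be cached and reused.
val rdd1 = sc.makeRDD(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(partitioner).cache()
val rdd2 = sc.makeRDD(Seq((1, 10), (2, 20), (3, 30))).partitionBy(partitioner).cache()

// Both sides report the same partitioner, so this join adds no new shuffle stage.
val joined = rdd1.join(rdd2)
joined.collect() // Array((1,(a,10)), (2,(b,20)), (3,(c,30))) in some order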

Apart from the two methods above, a shuffle join is normally used. For the details of how Spark implements joins, see: Big Data Developer - Spark Join Principle Details.

4. A transformation does not necessarily avoid triggering a job

There is one exception operator: sortByKey. Internally, sortByKey uses a sampling algorithm (reservoir sampling) and then builds a RangePartitioner from the sampling result; collecting that sample submits a job even though no action has been called.

sortByKey → reservoir sampling → collect
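A minimal sketch of the behavior, assuming an existing SparkContext named sc; the data is illustrative. After the sortByKey line, the Spark UI already shows a completed sampling job even though no action has been called yet.

val rdd = sc.makeRDD(Seq((3, "c"), (1, "a"), (2, "b")), numSlices = 3)

// Nominally a transformation, but building the RangePartitioner samples the
// input (reservoir sampling + collect), so a job is submitted right here.
val sorted = rdd.sortByKey()

// The sorted output itself is still produced lazily, by this action.
sorted.collect() // Array((1,a), (2,b), (3,c))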

5. How are broadcast variables designed?

If the table to be broadcast is stored as a Hive table, its blocks are spread across multiple executors. The data is first pulled to the Driver and then broadcast. The broadcast is not pushed to every executor from the Driver alone; executors fetch the data they need first and then exchange it via a BitTorrent-like protocol. What is the BT protocol? The data is distributed across a peer-to-peer network: each node fetches pieces according to network distance, and every downloader is also an uploader. As a result, not every executor's tasks have to pull all the data directly from the Driver, which reduces the pressure on it. In addition, in Spark 1.x the broadcast lock was task-level; now it is a common lock shared by all tasks on the same executor.
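A minimal sketch of using a broadcast variable for a map-side lookup, assuming an existing SparkContext named sc; the table and names are illustrative. The broadcast copy is fetched once per executor (via the torrent-like mechanism described above) instead of being shipped with every task.

// Small lookup table collected on the Driver, then broadcast to executors.
val smallTable: Map[Int, String] = Map(1 -> "a", 2 -> "b")
val bcTable = sc.broadcast(smallTable)

val bigRdd = sc.makeRDD(Seq((1, 100), (2, 200), (3, 300)))

// Each task reads bcTable.value from its executor's local copy; no shuffle.
val joined = bigRdd.flatMap { case (k, v) =>
  bcTable.value.get(k).map(name => (k, (v, name)))
}
joined.collect() // Array((1,(100,a)), (2,(200,b)))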

References

juejin.cn/post/684490…

www.jianshu.com/p/6bf887bf5…
