
A. The core concepts of RDD

RDD is Spark's core data structure. An RDD (Resilient Distributed Dataset) is Spark's core, and most critical, data abstraction; it is essentially a distributed collection of immutable JVM objects. Immutable means read-only, so transforming an RDD produces a new RDD (as in the figure below, A to B) while the original RDD remains unchanged.

Resilience is mainly manifested in two aspects:

  • When an error occurs (such as the failure of any node), Spark can use the dependencies between RDDs to recover any failed RDD (for example, from B and D the final RDD can be recomputed). An RDD is like a sponge: however you squeeze it, it springs back whole.
  • When processed by transformation operators, the number of partitions of an RDD, and the location of those partitions, can change at any time.

Each RDD has the following members:

  • A collection of partitions;
  • A function (operator) for computing each partition;
  • Dependencies (on other RDDs);
  • (Optional) A Partitioner for key-value RDDs (e.g., hash-partitioned);
  • (Optional) A list of preferred locations for computing each partition (e.g., HDFS block storage addresses).
// The source code describes these members as:
//   - A list of partitions
//   - A function for computing each split
//   - A list of dependencies on other RDDs
//   - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
//   - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

As shown in the following figure, RDD_0 is generated from block addresses in HDFS, and the set of block addresses is a member variable of RDD_0. RDD_1 is derived from RDD_0 through a transformation function (operator), and that function is actually an internal member of RDD_1. In this sense RDD_1 depends on RDD_0, and the dependency set is likewise stored as a member variable of RDD_1.
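This relationship can be observed directly. A minimal sketch, assuming a live SparkContext sc and reusing the HDFS path that appears later in this article (the map operator is just an illustrative choice):

val rdd0 = sc.textFile("hdfs://linux121:9000/user/root/data/uaction.dat")  // RDD_0: built from HDFS blocks
val rdd1 = rdd0.map(_.toUpperCase)                                         // RDD_1: derived from RDD_0 by an operator

rdd1.dependencies.foreach(d => println(d.rdd))  // RDD_1 records its dependency on RDD_0
println(rdd0.partitions.length)                 // partitions follow the HDFS block layout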

(1) Characteristics of RDD

Features:

  • Partitioned
  • Read-only
  • Dependencies
  • Cache
  • Checkpoint

  1. Partitioned

An RDD is logically partitioned; the data of each partition exists only abstractly. A compute function is used to obtain the data of each partition: if the RDD is built from an existing file system, compute reads the data from that file system; if the RDD is converted from another RDD, compute executes the conversion logic that transforms the other RDD's data.

As shown in the following figure:
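A minimal sketch of how per-partition data can be inspected, assuming a SparkContext sc (the partition count of 3 is chosen only for illustration):

val rdd = sc.parallelize(1 to 10, 3)   // request 3 partitions

// Show which element lands in which partition
rdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(x => s"partition $idx -> $x")
}.collect().foreach(println)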

  2. Read-only

An RDD is read-only; to change the data in an RDD you can only create a new RDD from the existing one. One RDD can be transformed into another through a rich set of operators (map, filter, union, join, reduceByKey, ...), instead of being limited to writing only map and reduce as in MapReduce.

RDD operators include two types:

  • transformation: transforms one RDD into another; lazily evaluated (Lazy);
  • action: triggers the computation of an RDD, returning the result or saving the RDD to a file system;

As shown in the following figure:
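A minimal sketch of the difference, assuming a SparkContext sc (the output path is a hypothetical example):

val rdd = sc.parallelize(1 to 10)

// transformations: nothing runs yet, only the lineage is recorded
val doubled  = rdd.map(_ * 2)
val filtered = doubled.filter(_ > 10)

// actions: trigger the actual computation
filtered.count()                               // returns 5
filtered.collect()                             // Array(12, 14, 16, 18, 20)
filtered.saveAsTextFile("/tmp/demo-output")    // hypothetical output path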

  3. Dependencies

An RDD is produced by operator transformations, and the new RDD obtained from a transformation contains the information needed to derive it from other RDDs. RDDs maintain this lineage (the blood relationship between RDDs), also called dependencies. There are two kinds of dependencies:

  • Narrow dependency: partitions between RDDs map one-to-one or many-to-one (1:1 or n:1);
  • Wide dependency: each partition of the child RDD is related to multiple partitions of the parent RDD, a many-to-many relationship (i.e., n:m); a shuffle occurs.

As shown in the following figure:
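A minimal sketch, assuming a SparkContext sc: map keeps a narrow dependency, while reduceByKey introduces a wide (shuffle) dependency, which can be seen in the lineage:

val words  = sc.parallelize(Seq("a", "b", "a", "c"))
val pairs  = words.map(w => (w, 1))        // narrow dependency: partitions map 1:1
val counts = pairs.reduceByKey(_ + _)      // wide dependency: a shuffle happens

println(counts.toDebugString)              // prints the lineage, including the ShuffledRDD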

  4. Cache

The storage level of the cache (memory, disk, and so on) can be controlled. If the same RDD is used multiple times in an application, that RDD can be cached: its partition data is computed from the lineage only the first time; wherever it is used afterwards, the data is fetched directly from the cache instead of being recomputed from the lineage. This speeds up later reuse.

As shown in the following figure:
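A minimal sketch of caching, assuming a SparkContext sc and reusing the uaction.dat path from the examples below; the ERROR filter is only an illustrative reuse scenario:

import org.apache.spark.storage.StorageLevel

val logs   = sc.textFile("/user/root/data/uaction.dat")
val errors = logs.filter(_.contains("ERROR"))

errors.cache()                                   // shorthand for persist(StorageLevel.MEMORY_ONLY)
// errors.persist(StorageLevel.MEMORY_AND_DISK)  // or choose a storage level explicitly

errors.count()   // first action: computed from the lineage, then cached
errors.count()   // later actions: served from the cache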

  5. Checkpoint

Although the RDD lineage provides natural fault tolerance (when the data of an RDD partition fails or is lost, it can be rebuilt through the lineage), in long-running iterative applications the lineage between RDDs grows longer as iterations proceed. If an error occurs in a later iteration, rebuilding through a very long lineage inevitably hurts performance. RDD therefore supports checkpointing, which saves data to persistent storage and cuts off the lineage: after a checkpoint, an RDD no longer needs to know its parent RDDs, because its data can be read back from the checkpoint.
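A minimal sketch of checkpointing, assuming a SparkContext sc; the checkpoint directory is an assumed example path:

sc.setCheckpointDir("hdfs://linux121:9000/user/root/checkpoint")   // assumed example path

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.cache()        // recommended, otherwise the RDD is recomputed during checkpointing
rdd.checkpoint()   // mark the RDD; the checkpoint is written at the next action
rdd.count()        // triggers the computation and the checkpoint write, cutting the lineage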

(2) RDD source code

In the Spark source code, RDD is an abstract class with different concrete implementations. For example, RDD_0 can be a MapPartitionsRDD, while RDD_1 is a ShuffledRDD because a shuffle (data reshuffling) occurs when it is produced.

RDD source code:

// A member variable that represents the dependencies between RDD's
@transient private var deps: Seq[Dependency[_]]

// The partitioner member variable
@transient val partitioner: Option[Partitioner] = None

// The partition collection member variable referenced by the RDD
@transient private var partitions_ : Array[Partition] = null

// Get the dependencies between this RDD and other RDD's
protected def getDependencies: Seq[Dependency[_]] = deps

// Get the partition referenced by the RDD
protected def getPartitions: Array[Partition]

// Get the preferred locations for computing each partition
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

// The distinct operator
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
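For reference, a minimal usage sketch of distinct, assuming a SparkContext sc:

val nums = sc.parallelize(Seq(1, 1, 2, 3, 3, 3))
nums.distinct().collect()             // Array(1, 2, 3), order may vary
nums.distinct(2).getNumPartitions     // the numPartitions argument controls the result's partitioning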

Pay special attention to one of them:

@transient private var partitions_ : Array[Partition] = null

An RDD is a collection of partitions and is, in essence, a collection. You can reason about it in terms of partitions and so on, but in everyday use you can largely ignore that and treat it as an ordinary collection.

For example:

val list: List[Int] = List(1, 2, 3, 4, 5)
println(list.map(x => x + 1).filter { x => x > 1 }.reduce(_ + _))

......

val rdd: RDD[Int] = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5))
println(rdd.map(x => x + 1).filter { x => x > 1 }.reduce(_ + _))

(3) Programming model

RDD programming model:

  1. Create an RDD
  2. Apply transformation operators
  3. Apply action operators
  4. Other operations

How to create an RDD

The ways of creating an RDD fall into the following categories:

  • From a parallelized collection
  • By reading from HDFS
  • By reading from an external data source
  • As a PairRDD

Suggestion: use Standalone mode or local mode to learn the various RDD operators;

HA is not required, and neither is IDEA.

# Start in local mode: spark-shell --master local
[root@linux121 ~]# spark-shell --master local
21/02/19 09:46:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://linux121:4040
Spark context available as 'sc' (master = local, app id = local-1613699181443).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.

Pay special attention: spark-shell creates a default Spark context, available as sc.

Spark context available as 'sc' (master = local, app id = local-1613699181443).

(1) Create an RDD from a collection

Creating an RDD from a collection is mainly used for testing. Spark provides the following functions: parallelize, makeRDD, and range.

This kind of RDD is purely for learning; converting an in-memory collection variable into an RDD has little practical value.

Note: do not call rdd.collect in production, because it can cause an OOM on the Driver.

val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(1 to 100)
// Check the number of RDD partitions
rdd2.getNumPartitions
rdd2.partitions.length
val rdd3 = sc.makeRDD(List(1, 2, 3, 4, 5))
val rdd4 = sc.makeRDD(1 to 100)
rdd4.getNumPartitions
val rdd5 = sc.range(1, 100, 3)
rdd5.getNumPartitions
val rdd6 = sc.range(1, 100, 2, 10)
rdd6.getNumPartitions


// Example run in spark-shell:
scala> val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.getNumPartitions
res0: Int = 1

scala> rdd1.partitions.length
res1: Int = 1

scala> val rdd1 = sc.makeRDD(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd1.getNumPartitions
res2: Int = 1

(2) Create an RDD from a file system

The textFile() method creates an RDD by loading data from a file system. The method takes the URI of the file as an argument. This URI can be:

  1. Local file system

    Note when using the local file system: in Standalone mode the file must exist on every node; in local mode there is no such problem.

  2. Address of the distributed file system HDFS

    Reading from HDFS is a very common way of generating an RDD.

  3. Amazon S3 address

// Load data from the local file system
val lines = sc.textFile("file:///root/data/wc.txt")

// Load data from a distributed file system
val lines = sc.textFile("hdfs://linux121:9000/user/root/data/uaction.dat")
val lines = sc.textFile("/user/root/data/uaction.dat")
val lines = sc.textFile("data/uaction.dat")

(3) Read from external data sources

When Spark reads data from MySQL, the RDD type it returns is JdbcRDD, which, as the name implies, reads data over JDBC. This is similar to Sqoop, but with JdbcRDD you must manually specify lower and upper bounds for the data, that is, the value range of some column in the MySQL table, which is used as the basis for partitioning.

//val spark: SparkSession = .......
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val lowerBound = 1
val upperBound = 1000
val numPartition = 10

val rdd = new JdbcRDD(
  spark.sparkContext,
  () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
  },
  "SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
  lowerBound,
  upperBound,
  numPartition,
  r => r.getString(1))

Since this reads over JDBC, the method can be used with any database that supports JDBC, including distributed databases with a JDBC interface. Note, however, that as the code shows, it works by having multiple Executors query non-overlapping ranges of data at the same time, thereby extracting data in parallel. The extraction performance is therefore bounded by MySQL's concurrent read performance, and simply increasing the number of Executors beyond a certain threshold has little effect.

The above is database reading over JDBC. Distributed databases such as HBase are different: their data storage is itself partitioned (into Regions), and in this case importing by Region is much faster than the preceding method; it is true parallel extraction.

//val spark: SparkSession = .......
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val sc = spark.sparkContext
val tablename = "your_hbasetable"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

// Use the HBase API to parse row keys and column values
rdd.foreach { case (_, result) =>
  val rowkey = Bytes.toString(result.getRow)
  val value1 = Bytes.toString(result.getValue("cf".getBytes, "c1".getBytes))
}

It is worth mentioning that HBase has a third-party component called Phoenix, which gives HBase SQL and JDBC support. With this component, the first method can also be used to extract HBase data. Spark can also bypass HBase and directly read HFile, HBase's underlying storage file.

With third-party library support, Spark can read almost any data source, such as Elasticsearch.

(4) Create an RDD from an RDD

A PairRDD is no different from other RDDs except that its element type is Tuple2[K, V], that is, a key-value pair; hence the name PairRDD. Its generic type is RDD[(K, V)].

An ordinary RDD holds element types such as Int or String. The key-value structure is what allows a PairRDD to use key-based operators, such as grouping and aggregation.

A PairRDD can be obtained by transforming an ordinary RDD:

//val spark: SparkSession = .......

val a = spark.sparkContext.textFile("/user/me/wiki").map(x => (x, x))
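Once the data is a PairRDD, the key-based operators become available. A minimal sketch, assuming the same SparkSession spark and the /user/me/wiki path above (the word-count logic is only an illustrative example):

val lines = spark.sparkContext.textFile("/user/me/wiki")
val pairs = lines.flatMap(_.split("\\s+")).map(word => (word, 1))   // RDD[(String, Int)]

val counts  = pairs.reduceByKey(_ + _)   // aggregate values per key
val grouped = pairs.groupByKey()         // group values per key
counts.take(5).foreach(println)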