This is the 13th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021
1. RDD

The core concept of RDD

RDD is Spark's core data structure. RDD (Resilient Distributed Dataset), the resilient distributed dataset, is Spark's most central and most critical abstraction over data. It is essentially a distributed collection of immutable JVM objects. Immutable means read-only, so transforming an RDD produces a new RDD (as in A → B in the figure below), and the original RDD does not change.
The resilience is mainly reflected in two aspects:

- When an error occurs (for example, any node fails), Spark can use the dependencies between RDDs to recover any failed RDD (for example, B and D can be used to recompute the final RDD). An RDD is like a sponge: however you squeeze it, it stays whole.
- After transformation operators are applied, both the number of partitions in an RDD and the locations of those partitions can change at any time.
Each RDD has the following members:
- a collection of partitions;
- a function (operator) for computing each partition;
- dependencies (on other RDDs);
- for key-value RDDs: a hash partitioner (optional);
- a set of addresses used to compute each partition (optional, e.g., the block addresses of an HDFS file).
```scala
// The RDD source code describes exactly the same members:
//  - A list of partitions
//  - A function for computing each split
//  - A list of dependencies on other RDDs
//  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
//  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
```
As shown in the figure below, RDD_0 is generated from the block addresses of a file in HDFS; that set of block addresses is a member variable of RDD_0. RDD_1 is obtained from RDD_0 through a transformation function (operator); that function, together with the dependency on RDD_0, is stored as a member of RDD_1. In this sense RDD_1 depends on RDD_0.
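These members can be inspected directly through the RDD API. Below is a minimal sketch (meant for spark-shell; the HDFS path is a placeholder and must point at a real file):

```scala
val rdd0 = sc.textFile("hdfs://linux121:9000/user/root/data/wc.txt") // placeholder path
val rdd1 = rdd0.map(line => line.length)

// The list of partitions
println(rdd0.partitions.length)

// The dependencies on other RDDs (rdd1 depends on rdd0)
rdd1.dependencies.foreach(dep => println(dep.rdd))

// The preferred locations of a partition (here, HDFS block locations)
println(rdd0.preferredLocations(rdd0.partitions(0)))
```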
(1) Features of RDD

RDD has the following features:
- Partitioning
- Read-only
- Dependencies
- Caching
- Checkpointing
- Partitioning

An RDD is logically partitioned; the data in each partition exists only abstractly. A compute function is used to obtain the data of each partition: if the RDD is built from an existing file system, compute reads the data from that file system; if the RDD is derived from another RDD, compute applies the transformation logic to the data of that other RDD.

As shown in the figure:
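A quick way to see the partition-wise nature of the computation is mapPartitionsWithIndex, which runs a function once per partition; a minimal sketch (the numbers and the partition count are arbitrary):

```scala
// Build an RDD with 3 partitions and print which elements fall into which partition.
val rdd = sc.parallelize(1 to 9, 3)
rdd.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(x => s"partition $idx -> $x")
}.collect().foreach(println)
```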
- Read-only

An RDD is read-only. To change the data in an RDD, you can only create a new RDD from an existing one. One RDD is transformed into another through a rich set of operators (map, filter, union, join, reduceByKey, ...), rather than being limited to writing only map and reduce as in MapReduce.
RDD operators fall into two categories:

- transformation: transforms one RDD into another; executed lazily (Lazy);
- action: triggers the computation of an RDD, either returning the result or saving the RDD to a file system.

As shown in the figure:
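A minimal sketch of the two kinds of operators (the output path is only a placeholder): the transformations merely build up the lineage, and nothing runs until an action is called.

```scala
val nums = sc.parallelize(1 to 100)

// Transformations: lazily produce new RDDs, nothing is computed yet.
val doubled  = nums.map(_ * 2)
val filtered = doubled.filter(_ > 50)

// Actions: trigger the actual computation.
println(filtered.count())            // returns a result to the Driver
filtered.saveAsTextFile("/tmp/out")  // or saves the RDD to a file system (placeholder path)
```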
- Dependencies

RDDs are transformed by operators, and a newly derived RDD contains the information needed to derive it from other RDDs. RDDs maintain this lineage, which is also called the dependency relationship. There are two kinds of dependencies:
- Narrow dependency: the partitions of the parent and child RDDs have a one-to-one relationship (1:1 or n:1).
- Wide dependency: each partition of the child RDD is related to several partitions of the parent RDD, i.e., a many-to-many relationship (n:m); a shuffle takes place.

As shown in the figure:
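The kind of dependency an operator produces can be checked directly. In the minimal sketch below, map yields a narrow OneToOneDependency, while reduceByKey yields a ShuffleDependency, i.e., a wide dependency:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val mapped  = pairs.map { case (k, v) => (k, v * 10) } // narrow dependency
val reduced = pairs.reduceByKey(_ + _)                 // wide dependency (shuffle)

println(mapped.dependencies)   // List(org.apache.spark.OneToOneDependency@...)
println(reduced.dependencies)  // List(org.apache.spark.ShuffleDependency@...)
println(reduced.toDebugString) // prints the full lineage, including the shuffle
```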
- Caching

You can control the storage level used for caching (memory, disk, and so on). If an RDD is used multiple times in an application, it can be cached: the RDD computes its partition data from the lineage only the first time it is evaluated; afterwards, wherever it is used, the data is read directly from the cache instead of being recomputed from the lineage, which speeds up later reuse.

As shown in the figure:
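A minimal caching sketch (the data and the commented-out storage level are arbitrary choices): the first action materializes the cache, and later actions read from it.

```scala
import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 1000000).map(_ * 2)

nums.cache()                                   // shorthand for persist(StorageLevel.MEMORY_ONLY)
// nums.persist(StorageLevel.MEMORY_AND_DISK) // or pick the storage level explicitly instead

println(nums.count()) // first action: computed from the lineage, then cached
println(nums.sum())   // later actions read the cached partitions directly
```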
- Checkpointing

Although an RDD's lineage provides natural fault tolerance (when the data of a partition fails or is lost, it can be rebuilt through the lineage), in long-running iterative applications the lineage grows longer and longer as the iterations proceed. If an error occurs in a later iteration, rebuilding through such a long lineage inevitably hurts performance. For this reason an RDD supports checkpointing: its data is saved to persistent storage and the lineage is cut off, because a checkpointed RDD no longer needs to know its parent RDDs; it can read its data back from the checkpoint.
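A minimal checkpoint sketch (the checkpoint directory is a placeholder; in a cluster it is normally an HDFS path): setCheckpointDir must be called first, and the checkpoint is written when the first action runs.

```scala
sc.setCheckpointDir("hdfs://linux121:9000/user/root/checkpoint") // placeholder directory

val base = sc.parallelize(1 to 100).map(_ + 1).filter(_ % 2 == 0)
base.cache()       // often combined with caching so the data is not computed twice
base.checkpoint()  // marks the RDD for checkpointing; the lineage is cut once it is written

base.count()                 // the action triggers both the computation and the checkpoint
println(base.toDebugString)  // the lineage now starts from the checkpointed data
println(base.isCheckpointed) // true after the action has run
```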
(2) RDD source code

In the Spark source code, RDD is an abstract class with different concrete implementations. For example, RDD_0 can be a MapPartitionsRDD, while RDD_1 is a ShuffledRDD because a shuffle (data reshuffling) occurs when it is produced.
RDD source code:
```scala
// Member variable holding the dependencies on other RDDs
@transient private var deps: Seq[Dependency[_]]

// Partitioner member variable
@transient val partitioner: Option[Partitioner] = None

// Member variable holding the set of partitions referenced by this RDD
@transient private var partitions_ : Array[Partition] = null

// Get the dependencies between this RDD and other RDDs
protected def getDependencies: Seq[Dependency[_]] = deps

// Get the partitions referenced by this RDD
protected def getPartitions: Array[Partition]

// Get the preferred locations of each partition
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

// The distinct operator
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
```
Pay special attention to one of them:
```scala
@transient private var partitions_ : Array[Partition] = null
```
An RDD is a collection of partitions and is, in essence, a collection. You can reason about it in terms of partitions and so on, but when using it you can mostly forget about that and treat it as an ordinary collection.
For example:
```scala
// An ordinary Scala collection
val list: List[Int] = List(1, 2, 3, 4, 5)
println(list.map(x => x + 1).filter(x => x > 1).reduce(_ + _))

// ......

// The same operations, but on an RDD
val rdd: org.apache.spark.rdd.RDD[Int] = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5))
println(rdd.map(x => x + 1).filter(x => x > 1).reduce(_ + _))
```
(3) Programming model

The RDD programming model:

- create an RDD;
- apply transformation operators;
- apply action operators;
- other operations.

How do you create an RDD? By data source, RDD creation falls into the following cases:

- from a parallelized collection;
- by reading from HDFS;
- by reading from an external data source;
- as a PairRDD (from another RDD).
Suggestion: use Standalone mode or local mode to learn the various RDD operators; HA is not required, and neither is IDEA.
```
# Start spark-shell in local mode
[root@linux121 ~]# spark-shell --master local
21/02/19 09:46:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://linux121:4040
Spark context available as 'sc' (master = local, app id = local-1613699181443).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.
```
Note in particular that spark-shell sets the Spark context to sc by default:

```
Spark context available as 'sc' (master = local, app id = local-1613699181443).
```
(1) Creating an RDD from a collection

Creating an RDD from a collection is mainly used for testing. Spark provides the following functions: parallelize, makeRDD, and range.

This kind of RDD is purely for learning: converting an in-memory collection variable into an RDD has little practical value.

Note: do not use the rdd.collect method in a production environment, because it can cause an OOM on the Driver.
```scala
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(1 to 100)
// Check the number of partitions of the RDD
rdd2.getNumPartitions
rdd2.partitions.length

val rdd3 = sc.makeRDD(List(1, 2, 3, 4, 5))
val rdd4 = sc.makeRDD(1 to 100)
rdd4.getNumPartitions

val rdd5 = sc.range(1, 100, 3)
rdd5.getNumPartitions
val rdd6 = sc.range(1, 100, 2, 10)
rdd6.getNumPartitions
```

Checking the results in spark-shell:

```
scala> val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.getNumPartitions
res0: Int = 1

scala> rdd1.partitions.length
res1: Int = 1

scala> val rdd1 = sc.makeRDD(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd1.getNumPartitions
res2: Int = 1
```
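The number of partitions can also be set explicitly when the RDD is created; a minimal sketch (the partition count 4 is arbitrary):

```scala
// The second argument of parallelize / makeRDD is the number of partitions (numSlices).
val rdd7 = sc.parallelize(1 to 100, 4)
rdd7.getNumPartitions   // 4

val rdd8 = sc.makeRDD(1 to 100, 4)
rdd8.partitions.length  // 4
```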
(2) Creating an RDD from a file system

Use the textFile() method to load data from a file system and create an RDD. The method takes the URI of a file as its argument, which can be:

- a path on the local file system

  Note when using the local file system: in Standalone mode the file must exist on every node; in local mode there is no such problem.

- an HDFS (distributed file system) address

  Reading from HDFS is a very common way to create an RDD.

- an Amazon S3 address
```scala
// Load data from the local file system
val lines = sc.textFile("file:///root/data/wc.txt")

// Load data from the distributed file system (HDFS)
val lines = sc.textFile("hdfs://linux121:9000/user/root/data/uaction.dat")
val lines = sc.textFile("/user/root/data/uaction.dat")
val lines = sc.textFile("data/uaction.dat")
```
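textFile also takes an optional second argument, minPartitions, which sets a lower bound on the number of partitions; a minimal sketch reusing the path above:

```scala
// Ask for at least 4 partitions when reading the file (the actual number can be higher).
val lines4 = sc.textFile("/user/root/data/uaction.dat", 4)
lines4.getNumPartitions

// A typical first step after loading: count the lines (an action that triggers the read).
println(lines4.count())
```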
(3) Creating an RDD by reading from an external data source

When Spark reads data from MySQL, the RDD type it returns is JdbcRDD, which, as the name implies, reads data over JDBC. This is similar to Sqoop, but JdbcRDD requires you to specify the lower and upper bounds of the data manually, i.e., the minimum and maximum values of a column in the MySQL table are used as the basis for partitioning.
```scala
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

//val spark: SparkSession = .......
val lowerBound = 1
val upperBound = 1000
val numPartition = 10

val rdd = new JdbcRDD(
  spark.sparkContext,
  () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
  },
  "SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
  lowerBound,
  upperBound,
  numPartition,
  r => r.getString(1))
```
Since this reads over JDBC, the approach works for any database that supports JDBC, including distributed databases that can be read via JDBC. As the code shows, it works by having multiple Executors query disjoint ranges of data at the same time, thereby extracting the data in parallel. However, the extraction performance is limited by MySQL's concurrent read capacity; once the number of Executors passes a certain threshold, adding more has little effect on performance.
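To make the "disjoint ranges" point concrete, here is a rough sketch of the idea (an illustration under the assumption of evenly split ranges, not Spark's exact code): the [lowerBound, upperBound] interval is cut into numPartition sub-ranges, and each sub-range is bound to the two ? placeholders of the SQL on its own partition.

```scala
// Illustration only: roughly how an ID range can be split into disjoint per-partition ranges.
def splitRange(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
  val length = upper - lower + 1
  (0 until numPartitions).map { i =>
    val start = lower + (i * length) / numPartitions
    val end   = lower + ((i + 1) * length) / numPartitions - 1
    (start, end)
  }
}

splitRange(1, 1000, 10).foreach(println) // (1,100), (101,200), ..., (901,1000)
```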
The above covers reading databases over JDBC. Distributed databases such as HBase are different: their data storage is itself partition-based (regions), so importing by region is much faster than the method above and is truly parallel extraction.
```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

//val spark: SparkSession = .......
val sc = spark.sparkContext
val tablename = "your_hbasetable"

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

// Use the HBase API to parse the row keys and column values
rdd.foreach { case (_, result) =>
  val rowkey = Bytes.toString(result.getRow)
  val value1 = Bytes.toString(result.getValue("cf".getBytes, "c1".getBytes))
}
```
It is worth mentioning that HBase has a third-party component called Phoenix, which gives HBase SQL and JDBC support; with Phoenix, the first (JDBC-based) method can also be used to extract HBase data. In addition, Spark can read HFiles, HBase's underlying storage files, directly, bypassing HBase entirely when reading the data.

With third-party library support, Spark can read almost any data source, such as Elasticsearch.
(4) Creating an RDD from another RDD

A PairRDD is no different from any other RDD except that its element type is Tuple2[K, V], i.e., a key-value pair, so its generic type is RDD[(K, V)]; hence the name PairRDD.

An ordinary RDD holds single values such as Int or String. The key-value structure of a PairRDD is what allows it to use key-based operators such as grouping and aggregation.

A PairRDD can be obtained by transforming an ordinary RDD:
```scala
//val spark: SparkSession = .......
val a = spark.sparkContext.textFile("/user/me/wiki").map(x => (x, x))
```
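As a quick illustration of what the key-value structure buys you, here is a minimal word-count style sketch (the data is made up) using key-based operators that are only available on a PairRDD:

```scala
val pairs = sc.parallelize(Seq("spark", "rdd", "spark", "hdfs", "rdd", "spark"))
  .map(word => (word, 1))                // ordinary RDD -> PairRDD of (String, Int)

// Key-based operators, available because the element type is a (K, V) pair
val counts  = pairs.reduceByKey(_ + _)   // sum the values for each key
val grouped = pairs.groupByKey()         // group the values for each key

// collect is fine here because the demo data is tiny
counts.collect().foreach(println)        // e.g. (spark,3), (rdd,2), (hdfs,1)
```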