This article is based on the Spark 2.2.0 source code
Basic concepts
Application
A Spark program written by a user: a class with a main method that performs a computation task. It consists of a Driver program and a set of executors running on a Spark cluster
RDD
Resilient Distributed Dataset. The RDD is Spark's core data structure and is manipulated through a series of operators. When an Action operator is invoked, all preceding operators are assembled into a directed acyclic graph (DAG), which Spark converts into a Job and submits to the cluster for execution
Since Spark 2.x, prefer DataFrame/Dataset
SparkContext
SparkContext is the entry point to Spark. It connects to the Spark cluster and creates RDDs, accumulators, and broadcast variables. Essentially, SparkContext is Spark's external interface, responsible for exposing Spark's functionality to callers.
Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. There is only one SparkContext per JVM, but one server can run multiple JVMs
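A minimal sketch of creating and stopping a SparkContext (the application name and master URL below are placeholders for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("example-app")   // placeholder application name
  .setMaster("local[2]")       // run locally with 2 threads, for illustration only
val sc = new SparkContext(conf)
// ... create RDDs, accumulators and broadcast variables with sc ...
sc.stop()   // must be called before creating another SparkContext in the same JVM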
SparkSession
The entry point to programming Spark with the Dataset and DataFrame API. It subsumes SQLContext and HiveContext
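A minimal sketch of building a SparkSession (the application name is a placeholder; enableHiveSupport() is optional and requires the Hive dependencies on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("example-app")
  .master("local[2]")
  .enableHiveSupport()   // optional: pulls in HiveContext functionality
  .getOrCreate()
val sc = spark.sparkContext   // the underlying SparkContext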
Driver
The Java virtual machine process that runs the main method. It listens for communication and connections from the Spark Application's executor processes and ships the project JAR to all executors. The Driver works with the Master and Worker to start the application's processes, split the DAG, encapsulate computing tasks, assign tasks to executors, and allocate computing resources; it schedules tasks onto executors for execution. It is therefore best for the Driver to be on the same network as the Spark cluster. Driver processes usually run on worker nodes, not on the same node as the Cluster Manager
The Cluster Manager manages the entire Spark cluster (resource allocation) and all applications, while the Driver manages a single application (resource coordination); they sit at different management layers
Worker
- Standalone mode: the node where the Worker process resides
- YARN mode: the node where the YARN NodeManager process resides
Executor
Each Spark Application has its own executor processes; applications do not share an Executor process
The startup parameters include executor-cores and executor-memory. Each executor occupies CPU cores and memory, and since executors are not shared across applications, it is easy to exhaust worker resources
Executors can be added and released dynamically during the life cycle of a Spark Application; for details, see Dynamic Resource Allocation. Executors use multiple threads to run the tasks assigned by SparkContext
Job
A Spark Application can be divided into multiple jobs. Each time an Action is invoked, a job is logically generated, and a job contains one or more stages.
Stage
Each job is divided into one or more stages, and each stage has a set of tasks (a TaskSet) that is assigned to an executor for execution
There are two kinds of stage: ShuffleMapStage and ResultStage. If the user program invokes an operator that requires a shuffle, such as groupByKey, the job is split into ShuffleMapStages and a ResultStage along the shuffle boundaries. If there is no shuffle, there is only one stage
TaskSet
A set of associated tasks with no shuffle dependencies between them; a stage maps directly to a TaskSet. A TaskSet encapsulates tasks that need to be computed and share the same processing logic. These tasks can be computed in parallel, and coarse-grained scheduling is based on the TaskSet.
A stage corresponds to one TaskSet
Task
A task is a unit of computation that the driver sends to an executor. Within a stage, each task processes a small piece of data and computes the corresponding result. Tasks are the basic units that run on a physical node. ShuffleMapTask and ResultTask are the execution units of ShuffleMapStage and ResultStage respectively. InputSplit, task, and partition have a one-to-one relationship: Spark runs one task per partition to process it. The number of tasks can be set manually with spark.default.parallelism
Cluster Manager
The cluster manager is a component that schedules and allocates resources in the cluster for each Spark Application, e.g. Spark Standalone, YARN, or Mesos
Deploy Mode
Both standalone and YARN modes support two deploy modes, client and cluster; the difference lies in where the driver runs. In client mode, the driver runs on the machine where the Spark job is submitted, so detailed logs can be viewed in real time. In cluster mode, the Spark Application is submitted to the Cluster Manager, which (e.g. the master) starts the driver process on a node in the cluster; this is the mode used in production environments
In general, the driver and workers are best kept on the same network. In client mode the driver is likely to be separated from the workers, so network communication is time-consuming; cluster mode does not have this problem
Standalone mode
The Master manages a cluster consisting of master and Worker processes. A YARN cluster and HDFS are not required
Master
In standalone mode, the Master is the component of the Cluster Manager that schedules and allocates cluster resources for each Spark application
Note the difference between the Cluster Manager and the driver
YARN mode
YARN manages a cluster consisting of a ResourceManager and NodeManagers
DAGScheduler
Build a stage-based DAG based on the Job and submit the Stage to the TaskScheduler.
TaskScheduler
Submit the Taskset to the Worker Node cluster to run and return the result.
Basic working principles of Spark
The Driver applies for resources from the Master. The Master asks the Workers to assign specific Executors to the application. The Driver then sends the assigned Tasks to the Executors; a Task carries the business logic of our Spark application
Do job generation, stage partitioning, and task assignment occur on the driver side? Yes
Spark VS MapReduce
The biggest difference between Spark and MapReduce is iterative computation:
- A MapReduce job is divided into two stages, map and reduce; once both stages are complete, the job is finished
- A Spark job can be divided into N stages, iterating in memory
RDD
Resilient Distributed Dataset. An RDD is distributed: its data is spread across a batch of nodes, each storing some of the RDD's partitions
When memory is insufficient, RDD data is automatically spilled to disk. Calls to cache() and persist() store RDD data according to the chosen StorageLevel
RDD create
- SparkContext.wholeTextFiles(): for a directory containing many small files, returns a pair RDD of <filename, fileContent>
- SparkContext.sequenceFile[K,V](): creates an RDD from a SequenceFile; the K and V type parameters are the SequenceFile's key and value types and must be Hadoop serialization types, such as IntWritable and Text
- SparkContext.hadoopRDD(): creates an RDD from a custom Hadoop input type; this method accepts the JobConf, InputFormat class, key class, and value class
- SparkContext.objectFile(): reads a file previously written with RDD.saveAsObjectFile(), deserializes the data in the file, and creates an RDD
Parallelize to create an RDD: the parallelize() method can be told how many partitions to split the collection into (effectively the number of InputSplits; InputSplit, task, and partition correspond one-to-one), and Spark runs one task per partition to process it (see the relationship between the number of nodes, RDD partitions, CPU cores, and parallelism in a Spark cluster). Spark recommends creating two to four partitions per CPU core in the cluster to avoid idle CPUs
If multiple jobs (including Spark and Hadoop jobs) are running in the cluster, should one CPU core still be given 2-4 partitions' worth of computing tasks?
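A small sketch of parallelize() with an explicit partition count, assuming an existing SparkContext sc:

val data = 1 to 100
// split the collection into 4 partitions; Spark runs one task per partition
val rdd = sc.parallelize(data, 4)
println(rdd.getNumPartitions)   // 4
println(rdd.sum())              // 5050.0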
The Transformation and the Action
Transformation
Transformations create a new RDD from an existing RDD. They are lazy: they simply record the operations to be performed on the RDD without executing them. All transformations are executed only when an Action runs, which avoids materializing too many intermediate results
Operation | Description |
---|---|
map | Passes each element of the RDD to a custom function, producing a new element; the new elements form a new RDD |
filter | Evaluates each element of the RDD: it is kept if the function returns true and discarded if it returns false |
flatMap | Similar to map, but each element is mapped to a collection which is then flattened |
groupByKey | Groups by key; each key maps to an Iterable |
reduceByKey | Performs a reduce operation on the values of each key |
sortByKey | Sorts the RDD by key |
join | Joins two RDDs of <key, value> pairs; each pair of values joined on a key is passed to a custom function for processing |
cogroup | Like join, but the Iterable of values for each key is passed to the custom function |
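As noted above, transformations are lazy: nothing is computed until an action runs. A minimal sketch, assuming an existing SparkContext sc:

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
// transformations only record the lineage; nothing executes yet
val doubled = rdd.map(_ * 2)
val big = doubled.filter(_ > 4)
// the action triggers a job and executes the whole chain
println(big.collect().mkString(","))   // 6,8,10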
The difference between map and flatMap: map functionally maps each element of an RDD to an element of another RDD; flatMap applies a function to each element and then flattens the result, and is commonly used to split text into words
Experiment: does flatMap flatten multi-layer nested elements? Conclusion: it only flattens one level; it does not recursively flatten lower layers
val arr = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
arr.flatMap(x => (x._1 + x._2)).foreach(print) // A1B2C3
val arr2 = sc.parallelize(Array(
  Array(("A", 1), ("B", 2), ("C", 3)),
  Array(("C", 1), ("D", 2), ("E", 3)),
  Array(("F", 1), ("G", 2), ("H", 3))))
arr2.flatMap(x => x).foreach(print) // (A,1)(B,2)(C,3)(C,1)(D,2)(E,3)(F,1)(G,2)(H,3)
val arr3 = sc.parallelize(Array(
  Array(
    Array(("A", 1), ("B", 2), ("C", 3))),
  Array(
    Array(("C", 1), ("D", 2), ("E", 3))),
  Array(
    Array(("F", 1), ("G", 2), ("H", 3)))))
arr3.flatMap(x => x).foreach(print) // [Lscala.Tuple2;@11074bf8 [Lscala.Tuple2;@c10a22d [Lscala.Tuple2;@40ef42cd
Map and flatMap source code
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
  def hasNext = self.hasNext
  // Iterate over the elements directly, applying f to each one
  def next() = f(self.next())
}

/** Creates a new iterator by applying a function to all values produced by this iterator
 *  and concatenating the results.
 *
 *  @return the iterator resulting from applying the given iterator-valued function
 *          `f` to each value produced by this iterator and concatenating the results.
 */
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
  // This step turns the current element into an Iterator
  private def nextCur() { cur = f(self.next()).toIterator }
  def hasNext: Boolean = {
    while (!cur.hasNext) {
      if (!self.hasNext) return false
      nextCur()
    }
    true
  }
  // When next is called, nextCur is eventually invoked
  def next(): B = (if (hasNext) cur else empty).next()
}
join VS cogroup VS fullOuterJoin VS leftOuterJoin VS rightOuterJoin
val studentList = Array(
  Tuple2(1, "leo"),
  Tuple2(2, "jack"),
  Tuple2(3, "tom"))
val scoreList = Array(
  Tuple2(1, 100),
  Tuple2(2, 90),
  Tuple2(2, 90),
  Tuple2(4, 60))
val students = sc.parallelize(studentList)
val scores = sc.parallelize(scoreList)
/* (4,(CompactBuffer(),CompactBuffer(60)))
 * (1,(CompactBuffer(leo),CompactBuffer(100)))
 * (3,(CompactBuffer(tom),CompactBuffer()))
 * (2,(CompactBuffer(jack),CompactBuffer(90, 90))) */
val studentCogroup = students.cogroup(scores) // union of keys, values collected into Iterables
/* (1,(leo,100))
 * (2,(jack,90))
 * (2,(jack,90)) */
val studentJoin = students.join(scores) // inner join: intersection of keys
/* (4,(None,Some(60)))
 * (1,(Some(leo),Some(100)))
 * (3,(Some(tom),None))
 * (2,(Some(jack),Some(90)))
 * (2,(Some(jack),Some(90))) */
val studentFullOuterJoin = students.fullOuterJoin(scores) // union of keys; either side may be None
/* (1,(leo,Some(100)))
 * (3,(tom,None))
 * (2,(jack,Some(90)))
 * (2,(jack,Some(90))) */
val studentLeftOuterJoin = students.leftOuterJoin(scores) // the left side is never empty
/* (4,(None,60))
 * (1,(Some(leo),100))
 * (2,(Some(jack),90))
 * (2,(Some(jack),90)) */
val studentRightOuterJoin = students.rightOuterJoin(scores) // the right side is never empty
Action
An Action is the final operation on an RDD, such as traversal, reduce, or save. It starts the computation, returns a value to the user program or writes data to an external storage system, and triggers a Spark job; all transformations before the action are executed. For key-value RDDs of Tuple2, operators such as groupByKey are provided in Scala by implicitly converting the RDD to PairRDDFunctions, which supplies groupByKey and related methods. Spark's implicit conversions must be imported manually: import org.apache.spark.SparkContext._
For groupByKey, Spark 2.2 explicitly uses the HashPartitioner (the operator is provided via the implicit conversion to PairRDDFunctions). Does an Action necessarily return its result to the driver? Yes, see the runJob method below
The Action operation must call the runJob() method in the source code, either directly or indirectly
// Calls runJob directly
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

/** Run a function on a given set of partitions in an RDD and pass the results to the given
 *  handler function. This is the main entry point for all actions in Spark.
 *
 *  @param resultHandler callback to pass each result to
 */
// The result is passed to the handler function, which processes the returned result.
// For collect, the handler function is (iter: Iterator[T]) => iter.toArray
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
Operation | Description |
---|---|
reduce | Aggregates all elements of the RDD: the first and second elements are combined, the result is combined with the third element, then with the fourth, and so on |
collect | Pulls all elements of the RDD to the local client. Beware of the data-transfer cost; spark.driver.maxResultSize limits the maximum size of the result set an action can return to the driver |
count | Returns the total number of elements in the RDD |
take(n) | Returns the first n elements of the RDD |
saveAsTextFile | Saves the RDD elements to a file, calling toString on each element |
countByKey | Counts the number of values for each key |
foreach | Iterates over every element of the RDD |
// Create an RDD from a local file
val lines = spark.sparkContext.textFile("hello.txt")
// Transformation: returns a (key, value) RDD
val linePairs = lines.map(line => (line, 1))
// Transformation: implicitly converted to PairRDDFunctions, which provides reduceByKey and other methods
// In the source, reduceByKey uses the HashPartitioner by default
val lineCounts = linePairs.reduceByKey(_ + _)
// Action: triggers the job; the closure runs on the executors
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + lineCount._2 + " times."))
mapPartitions
map processes one record of a partition at a time; mapPartitions processes all the data of a partition at once. If the amount of RDD data is not very large, mapPartitions is recommended instead of map to speed up processing. If the amount of RDD data is very large, mapPartitions is not recommended, because it may cause memory overflow
val studentScoresRDD = studentNamesRDD.mapPartitions { it =>
var studentScoreList = Array("a")
while (it.hasNext) {
...
}
studentScoreList.iterator
}
mapPartitionsWithIndex: like mapPartitions, but with the partition index added

studentNamesRDD.mapPartitionsWithIndex { (index: Int, it: Iterator[String]) => ... }
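A slightly fuller sketch of mapPartitionsWithIndex that tags each element with the partition it lives in (the RDD contents here are hypothetical):

val names = sc.parallelize(Seq("leo", "jack", "tom", "marry"), 2)
val tagged = names.mapPartitionsWithIndex { (index: Int, it: Iterator[String]) =>
  // index is the partition number; it iterates over that partition's elements
  it.map(name => s"partition $index: $name")
}
tagged.foreach(println)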
Other operator
- sample: samples data by fraction; a transformation
- takeSample: samples data by count; an action
- cartesian: Cartesian product
- coalesce: shrinks the number of RDD partitions, compressing the data into fewer partitions
Usage scenario: when data across partitions is uneven (for example after a filter), coalesce can be used to compress the number of partitions of the RDD so that the data in each partition is compact
rdd.coalesce(3): compresses the RDD into three partitions
Differences between coalesce and repartition: repartition is simply coalesce with shuffle forced to true (see the source below)
/** Returns a new RDD reduced to numPartitions partitions. This results in a narrow dependency.
 * For example, if you convert 1000 partitions to 100 partitions, no shuffle occurs; whereas if you
 * convert 10 partitions to 100 partitions, a shuffle occurs.
 * However, if you merge partitions drastically, for example down to a single partition, the computation
 * ends up running on only a few cluster nodes (implication: insufficient parallelism).
 * To avoid this, pass true as the second (shuffle) argument, which adds an extra shuffle during the
 * repartitioning so that the upstream partitions can still run in parallel.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope { ... }

/** Returns an RDD with exactly numPartitions partitions, which can increase or decrease the
 * parallelism of the RDD. Internally, data is redistributed using a shuffle. If you are reducing
 * the number of partitions, consider using coalesce to avoid the shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
Example: changing the number of RDD partitions from N to M
Partition number relation | shuffle = true | shuffle = false |
---|---|---|
N < M | The data is repartitioned into M partitions using the HashPartitioner | coalesce has no effect: no shuffle is performed, and the parent and child RDDs keep a narrow dependency |
N > M | Several of the N partitions are merged into new partitions, eventually producing M partitions | |
N >> M | With shuffle = true, an extra shuffle is added during repartitioning so that the upstream partitions can run in parallel, preserving the parallelism of the operations before the coalesce | The parent and child RDDs have a narrow dependency and sit in the same stage, which may leave the Spark program with insufficient parallelism (computation runs on only a few cluster nodes) and hurt performance |
- Returning a new RDD reduced to M partitions results in a narrow dependency, with no shuffle
- Returning a new RDD increased to M partitions causes a shuffle
- If shuffle is false and N < M, coalesce has no effect
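A small usage sketch of the two operators (the partition counts are arbitrary examples), assuming an existing SparkContext sc:

val rdd = sc.parallelize(1 to 1000, 100)
// shrink to 10 partitions without a shuffle (narrow dependency)
val fewer = rdd.coalesce(10)
// grow to 200 partitions; repartition always shuffles
val more = rdd.repartition(200)
println(fewer.getNumPartitions)   // 10
println(more.getNumPartitions)    // 200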
RDD persistence
cache() and persist()
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
If you need to clear the cache from memory, you can use the unpersist() method.
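A minimal persistence sketch, assuming an existing SparkContext sc (the storage level chosen here is just one possible option):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hello.txt")
val cached = lines.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(cached.count())   // the first action computes and persists the RDD
println(cached.count())   // the second action reads the persisted data
cached.unpersist()        // remove it from memory/disk when no longer needed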
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1) // number of replicas; default 1, i.e. keep only one copy
  extends Externalizable {
Persistence level | Meaning |
---|---|
MEMORY_ONLY | Persisted in JVM memory as unserialized Java objects. If memory cannot hold all partitions of the RDD, the partitions that are not persisted are recalculated the next time they are needed |
MEMORY_AND_DISK | Same as above, but partitions that do not fit in memory are persisted to disk and read from disk the next time they are needed |
MEMORY_ONLY_SER | Same as MEMORY_ONLY, but Java objects are serialized before being persisted. This reduces memory overhead but requires deserialization, which increases CPU overhead |
MEMORY_AND_DISK_SER | Same as MEMORY_AND_DISK, but Java objects are persisted in serialized form |
DISK_ONLY | Persisted entirely on disk |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Levels ending in 2 replicate each persisted partition to another node, so if data is lost it does not need to be recomputed; the backup copy is used instead |
Priority sorting (memory first)
- MEMORY_ONLY
- MEMORY_ONLY_SER, which serializes data for storage
- DISK
Shared variables
- By default, if an operator's function uses an external variable, the variable is copied to every task, and each task can only operate on its own copy
- A Broadcast Variable makes one copy of the variable per node (rather than per task), reducing network transfer and memory consumption
- An Accumulator allows multiple tasks to operate on a shared accumulating variable
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val factorBroadcast = sc.broadcast(3)
val sumAccumulator = new DoubleAccumulator()
// An accumulator must be registered before being sent to the executors
sc.register(sumAccumulator)
val multipleRdd = rdd.map(num => num * factorBroadcast.value)
// The accumulator value cannot be read here; it can only be read on the driver side
val accumulator = rdd.map(num2 => sumAccumulator.add(num2.toDouble))
// action output: 3,6,9,12,15
multipleRdd.foreach(num => println(num))
// To get the accumulator value, perform an action first
accumulator.collect() // sum is now 15
println(sumAccumulator.value)
accumulator.count() // 30: another action adds 15 again
println(sumAccumulator.value)
Spark kernel Architecture
In standalone mode
A TaskScheduler submits each task in a taskSet to an executor for execution
Wide and narrow dependencies
Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD. Wide dependency: a partition of the parent RDD is used by multiple partitions of the child RDD
The difference between:
- Narrow dependencies allow all parent partitions to be pipelined on a cluster node. For example, map, then filter, element by element; In the case of wide dependency, all parent partition data needs to be calculated first, and Shuffle is performed between nodes, which is similar to MapReduce.
- Narrow dependency can restore failed nodes more effectively, that is, only the parent partition of the lost RDD partition needs to be recalculated, and the calculation between different nodes can be carried out in parallel. For a Lineage diagram with wide dependency, the failure of a single node may cause all ancestors of the RDD to lose some partitions, so the whole recalculation is required.
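A short sketch contrasting the two kinds of dependency, assuming an existing SparkContext sc; toDebugString shows the resulting shuffle (stage) boundary:

val words = sc.parallelize(Seq("a", "b", "a", "c"))
// map and filter are narrow dependencies: each child partition reads one parent partition
val pairs = words.map(word => (word, 1)).filter(_._1 != "c")
// reduceByKey is a wide dependency: a shuffle moves data between partitions
val counts = pairs.reduceByKey(_ + _)
println(counts.toDebugString)   // the lineage shows a ShuffledRDD, i.e. a stage boundary
counts.collect().foreach(println)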
Spark Submission Mode
- Standalone mode: A master-worker cluster based on Spark
- Yarn-cluster mode based on Yarn
- Yarn-client mode based on Yarn
Set the master parameter to yarn-cluster/yarn-client in the Spark submission script. The default mode is standalone
Spark Submission script
/usr/local/spark/bin/spark-submit \
--class com.feng.spark.spark1.StructuredNetworkWordCount \
--master spark://IP:PORT \   # standalone mode
--num-executors 3 \          # allocate 3 executors
--driver-memory 500m \
--executor-memory 500m \
--executor-cores 2 \         # 2 CPU cores per executor
/usr/local/test_data/spark1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
The whole application needs 3 x 500 = 1500 MB of memory and 3 x 2 = 6 cores. --master local[8]: simulates cluster execution with eight threads in one process. --total-executor-cores: specifies the total number of CPU cores across all executors. --supervise: tells Spark to monitor the driver node and automatically restart the driver if it fails
Configuration mode
Sort from highest to lowest priority
- SparkConf: set programmatically; when running in local mode inside an IDE, properties can only be set in SparkConf
- spark-submit script options: effective for the current application; recommended
- spark-defaults.conf file: global configuration
SparkConf.set("spark.default.parallelism", "100")
spark-submit: --conf spark.default.parallelism=50
spark-defaults.conf: spark.default.parallelism 10
In the spark-submit script, you can use --verbose to print detailed configuration property information
You can start by creating an empty SparkConf object in your program, such as
val sc = new SparkContext(new SparkConf())
Then set the property values with --conf in the spark-submit script, as shown below:
--conf spark.eventLog.enabled=false
Dependency management
--jars: additional dependency jars are automatically shipped to the cluster. Ways of specifying the jars:
- file: served by the driver's HTTP file server; all executors pull the files from the driver's HTTP service
- hdfs:/, http:/, https:/, ftp:: pulled from the specified location according to the URI
- local: files in this format must already exist on every worker node, so no network IO is needed to pull them; suitable for very large files or jars, and can improve job performance
The files and jars are copied to each executor's working directory, which can consume a lot of disk space, so they need to be cleaned up afterwards. When a Spark job runs on YARN, the dependency files are cleaned up automatically; in standalone mode, the spark.worker.cleanup.appDataTtl property must be configured to enable automatic cleanup of dependency files and jars.
See the conf/spark-env.sh parameters section
--packages: Maven coordinates of needed dependencies; --repositories: additional remote Maven repositories to search
Yarn – cluster mode
Used in production. The driver runs on a NodeManager, so there is no network-card traffic surge, but viewing logs is troublesome and debugging is inconvenient
Yarn – client mode
yarn-client is used for testing. The driver runs on the local client and is responsible for scheduling the application, so a large amount of network traffic flows between the driver and the YARN cluster, causing a surge in network-card traffic
- In yarn-client mode, the driver runs on the spark-submit machine; the ApplicationMaster acts only as an ExecutorLauncher, responsible solely for requesting and starting executors, while the driver does the scheduling
- In yarn-cluster mode, the ApplicationMaster is the driver and is responsible for scheduling
Standalone Core component interaction process
See the Spark architecture for a brief analysis
Basic points:
- An Application starts a Driver
- A Driver is responsible for tracking and managing all resource states and task states during the running of the Application
- A Driver manages a set of executors
- An Executor only executes tasks belonging to one Driver
-
Orange: The Spark program is submitted by the user. The process for submitting a Spark program is as follows:
- The user spark-submit script submits a Spark program that creates a ClientEndpoint object, which communicates with the Master
- ClientEndpoint sends a RequestSubmitDriver message to the Master to submit the user program
- The Master receives a RequestSubmitDriver message and replies a SubmitDriverResponse to ClientEndpoint, indicating that the user program has been registered
Steps 4 and 5 together should mean that the user program has been registered with the master, but the driver may not have started yet
- ClientEndpoint sends a RequestDriverStatus message to the Master to request the Driver status
Shouldn't the Master endpoint return a DriverStatusResponse to the DriverClient? It replies periodically; once the driver has started, step 5 happens
- If the Driver corresponding to the current user program has been started, the ClientEndpoint exits directly to complete the submission of the user program
-
Purple: starting the Driver process. After the user submits the Spark program, a Driver must be started to execute the program's computation logic and complete its computing tasks; the Master therefore needs to start a Driver:
- The Master maintains in memory the Applications users have submitted for computation. Whenever this in-memory structure changes, scheduling is triggered and the Master sends a LaunchDriver request to a Worker
- The Worker receives the LaunchDriver message and starts a DriverRunner thread to execute the LaunchDriver task
- The DriverRunner thread starts a new JVM instance on the Worker that runs a Driver process that creates a SparkContext object
The current worker node is running the driver process
-
Once the Driver has started, it creates the SparkContext object, initializes the basic components required for computation, and registers the Application with the Master. The process is as follows:
- Create SparkEnv objects and create and manage some basic components
SparkEnv Holds all the runtime environment objects for a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, etc. Currently Spark code finds the SparkEnv through a global variable, so all the threads can access the same SparkEnv
- Create a TaskScheduler to schedule tasks
- Create a StandaloneSchedulerBackend, responsible for negotiating resources with the Cluster Manager
- Create a DriverEndpoint so that other components can communicate with the Driver
It’s just created. It’s not started yet
- Create a StandaloneAppClient within StandaloneSchedulerBackend, handle communications with the Master
- StandaloneAppClient creates a ClientEndpoint that actually communicates with the Master
- ClientEndpoint sends the RegisterApplication message to the Master to register the Application
- After receiving the RegisterApplication request, the Master replies to ClientEndpoint with a RegisteredApplication message, indicating that the Application has been registered successfully
-
Blue: Start the Executor process
- The Master sends the LaunchExecutor message to the Worker, requesting to start Executor. The Master also sends an ExecutorAdded message to the Driver indicating that the Master has added an Executor (not yet started).
The Executor has not actually started at this point; the Master has only sent the Worker a message asking it to start one. This step shows that the Master is responsible for starting and allocating executors, while the driver simply submits tasks to them
- The Worker receives the LaunchExecutor message and starts an ExecutorRunner thread to perform the LaunchExecutor tasks
- The Worker sends an ExecutorStateChanged message to the Master, notifying it that the Executor's state has changed
- The Master sends an ExecutorUpdated message to the Driver, indicating that the Executor has started
This is where the master really tells the driver that the executor has started
-
Pink: Starts Task execution
- StandaloneSchedulerBackend starts the DriverEndpoint
The DriverEndpoint was created earlier but not started; up to this point, communication with the Master was handled through StandaloneSchedulerBackend (its StandaloneAppClient/ClientEndpoint)
- After DriverEndpoint starts, it periodically checks the state of the executors maintained by the Driver; if an idle Executor is available, tasks are scheduled onto it
It starts a driver-revive-thread background thread that periodically sends ReviveOffers messages to itself to check executor state
- DriverEndpoint sends a Resource Offer request to the TaskScheduler
DriverEndpoint is an object held inside CoarseGrainedSchedulerBackend
- If resources are available to start a Task, DriverEndpoint sends a LaunchTask request to the Executor
- Inside the Executor process, CoarseGrainedExecutorBackend calls the launchTask method of the Executor object to start the Task
- The Executor thread maintains a thread pool internally, creates a TaskRunner thread and submits it to the pool for execution
-
Green: The Task is running successfully
- The Executor thread inside the Executor process notifies CoarseGrainedExecutorBackend that the Task has completed
- CoarseGrainedExecutorBackend sends a StatusUpdate message to DriverEndpoint, notifying the Driver that the Task's run state has changed
- StandaloneSchedulerBackend calls the TaskScheduler's statusUpdate method to update the Task's state
StandaloneSchedulerBackend's parent class CoarseGrainedSchedulerBackend holds the DriverEndpoint (an inner class); after DriverEndpoint receives the StatusUpdate message, it directly calls scheduler.statusUpdate(taskId, state, data.value)
- StandaloneSchedulerBackend then calls the TaskScheduler's resourceOffers method to schedule other tasks
Spark Standalone cluster Starts the master and worker separately
Running the start-all.sh script starts the master process and all worker processes, quickly bringing up a Spark standalone cluster
Start the master and worker processes respectively
Why start separately
Starting them separately lets you pass command-line arguments to configure process-specific parameters such as the listening port, the web UI port, and the CPU and memory to use. On a machine that runs not only Spark but also, say, Storm, this lets you limit the Spark worker process to part of the machine's resources (cores, memory) instead of all of them
Parameter | Meaning | Applies to | Frequency of use |
---|---|---|---|
-h HOST, --ip HOST | The host to bind to when starting; defaults to the local machine | master & worker | Not commonly used |
-p PORT, --port PORT | The port used to provide services after startup; the master defaults to 7077, workers default to a random port | master & worker | Not commonly used |
--webui-port PORT | The web UI port; defaults to 8080 for the master and 8081 for workers | master & worker | Not commonly used |
-c CORES, --cores CORES | The total number of CPU cores Spark jobs may use on this machine; defaults to all CPU cores | worker | Commonly used |
-m MEM, --memory MEM | The total amount of memory Spark jobs may use, in a format such as 100M or 1G; defaults to 1G | worker | Commonly used |
-d DIR, --work-dir DIR | The working directory; defaults to SPARK_HOME/work | worker | Commonly used |
--properties-file FILE | The default configuration file loaded by the master and worker; defaults to conf/spark-defaults.conf | master & worker | Not commonly used |
Startup sequence
Start the master first and then the workers, because the workers need to register with the master after they start.
Shutdown order: 1. stop the workers (./stop-slave.sh); 2. stop the master (./stop-master.sh); or shut down the whole cluster with ./stop-all.sh
Start the master
- Use start-master.sh to start the master
- The startup log prints a line of the form spark://HOST:PORT; this is the master URL, through which worker processes connect to the master process and register
You can set the master address with SparkSession.builder().master()
- The master cluster's monitoring web UI can be accessed at http://MASTER_HOST:8080; the master URL is also shown on the web UI
Manually start the worker process
Run the start-slave.sh <master-spark-url> command to start a worker process on the current node; the node's CPU and memory resources are then displayed on the master web UI at http://MASTER_HOST:8080. eg: ./start-slave.sh spark://192.168.0.001:8080 --memory 500M
All spark startup and shutdown scripts
parameter | meaning |
---|---|
sbin/start-all.sh | According to the configuration, one master process and multiple worker processes are started on each node in the cluster |
sbin/stop-all.sh | Stop all master and worker processes in the cluster |
sbin/start-master.sh | Start a master process locally |
sbin/stop-master.sh | Stop the master process |
sbin/start-slaves.sh | Start all worker processes according to the worker nodes configured in the conf/slaves file |
sbin/stop-slaves.sh | Stop all worker processes |
sbin/start-slave.sh | Start a worker process locally |
The configuration file
Worker node configuration
Configures which machines are worker nodes, by hostname/IP address, one machine per line; all nodes should have a copy of this file. By default there is no conf/slaves file, only an empty conf/slaves.template; in that case only a master process and a worker process are started on the current master node, with the master and worker on the same node, which is a pseudo-distributed deployment. Sample conf/slaves file:
spark1
spark2
spark3
conf/spark-env.sh parameters
It is used to deploy spark in a cluster and configure each master and worker
Compared with startup-script parameters (e.g. ./start-slave.sh spark://192.168.0.001:8080 --memory 500M, a temporary change), command-line parameters have higher priority and override the spark-env.sh settings
parameter | meaning |
---|---|
SPARK_MASTER_IP | Specify the IP address of the machine where the master process resides |
SPARK_MASTER_PORT | Specifies the port on which master listens (default: 7077) |
SPARK_MASTER_WEBUI_PORT | Specify the port number for the Master Web UI (default: 8080) |
SPARK_MASTER_OPTS | Additional options for the master; set each one with "-Dx=y" |
SPARK_LOCAL_DIRS | The Spark working directory includes shuffle Map output files and RDD persisted to disks |
SPARK_WORKER_PORT | The port number of the worker node, which is random by default |
SPARK_WORKER_WEBUI_PORT | The web UI port number of the worker node. The default is 8081 |
SPARK_WORKER_CORES | Maximum number of cpus allowed for spark jobs on worker nodes. The default value is all CPU cores on the machine |
SPARK_WORKER_MEMORY | Maximum memory allowed for spark jobs on worker nodes. The format is 1000 MB, 2 GB, etc. The default minimum memory is 1 GB |
SPARK_WORKER_INSTANCES | Number of worker processes on the current machine; default 1. More than one can be set, but be sure to also set SPARK_WORKER_CORES to limit the number of CPUs per worker |
SPARK_WORKER_DIR | The working directory of spark jobs, including job logs, is spark_home/work by default |
SPARK_WORKER_OPTS | Additional options for the worker; set each one with "-Dx=y" |
SPARK_DAEMON_MEMORY | The memory allocated to the master and worker processes themselves is 1g by default |
SPARK_DAEMON_JAVA_OPTS | JVM options for the master and worker processes themselves; set each one with "-Dx=y" |
SPARK_PUBLIC_DNS | The public DNS name of the master and worker; not set by default |
- SPARK_MASTER_OPTS sets additional master options, each with "-Dx=y", eg: export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=1"

Parameter | Default value | Meaning |
---|---|---|
spark.deploy.retainedApplications | 200 | Maximum number of applications shown on the Spark web UI |
spark.deploy.retainedDrivers | 200 | Maximum number of drivers shown on the Spark web UI |
spark.deploy.spreadOut | true | Resource scheduling strategy: spreadOut spreads an application's executor processes across as many workers as possible, which suits HDFS file computation by improving the probability of data locality; non-spreadOut tries to pack executors onto a single worker, which suits compute-intensive jobs |
spark.deploy.defaultCores | infinite | The maximum number of CPU cores each Spark job may use in the standalone cluster; unlimited by default, i.e. as many as are available |
spark.deploy.timeout | 60 | After a worker has not responded for this many seconds, the master considers it dead |
- SPARK_WORKER_OPTS sets additional worker options

Parameter | Default value | Meaning |
---|---|---|
spark.worker.cleanup.enabled | false | Whether to automatically clean up the worker's working directories; default false |
spark.worker.cleanup.interval | 1800 | Cleanup interval in seconds; default 30 minutes |
spark.worker.cleanup.appDataTtl | 7 * 24 * 3600 | How long a Spark job's files are kept in the worker's working directory; default 7 days |
The Spark Application running
The local mode
Mainly used for local testing
/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 2 \
/usr/local/test/xxx.jar \
Standalone mode
Parameter Settings
The difference between standalone mode and local mode is that the master is set to spark://master_ip:port, e.g. spark://192.168.0.103:7077
- Code: val spark = SparkSession.builder().master("spark://IP:PORT")...
- spark-submit: --master spark://IP:PORT --deploy-mode client/cluster; the default is client mode
- spark-shell: --master spark://IP:PORT, used for experiments and tests

/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--master spark://192.168.0.103:7077 \
--deploy-mode client \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
/usr/local/test/xxx.jar \
--master:
- not set: runs in local mode
- spark://xxx: standalone mode; the job is submitted to the Master process at that URL
- yarn-xxx: YARN mode; the Hadoop configuration files are read and ResourceManager is connected
Standalone Job process in Client mode
Immediately after submitting the run job, use JPS to view the process and you can see that the following process started
- SparkSubmit: Driver process, started on the local machine (spark-Submit machine)
- CoarseGrainedExecutorBackend (holds an Executor object internally; CoarseGrainedExecutorBackend is the executor process): started on the worker machines that execute the Spark job, one per allocated executor; SparkSubmit (the driver) assigns tasks to the CoarseGrainedExecutorBackend processes
Standalone cluster mode
Standalone cluster mode can monitor the driver process and automatically restart it if it fails; this is used for HA in Spark Streaming. Add the --supervise flag to the spark-submit script.
If the driver keeps failing and restarting, kill it with bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>; the driver ID can be viewed at http://<master url>:8080
In YARN, kill an application with yarn application -kill <applicationId>
Process:
- SparkSubmit runs only briefly: it registers the driver with the master, and exits as soon as the master has started the driver
- On workers, the DriverWrapper process is started
- If enough CPU resources can be obtained, a CoarseGrainedExecutorBackend process is started on another worker

...
--deploy-mode cluster \
--num-executors 1 \
--executor-cores 1 \
...
Cluster mode
- The worker starts the driver and occupies one CPU core
- The driver requests resources from the master and starts an executor process on the worker with free CPU resources
If the CPU core is too small, the executor may not start and be waiting all the time, such as when there is only one worker with one CPU core
In cluster mode, the driver is started inside a Worker process in the cluster, and the client process exits as soon as it has submitted the application, without waiting for the application to finish
Standalone multi-job resource scheduling
By default, each Spark job submitted attempts to use all available CPU resources in the cluster. The standalone cluster supports only FIFO scheduling for multiple jobs submitted at the same time
- Allow multiple jobs to run simultaneously
You can set spark.cores.max; by default a job grabs all cores in the cluster, which only makes sense if just one application runs at a time
spark.conf.set("spark.cores.max", "num")
- Submit-script command
spark-submit: --master spark://IP:PORT --conf spark.cores.max=num
- spark-env.sh global configuration
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=num"   # default number of cores per job
standalone web ui
In Spark standalone mode, the master machine serves a web UI on port 8080 by default; the web UI port can be configured in the spark-env.sh file. The address is, for example, http://192.168.0.103:8080
In Spark YARN mode, use the YARN web UI instead, e.g. http://192.168.0.103:8088/
- Application web UI
The Application Detail UI is served on port 4040 of the machine where the job's driver runs
- Job level
It can be used to locate problems, for example:
- Uneven task data distribution: data skew
- A stage with a long running time: use the stage partitioning algorithm to locate the code corresponding to the stage and optimize its performance
- Logs of each job on each executor
stdout: System.out.println output; stderr: System.err.println output and system-level logs
After the job runs, the information disappears and you need to start the History Server
YARN mode
Prerequisite: in spark-env.sh, set HADOOP_CONF_DIR or YARN_CONF_DIR to the Hadoop configuration directory (HADOOP_HOME/etc/hadoop), which contains all the Hadoop and YARN configuration files, such as hdfs-site.xml and yarn-site.xml; Spark uses them to read and write HDFS and to connect to the YARN ResourceManager
Two operating modes
- In yarn-client mode, the driver process runs on the machine where the job is submitted. ApplicationMaster is only responsible for applying for resources (executor) from YARN for the job. The driver is still responsible for job scheduling
- Yarn-cluster mode The driver process runs on a working node in the YARN cluster as an ApplicationMaster process
Viewing YARN Logs
Set the following parameters in yarn-site.xml:
- Log aggregation (recommended)
- yarn.log-aggregation-enable=true: container logs are copied to HDFS and deleted from the local machine
- yarn.nodemanager.remote-app-log-dir: the HDFS directory to which logs are transferred after the application finishes (valid when log aggregation is enabled)
- yarn.nodemanager.remote-app-log-dir-suffix: the subdirectory name under the remote log directory (valid when log aggregation is enabled)
- yarn.log-aggregation.retain-seconds: how long aggregated logs are kept in HDFS, in seconds
- yarn logs -applicationId <app ID>: views the logs; the applicationId can be found on the YARN web UI (the log files can also be read directly in HDFS)
- yarn.nodemanager.log.retain-seconds: takes effect only when log aggregation is NOT enabled; how long log files are kept locally, in seconds
- yarn.log-aggregation.retain-check-interval-seconds: how often to check for and delete expired logs
- For web UI viewing, the Spark History Server and MapReduce History Server must be started; otherwise only running applications can be viewed. For configuration details, see the Spark History Web UI section
- Scattered viewing
By default logs are under the YARN_APP_LOGS_DIR directory, e.g. /tmp/logs or $HADOOP_HOME/logs/userlogs
If the History Server is not enabled in the YARN cluster and you need to view the System.out output, set yarn.log-aggregation-enable to true in yarn-site.xml (copy the logs to HDFS) and then run yarn logs -applicationId xxx on the machine
Submit a script
/usr/local/spark/bin/spark-submit \
--class xxx \
# the Cluster Manager address is read automatically from the configuration files in the Hadoop configuration directory
--master yarn-cluster/yarn-client \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
--conf <key>=<value> \
# specify a Hadoop queue; queues isolate projects or departments
--queue <Hadoop queue> \
/usr/local/test/xxx.jar \
${1}
--conf: sets any Spark configuration property in key=value format, i.e. --conf "<key>=<value>". application-jar: the full path of the packaged Spark project jar on the current machine. application-arguments: the arguments passed to the main method of the main class; a shell script can receive them with the ${1} placeholder, and in Java they can be read from the main method's args[0]. Pass them when submitting: ./script.sh <argument>
Run spark job attributes in YARN mode
Properties can be set in the –conf submission script
Property | Default value | Meaning |
---|---|---|
spark.yarn.am.memory | 512m | Total memory used by YARN Application Master in client mode |
spark.yarn.am.cores | 1 | Number of cpus used by the Application Master in client mode |
spark.driver.cores | 1 | In cluster mode, the number of CPU cores used by the driver. The driver and the Application Master run in the same process, so the number of cpus used by the Application Master is also controlled |
spark.yarn.am.waitTime | 100s | In cluster mode, the Application Master waits for the SparkContext initialization time. In client mode, the length of time the application Master waits for the driver to connect to it |
spark.yarn.submit.file.replication | HDFS replications | The number of copies of files written to HDFS by a job, such as project JARS, dependency Jars, and configuration files, must be at least 1 |
spark.yarn.preserve.staging.files | false | If set to true, files such as the project JAR are prevented from being deleted after the job runs |
spark.yarn.scheduler.heartbeat.interval-ms | 3000 | Application Master Indicates the interval for sending heartbeat messages to Resourcemanager, in ms |
spark.yarn.scheduler.initial-allocation.interval | 200ms | Application Master Indicates the interval for sending heartbeat messages to Resourcemanager when a Pending Container needs to be allocated |
spark.yarn.max.executor.failures | Number of executors x 2, minimum 3 | The maximum number of executor failures before the entire job is judged to have failed |
spark.yarn.historyServer.address | There is no | Address of spark History Server |
spark.yarn.dist.archives | There is no | Each executor retrieves and places the Archive in the working directory |
spark.yarn.dist.files | There is no | The files in the working directory that each executor will put in |
spark.executor.instances | 2 | The default number of executors |
spark.yarn.executor.memoryOverhead | Executor memory 10% | The size of each executor’s out-of-heap memory for things like constant strings |
spark.yarn.driver.memoryOverhead | Driver memory 7% | Same as above |
spark.yarn.am.memoryOverhead | AM 7% memory | Same as above |
spark.yarn.am.port | random | Application master port |
spark.yarn.jar | There is no | Location of the Spark JAR file |
spark.yarn.access.namenodes | There is no | HDFS Namenode address that can be accessed by the Spark job |
spark.yarn.containerLauncherMaxThreads | 25 | The maximum number of threads that can be used to start Executor Containers is the Application Master |
spark.yarn.am.extraJavaOptions | There is no | Application Master JVM parameters |
spark.yarn.am.extraLibraryPath | There is no | Application Master’s additional library path |
spark.yarn.maxAppAttempts | | Maximum number of attempts to submit a Spark job |
spark.yarn.submit.waitAppCompletion | true | In cluster mode, check whether the client exits after the job is complete |
High availability solutions for master
The standalone mode dispatcher relies on the master process to make scheduling decisions, which can cause a single point of failure: if the master dies, new applications cannot be submitted. To solve this problem, Spark provides two high availability solutions: the ZooKeeper-based HA solution (recommended) and the file system-based HA solution.
HA solution based on ZooKeeper
An overview of the
Using ZooKeeper to provide leader elections and some state storage, you can start multiple master processes in the cluster and have them connect to the ZooKeeper instance. One master process is elected leader, and the other master processes are assigned standby mode. If the current leader master process fails, other standby masters are elected to restore the state of the old master.
configuration
After starting a ZooKeeper cluster, start multiple master processes on multiple nodes and give them the same ZooKeeper configuration (ZooKeeper URL and directory). The master can be dynamically added to the master cluster and removed at any time
In the spark-env.sh file, set the SPARK_DAEMON_JAVA_OPTS option:
- spark.deploy.recoveryMode: set to ZOOKEEPER to enable standby master recovery mode (default: NONE)
- spark.deploy.zookeeper.url: the ZooKeeper cluster URL
- spark.deploy.zookeeper.dir: the ZooKeeper directory used to store recovery state (default: /spark)
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.0.103:2181,192.168.0.104:2181 -Dspark.deploy.zookeeper.dir=/spark"
If multiple master nodes are started in the cluster, but the master is not properly configured to use ZooKeeper, the master will fail to suspend and recover because it cannot discover other master nodes and will assume that it is the leader. This results in an unhealthy state of the cluster, as all the masters will schedule themselves.
details
To schedule new applications or add worker nodes to the cluster, they need to know the IP address of the current leader master; this can be done by passing in a list of masters. For example, point the SparkSession's master address to spark://host1:port1,host2:port2. SparkSession will then try to register with all the masters; if host1 fails, the configuration is still correct because a new leader master will be found
When an application or worker starts, it needs to find and register with the current leader master. Once it registers successfully, its state is saved in ZooKeeper. If a failure occurs, the new leader master contacts all previously registered applications and workers and notifies them of the master change; an application does not even need to know the new master's address at startup.
Therefore, a new master can be created at any time, as long as new applications and workers can be found and registered with the master
Start the standby master on another node :./start-master.sh
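A minimal sketch of pointing an application at a multi-master cluster (the host names and ports are placeholders):

import org.apache.spark.sql.SparkSession

// list every master; the client registers with all of them and follows the elected leader
val spark = SparkSession.builder()
  .appName("ha-example")
  .master("spark://host1:7077,host2:7077")
  .getOrCreate()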
HA scheme based on file system
An overview of the
FILESYSTEM mode: after applications and workers register with the master, the master writes their information to the specified file system directory, so that the state of registered applications and workers can be restored when the master restarts. A manual restart is required.
configuration
In spark-env.sh, set SPARK_DAEMON_JAVA_OPTS
- spark.deploy.recoveryMode: set to FILESYSTEM to enable single-point recovery (default: NONE)
- spark.deploy.recoveryDirectory: the file system directory where Spark stores state information; it must be accessible to the master
eg:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/usr/local/spark_recovery"
details
- This mode is more suitable for development and test environments
- The stop-master.sh script kills the master process but does not clean up its recovery state, so when a new master process starts it enters recovery mode; it can only recover after all previously registered workers and other nodes have timed out
- An NFS directory (similar to HDFS) can be used as the recovery directory. If the original master node fails, a master process can be started on another node, and it will correctly recover all previously registered workers and applications; subsequent applications can then find the new master and register
Saprk operation monitoring
Job monitoring methods :Spark Web UI, Spark History Web UI, RESTFUL API, and Metrics
Spark Web UI
After a Spark job is submitted and SparkSession has started, a Spark web UI service is started. By default, the Spark web UI is served on port 4040 of the node where the driver process runs, e.g. http://<driver-node>:4040
The Spark Web UI contains the following information:
- Stage and Task lists
- An overview of RDD size and memory usage
- Environmental information
- Information about the executor corresponding to the job
If multiple drivers are running on the same machine, they are automatically bound to different ports. By default, the port starts from port 4040. If the port is bound, port 4041 and port 4042 are selected, and so on.
By default, this information is valid for the duration of the job, and once the job is complete, the driver process and the corresponding Web UI service are stopped. If you want to see the Spark Web UI and details after the job is completed, you need to enable the Spark History Server.
Spark History Web UI
- Create a directory for storing logs
Directory createdhdfs://ip:port/dirName
The commandhdfs dfs -mkidr /dirName
- Modify the
spark-defaults.conf
spark.eventLog.enabled true # enable spark.eventLog.dir hdfs://ip:port/dirName spark.eventLog.compress true # compression Copy the code
- Modify spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://ip:port/dirName"
spark.eventLog.dir specifies where events are recorded, and spark.history.fs.logDirectory specifies the directory from which the History Server reads job data; the two should point to the same address.
- Start the HistoryServer
./sbin/start-history-server.sh
The access address of the History Server is printed on startup; use it to open the History Web UI.
RESTful API
RESTful APIs are provided that return job information as JSON
API | Meaning |
---|---|
/applications | Get job list |
/applications/[app-id]/jobs | Specifies the job list for the job |
/applications/[app-id]/jobs/[job-id] | Specify information about the job |
/applications/[app-id]/stages | Specify the stage list for the job |
/applications/[app-id]/stages/[stage-id] | Specify all attempts lists for stages |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] | Specify information for stage Attempts |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary | Specify metrics statistics for all stage Attempt tasks |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList | Specify the task list of stage Attempts |
/applications/[app-id]/executors | Specifies the executor list for the job |
/applications/[app-id]/storage/rdd | Specifies the list of persisted RDDs for the job |
/applications/[app-id]/storage/rdd/[rdd-id] | Specifies information for persistent RDD |
/applications/[app-id]/logs | Downloads a compressed package of all logs for the specified job |
/applications/[app-id]/[attempt-id]/logs | Download a compressed package of all logs of an attempt for the specified job |
E.g. http://192.168.0.103:18080/api/v1/applications
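As a quick illustration, a minimal Scala sketch that queries the applications endpoint and prints the raw JSON; the host and port are placeholders for your History Server (18080) or driver UI (4040):

```scala
import scala.io.Source

// Sketch: fetch the application list from the monitoring REST API (placeholder host/port).
object RestApiDemo {
  def main(args: Array[String]): Unit = {
    val url = "http://localhost:18080/api/v1/applications"
    val json = Source.fromURL(url).mkString // raw JSON array describing the applications
    println(json)
  }
}
```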
Job resource scheduling
Static resource allocation
- Application parallelism: Each Spark Application runs its own batch of Executor processes to run tasks and store data. In this case, the cluster manager can schedule multiple applications at the same time
- Job parallelism: Multiple jobs can be executed in parallel within each Spark Application
Submit multiple Spark Applications at the same time
The default inter-job allocation strategy is static allocation, in which each job is given a quota of the maximum amount of resources it can use and can hold these resources during run time. This is the default used by the Spark Standalone cluster and YARN cluster.
- Standalone cluster: by default, multiple applications submitted to the Standalone cluster run in FIFO order, and each application attempts to grab all available resources. spark.cores.max limits the maximum number of CPU cores each application can use; spark.deploy.defaultCores sets the default number of CPU cores per application; spark.executor.memory sets the memory of each executor.
- YARN: --num-executors controls how many executors the application allocates in the cluster; --executor-memory and --executor-cores control the resources available to each executor.
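For illustration, a minimal sketch of expressing the static limits above through SparkConf; the values are placeholders, and on YARN the same limits are usually passed as spark-submit flags (--num-executors, --executor-cores, --executor-memory):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: static resource limits for a single application (placeholder values).
val conf = new SparkConf()
  .setAppName("static-allocation-demo")
  .set("spark.cores.max", "8")        // standalone: cap the total cores this app may use
  .set("spark.executor.memory", "2g") // memory per executor
  .set("spark.executor.cores", "2")   // cores per executor

val spark = SparkSession.builder().config(conf).getOrCreate()
```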
None of the cluster managers provide memory sharing between applications. To share data in memory, you can use a separate service (for example, Alluxio) so that multiple applications can access the data of the same RDD.
Dynamic resource allocation
When resources allocated to a job are idle, they can be returned to the cluster manager's resource pool for use by other jobs. In Spark, dynamic resource allocation is implemented at executor granularity. To enable it, set spark.dynamicAllocation.enabled to true, start the external shuffle service on each node, and set spark.shuffle.service.enabled to true. The purpose of the external shuffle service is to preserve the shuffle files written by executors after those executors are removed.
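A minimal sketch of the settings mentioned above expressed through SparkConf; the bounds and timeout values are placeholders, and the external shuffle service still has to be running on each node:

```scala
import org.apache.spark.SparkConf

// Sketch: enable dynamic allocation (placeholder bounds and idle timeout).
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // requires the external shuffle service
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
```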
Executor request policy
Spark Application applies for additional executors when it has pending tasks waiting to be scheduled
Tasks submitted but awaiting scheduling -> Insufficient executors
- The driver requests executors in rounds (polling)
- An actual executor request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout
- After that, if pending tasks remain, the request is triggered again every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
- The number of executors requested in each round grows exponentially (e.g. 1, 2, 4, 8, ...). There are two reasons for the exponential growth policy. First, if a Spark application only needs a few more executors, it should start requesting resources cautiously, similar to TCP slow start. Second, if the application really does need many more executors, exponential growth ensures that the required computing resources ramp up in time.
Removal policy
An executor is removed when it has been idle for longer than a certain time (spark.dynamicAllocation.executorIdleTimeout).
That is, the executor is idle and no tasks are pending, which is mutually exclusive with the condition for requesting new executors.
Save the intermediate state
Spark uses an external shuffle service to preserve the intermediate shuffle output of each executor. The service is a long-running process that runs on every node in the cluster. When the service is enabled, executors write shuffle data to, and fetch shuffle data from, this service during shuffle write and shuffle read. This means that shuffle data written by an executor remains available beyond that executor's life cycle.
Adding this intermediate data-store role also changes the way executors read and write shuffle data.
In addition to writing shuffle files, executors can also persist data in memory or on disk. When an executor is removed, all of its cached data is lost.
Note that the data written through the shuffle service is not the same as an executor's persisted data: after an executor is removed or crashes, its persisted data disappears, but the data saved by the shuffle service remains.
Dynamic resource allocation in standalone mode
- Before starting the workers, set spark.shuffle.service.enabled to true
- When submitting the application, add:
--conf spark.dynamicAllocation.enabled=true \
Dynamic resource allocation in Mesos mode
- Run $SPARK_HOME/sbin/start-mesos-shuffle-service.sh on each node and set spark.shuffle.service.enabled to true
- When submitting the application, add:
--conf spark.dynamicAllocation.enabled=true \
Dynamic resource allocation in YARN mode
The external shuffle service needs to be configured to preserve the shuffle files written by executors so that executors can be removed safely.
- Add the jar package
Copy spark-<version>-yarn-shuffle.jar from $SPARK_HOME/lib into the classpath of all NodeManagers, i.e. the hadoop/yarn/lib directory
- Modify yarn-site.xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
  <!-- <value>mapreduce_shuffle</value> -->
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
- Start the spark application
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.service.port=7337 \
--conf spark.dynamicAllocation.enabled=true \
See Configuring the External Shuffle Service
Multiple job scheduling
A job is a computing unit triggered by a Spark action. Multiple jobs can run in parallel within a single Spark application.
FIFO scheduling
By default, Spark dispatches multiple jobs in FIFO mode. Each job is divided into multiple stages, and the first job gets priority on all available resources and lets its stage task run, then the second job gets access to resources, and so on
Fair scheduling
In a fair resource sharing policy, Spark allocates resources and executes tasks of multiple jobs in a polling mode. Therefore, all jobs have a fair opportunity to use cluster resources
conf.set("spark.scheduler.mode", "FAIR")
--conf spark.scheduler.mode=FAIR
Fairly scheduling resource pools
Fair Scheduler also supports splitting jobs into groups and placing them in multiple pools, as well as setting different scheduling priorities for each pool. This feature is useful for separating important jobs from unimportant jobs. You can allocate a pool for important jobs and give them a higher priority. Allocate another pool for unimportant jobs and give them a lower priority.
In code, call sparkContext.setLocalProperty("spark.scheduler.pool", "poolName"); all jobs submitted from that thread will then go into this pool. The setting is stored per thread, which makes it easy to have one thread submit all of a user's jobs to the same resource pool. Set the property to null to clear the pool for that thread.
By default, each pool has the same priority on cluster resources, but within each pool, jobs are executed in FIFO mode.
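A minimal sketch of routing jobs from two threads into two pools; the pool names match the example fairscheduler.xml shown further below, and the RDD work is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: submit jobs from different threads into different scheduler pools.
val spark = SparkSession.builder()
  .appName("fair-pools-demo")
  .master("local[*]")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()
val sc = spark.sparkContext

def submitIn(pool: String): Thread = new Thread(new Runnable {
  override def run(): Unit = {
    sc.setLocalProperty("spark.scheduler.pool", pool) // applies to jobs from this thread only
    sc.parallelize(1 to 1000000).count()              // placeholder job
    sc.setLocalProperty("spark.scheduler.pool", null) // clear the pool for this thread
  }
})

submitIn("production").start()
submitIn("test").start()
```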
You can modify the attributes of the pool through the configuration file
- SchedulingMode: FIFO/FAIR controls whether jobs in the pool queue or share resources in the pool
- Weight: controls the proportion of resources a resource pool gets relative to other pools. By default, every pool has a weight of 1. If a pool's weight is set to 2, it gets twice as many resources as the other pools. Setting a very high weight, such as 1000, effectively implements priority scheduling between pools: a pool with weight 1000 can always start its jobs immediately.
- MinShare: Minimum resource allocation (number of cpus) per resource pool. The fairness scheduler always tries to meet the minimum resource allocation for all active resource pools first, and then allocates the remaining resources based on the weight of each pool. Therefore, the minShare attribute ensures that each resource pool gets at least a certain amount of cluster resources. The default value for minShare is 0.
The configuration file defaults to spark/conf/fairscheduler.xml; a custom file can be specified with conf.set("spark.scheduler.allocation.file", "/path/to/file").
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
The default schedulingMode: FIFO, weight: 1, minShare: 0 is used for resource pools that are not configured in the configuration file.
Common Spark operators
union
- The new RDD will copy the partitions of the two old RDD
- The number of partitions in the new RDD is the sum of the number of partitions in the old RDD
groupByKey
reduceByKey
- The difference from groupByKey is that reduceByKey adds a MapPartitionsRDD in the middle, which is the RDD after map-side (local) aggregation; this reduces the amount of data transferred over the network.
- The shuffle read and aggregation process is similar to groupByKey: a ShuffledRDD performs the shuffle read and re-aggregates to produce the final RDD.
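A small sketch contrasting the two operators on made-up data (local master for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("agg-demo").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val words = sc.parallelize(Seq(("hello", 1), ("world", 1), ("hello", 1)))

// groupByKey: every value is shuffled across the network, then summed on the reduce side.
val counts1 = words.groupByKey().mapValues(_.sum)

// reduceByKey: values are combined locally (map side) first, so less data is shuffled.
val counts2 = words.reduceByKey(_ + _)

counts1.collect().foreach(println) // e.g. (hello,2), (world,1)
counts2.collect().foreach(println) // same result, less shuffle traffic
```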
distinct
- Convert each raw value to a tuple
- Local aggregation (similar to reduceByKey)
- Finally, the tuple is converted back to a single value
cogroup
The cogroup operator is the basis for other operators such as join and intersection.
Example output element: (hello, [(1,1), (1,1)])
intersection
Filter: Filters out keys that are empty in either of the two sets
join
- Cogroup, aggregate two RDD keys
- flatMap: for each aggregated record, take all elements of the two collections for that key and compute their Cartesian product, which may produce multiple output records
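A small sketch showing cogroup and the join built on top of it, on made-up data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("join-demo").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val left  = sc.parallelize(Seq(("hello", 1), ("hello", 2), ("world", 3)))
val right = sc.parallelize(Seq(("hello", "a"), ("hello", "b")))

// cogroup: (key, (all left values, all right values))
left.cogroup(right).collect().foreach(println)
// e.g. (hello,(CompactBuffer(1, 2),CompactBuffer(a, b))), (world,(CompactBuffer(3),CompactBuffer()))

// join: per key, the Cartesian product of the two value collections; keys missing on one side are dropped
left.join(right).collect().foreach(println)
// e.g. (hello,(1,a)), (hello,(1,b)), (hello,(2,a)), (hello,(2,b))
```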
sortByKey
- ShuffledRDD: performs the shuffle read, pulling records with the same key into the same partition
- mapPartitions: sorts the keys within each partition
cartesian
Cartesian product
coalesce
It is used to reduce the number of partitions
repartition
The repartition operator is equivalent to coalesce with shuffle = true.
During repartition, an intermediate RDD is implicitly generated in which each value is given a computed key (prefix); during the shuffle, the tuples for particular key values are routed to each partition, completing the repartitioning.
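A small sketch showing how the two operators change the partition count (numbers are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-demo").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(1 to 100, numSlices = 8)
println(rdd.getNumPartitions)                 // 8

// coalesce: narrow dependency, no shuffle; typically used to reduce the partition count
println(rdd.coalesce(2).getNumPartitions)     // 2

// repartition = coalesce(n, shuffle = true): can also increase partitions, at the cost of a shuffle
println(rdd.repartition(16).getNumPartitions) // 16
```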
Key points
The relationship between parallelism and the number of nodes, RDD partitions, and CPU cores in a Spark cluster
- Each file contains multiple blocks
- When Spark reads input files, it parses them according to the InputFormat of the data format. Generally, several blocks are combined into one split, called an InputSplit; an InputSplit cannot span files
- An InputSplit generates a task
- Each Executor consists of several cores, and each Executor core can execute only one Task at a time
- Each Task execution produces one partition of the target RDD
If the number of partitions is large and there are plenty of resources to process them, concurrency is naturally high. If the number of partitions is small but resources are plentiful, there are few tasks, so concurrency is low. If the number of partitions is large but resources (such as cores) are few, concurrency is also small: one batch of tasks is executed, then the next.
Concurrency of tasks executed = number of executors × number of cores per executor
The core here is a virtual core, not a physical CPU core of the machine; you can think of it as a worker thread of the executor. The number of cores per executor is set with the spark.executor.cores parameter. The core count reported in CPU info is the number of physical cores (or twice that when hyper-threading is enabled), which is a different concept from Spark's cores. However, the number of executor cores configured for a Spark job should not exceed the number of physical cores on the machine.
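As a back-of-the-envelope illustration with placeholder numbers, the maximum number of tasks that can run at the same time under a given configuration:

```scala
// Sketch: task concurrency under a hypothetical configuration,
// e.g. spark-submit --num-executors 5 --executor-cores 4 ...
val numExecutors       = 5 // placeholder
val coresPerExecutor   = 4 // placeholder (spark.executor.cores)
val maxConcurrentTasks = numExecutors * coresPerExecutor
println(maxConcurrentTasks) // 20 tasks can run simultaneously
```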
The number of partitions
- In the data reading phase, e.g. sc.textFile, the number of InputSplits the input files are divided into determines the number of initial tasks
- In the map phase, the number of partitions remains unchanged
- In the reduce phase, aggregating an RDD triggers a shuffle; the number of partitions of the aggregated RDD depends on the operation. For example, repartition aggregates the data into the specified number of partitions, and some shuffle operators allow the number to be configured
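A small sketch tracing the partition count through the read, map, and reduce phases described in this list; the input path and numbers are placeholders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-count-demo").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Read phase: one partition per InputSplit (minPartitions is only a lower bound).
val lines = sc.textFile("hdfs://ip:port/path/to/input", minPartitions = 4)
println(lines.getNumPartitions)

// Map phase: narrow transformations keep the partition count unchanged.
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))
println(pairs.getNumPartitions)

// Reduce phase: shuffle operators accept a target partition count.
val counts = pairs.reduceByKey(_ + _, numPartitions = 10)
println(counts.getNumPartitions)
```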