Starting a Spark Application
What does "starting a Spark Application" mean?
First, consider why Spark is needed.
To count the number of lines in a small file, you can simply start a JVM, read the file, and count the lines.
As the file grows, a single machine is no longer effective, so we want to do the work in a distributed way: let multiple machines each process part of the data, then aggregate the results. Distributed execution brings complex problems such as resource scheduling, failover, and distributed state synchronization, so we need a framework that shields us from them. That is what Spark does: it makes running distributed computing tasks convenient. Starting a Spark Application can therefore be seen as simply starting a piece of distributed task-execution middleware, an application that can distribute the tasks we submit and complete the computation.
So, back to the question: how do you start a Spark Application?
To illustrate this, we need to introduce two more concepts: SparkSession and SparkContext.
SparkContext: a SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster. Only one active SparkContext is allowed per JVM.
SparkSession: introduced in Spark 2+, it is the API entry point for Dataset and DataFrame.
Checking the source code of both, you can see that SparkSession wraps a SparkContext and adds the Dataset/DataFrame APIs on top of it. SparkContext is the more fundamental of the two. At the API level, SparkContext directly provides the low-level APIs for basic Spark concepts such as RDD, broadcast, accumulator, and job. At the level of class members, SparkContext holds very rich and critical state, such as _dagScheduler, _env, _taskScheduler, and _applicationId. As the variable and method definitions show, SparkContext maintains all interaction between the driver and executors (task distribution, task state, and so on), as well as interaction with the resource manager (e.g. YARN) to request executors on physical resources. The _applicationId essentially says it all: one SparkContext = one Spark Application.
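A small sketch of that relationship (the app name and master here are placeholders, not part of any real setup): the SparkContext is reachable through the SparkSession, and the two API levels sit side by side.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch (Spark 2+); app name and master are placeholders.
val spark = SparkSession.builder()
  .appName("session-vs-context")
  .master("local[2]")
  .getOrCreate()

// The session wraps a SparkContext: the low-level RDD/broadcast/accumulator
// APIs live on the context, the DataFrame/Dataset APIs on the session.
val sc  = spark.sparkContext
val rdd = sc.parallelize(Seq(1, 2, 3))          // RDD-level API
val df  = spark.createDataFrame(Seq((1, "a")))  // DataFrame-level API

println(sc.applicationId)  // one SparkContext = one Spark Application
```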
== to-complete ==
Experiment: create two sessions connected to one application (see the sketch below).
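A minimal sketch of that experiment, assuming a local master (names are placeholders): two sessions created in the same JVM share one SparkContext and therefore belong to one application.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: two sessions in one JVM end up on the same SparkContext.
val spark1 = SparkSession.builder()
  .appName("two-sessions-one-app")
  .master("local[2]")
  .getOrCreate()

// newSession() gives an independent configuration / temp-view namespace,
// but reuses the underlying SparkContext (i.e. the same application).
val spark2 = spark1.newSession()

println(spark1.sparkContext eq spark2.sparkContext)   // true
println(spark1.sparkContext.applicationId ==
        spark2.sparkContext.applicationId)            // true
```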
Starting a local Application
Generally used for local debugging
Spark 2.2:

```scala
val spark = SparkSession.builder()
  .appName("Spark SQL basic example")
  .master("local[4]")
  .getOrCreate()
```

Spark 1.6:

```scala
val config = new SparkConf().setMaster("local[5]").setAppName("test")
val sc = SparkContext.getOrCreate(config)
```
Driver SparkContext startup code
As mentioned above, a SparkContext corresponds to an Application (a Spark working cluster). The creation of the SparkContext is the startup process of the application. (Only the driver has a SparkContext; executors have a SparkEnv, not a SparkContext.)
Looking at the SparkContext.scala source file, you can see the SparkContext class and its companion object:
- class SparkContext: the class itself; the current design allows only one active instance per JVM.
- object SparkContext: provides SparkContext-specific static-style methods, such as getOrCreate.
SparkContext.getOrCreate(sparkConf) is the entry point: under lock control, it either returns the currently active context or creates a new SparkContext instance.
SparkContext.class
Primary member variables
Some of the main ones are:
- listenerBus: used to deliver Spark events; for example, the web UI listens for events such as job start and task completion to refresh the data it displays.
- _conf: the SparkConf configuration, copied from the externally supplied configuration.
- _env: the Spark runtime environment, containing everything the instance needs: serialization, block management, broadcast management, the map output tracker, and so on.
- _jobProgressListener: the JobProgressListener.
- _ui: the Spark UI.
Looking at the initialization code of SparkContext, the key steps are as follows.
- SparkEnv initialization
```scala
// Create the Spark execution environment (cache, map output tracker, etc.)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
```
SparkEnv encapsulates everything the Spark instance needs at runtime, such as serialization, block management, broadcast management, and the map output tracker.
Initialization steps (a sketch of inspecting a few of these components follows the list):
1.1 Create an RpcEnv. The RpcEnv contains a stream manager, which is used to serve jars and files. If the instance is the driver, a Netty server is started as the RPC server, based on the port settings in the RpcEnv; if it is an executor, == todo ==
1.2 Create the serialization components: serializerManager (the user can specify the serializer class) and closureSerializer (which uses JavaSerializer)
1.3 Create the broadcastManager to manage broadcast variables
1.4 Create the mapOutputTracker (driver/executor variants)
1.5 shuffleManager
1.6 memoryManager
1.7 blockManager
1.8 metricsSystem
1.9 outputCommitCoordinator
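Once initialization finishes, the environment can be reached through SparkEnv.get; here is a small sketch, assuming a SparkContext is already running in the current JVM (this only peeks at a few of the components listed above):

```scala
import org.apache.spark.SparkEnv

// Sketch: inspecting the runtime environment from the driver side.
// Assumes a SparkContext has already been started in this JVM.
val env = SparkEnv.get
println(env.executorId)                          // "driver" on the driver
println(env.serializer.getClass.getSimpleName)   // step 1.2: serialization
println(env.conf.get("spark.app.name"))          // the SparkConf held by the env
// blockManager, shuffleManager, mapOutputTracker, etc. (steps 1.4-1.7)
// hang off the same object, but most of their types are private[spark].
```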
- _ui (SparkUI): Spark provides a web UI to view the running status, memory usage, and task distribution of tasks in the Spark cluster. It is essentially an HTTP server based on Jetty.
```scala
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
      _env.securityManager, appName, startTime = startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
```
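For reference, the spark.ui.enabled flag checked above is an ordinary configuration entry; a small sketch of setting it (the values here are arbitrary):

```scala
import org.apache.spark.SparkConf

// Sketch: the UI can be switched off (e.g. in tests) or moved to another port.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("ui-config-demo")
  .set("spark.ui.enabled", "false")  // skip starting the Jetty-based web UI
  .set("spark.ui.port", "4050")      // default is 4040; ignored when disabled
```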
- Add jars and files
Jars and files are added to env.fileServer, which then provides an interface through which executors can access those files; this is currently implemented via NettyStreamManager.
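The user-facing side of this is sc.addJar and sc.addFile; a small sketch (the paths are placeholders), with SparkFiles.get resolving the downloaded local copy on the executor side:

```scala
import org.apache.spark.SparkFiles

// Sketch: register a jar and a file with the driver's file server
// (paths here are placeholders).
sc.addJar("/path/to/extra-lib.jar")
sc.addFile("/path/to/lookup.txt")

// On executors, the file is fetched from the file server and resolved locally.
val firstLines = sc.parallelize(1 to 2).map { _ =>
  scala.io.Source.fromFile(SparkFiles.get("lookup.txt")).getLines().next()
}.collect()
```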
- Task execution related
4.1 Read the executor configuration parameters and register heartbeat handling on the driver's RPC server
4.2 Create the scheduling components
- _dagScheduler: the top scheduling layer, which schedules in units of stages. Each job is divided into stages at shuffle boundaries, forming a DAG, and the scheduling order of the stages is optimized. A stage contains multiple tasks that all run the same computation; the number of tasks equals the number of partitions. The DAGScheduler submits task sets to the _taskScheduler. (A quick way to see these stage boundaries is sketched at the end of this section.)
- _taskScheduler: receives the stages (task sets) submitted by the DAGScheduler into a task pool and submits tasks to the lower, physical-resource layer according to the scheduling policy (FIFO/FAIR), shielding the concrete physical resource scheduling below; it is a layer of abstraction above the physical layer. There are different implementations depending on the cluster resource manager (local, YARN, Mesos), all similar and all inheriting from TaskSchedulerImpl.
- _schedulerBackend: interacts with the resource manager, again with different implementations depending on the cluster resource manager (local, YARN, Mesos). The applicationId is obtained from _schedulerBackend and propagated to the other components.
4.3 Start the _taskScheduler (this is also where the driver-side executor is started in local mode)
4.4 Start the _executorAllocationManager
At this point the Spark Application has started. In local mode, _schedulerBackend is implemented as LocalSchedulerBackend: all components live in the same JVM, the driver-side executor has already been started, and that executor has its own thread pool to accept submitted tasks.
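To make the stage division from step 4.2 concrete, here is a small sketch (data and names are made up): a job with one shuffle splits into two stages, and toDebugString shows the lineage with the shuffle boundary.

```scala
// Sketch: a job with one shuffle produces two stages; the number of tasks
// per stage equals the number of partitions of the RDD in that stage.
val words  = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 4)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

println(counts.toDebugString) // the ShuffledRDD line marks the stage boundary
counts.collect()              // triggers one job: a map stage + a reduce stage
```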
Starting a job
Example:
```scala
val dataRdd = sc.parallelize(data, 3)
dataRdd.count()
```
(A DataFrame adds another layer on top of the RDD, so tracing task execution directly from the DataFrame API is too complex; we look at the RDD level instead.)
The count function is implemented as:
```scala
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
```
That is, the job is submitted through SparkContext, which returns the Utils.getIteratorSize value computed on each partition (an Array[Long], one entry per partition), and count() sums them. The heart of the matter is the runJob method.
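count() is just one caller of this mechanism; as a sketch, runJob can be invoked directly with an arbitrary per-partition function (the function below is made up for illustration):

```scala
// Sketch: run a function over each partition and get one result per
// partition back on the driver, just as count() does with getIteratorSize.
val rdd = sc.parallelize(1 to 10, numSlices = 3)

val perPartitionSums: Array[Int] =
  sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)

println(perPartitionSums.mkString(", ")) // one entry per partition
println(perPartitionSums.sum)            // the aggregated total
```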
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}
```
Continuing into the overload it calls:
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  // Clean a closure to make it ready to be serialized and sent to tasks
  // (removes unreferenced variables in $outer's, updates REPL variables)
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Submit the job to the DAG scheduler: the source RDD, the cleaned closure
  // (the function each task runs and the fields it accesses), the partitions,
  // the call site of this method in driver code, the resultHandler function,
  // and the properties
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,
    localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
```
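The clean(func) step above exists because closures are shipped to tasks. As a hedged sketch of the classic pitfall this relates to (class and field names are invented): referencing a field drags the whole enclosing object into the serialized closure, while copying the value to a local val keeps the closure small.

```scala
// Sketch: why closures shipped to tasks must be serializable (names invented).
class LineCounter {                       // does not extend Serializable
  val marker = "#"

  // Referencing the field `marker` captures `this`, so Spark tries to
  // serialize the whole LineCounter -> usually "Task not serializable".
  def bad(rdd: org.apache.spark.rdd.RDD[String]): Long =
    rdd.filter(line => line.startsWith(marker)).count()

  // Copying the value to a local val means only the String is captured.
  def good(rdd: org.apache.spark.rdd.RDD[String]): Long = {
    val localMarker = marker
    rdd.filter(line => line.startsWith(localMarker)).count()
  }
}
```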
DAGScheduler.scala
```scala
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // Submit the job and get back a JobWaiter (similar to a Future). submitJob
  // wraps the job information in an event posted to eventProcessLoop, which is
  // handled by DAGSchedulerEventProcessLoop. That loop is a producer-consumer:
  // DAGSchedulerEvents are stored in a blocking queue and a dedicated thread
  // keeps consuming them, so the DAGScheduler handles all events asynchronously.
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Block until the JobWaiter is woken up; the driver then assembles the
  // per-partition results (delivered through resultHandler) into the final result.
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
```
At this point, Spark's outermost job execution flow is complete:
- The driver turns the job submission into a JobSubmitted event, waits for the job to complete, and accepts the results calculated on each partition.
- The results are then summarized.
Key points:
- The DAGScheduler event loop is the core through which the driver dispatches distributed scheduling messages. All events (executor events, job events, and task events) are sent to the DAGScheduler and processed by it.
- The caller blocks on the JobWaiter, which is woken up when the job completes.
- resultHandler: the per-partition results are aggregated here.
As the code shows, the core of task execution is the handling of task submission and the maintenance of task state under distributed conditions; the core logic lies in how DAGScheduler processes the corresponding events.
```scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  // ...
```
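To make the producer-consumer event loop concrete, here is a minimal, self-contained sketch in the same spirit as DAGSchedulerEventProcessLoop (this is not Spark's actual EventLoop class; all names are invented): events go into a blocking queue, a dedicated thread drains it, and a doOnReceive-style match handles each one.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Minimal sketch of a DAGScheduler-style event loop (illustration only).
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent
case class TaskCompleted(taskId: Long) extends SchedulerEvent
case object Stop extends SchedulerEvent

class EventLoopSketch {
  private val queue = new LinkedBlockingQueue[SchedulerEvent]()

  private val consumer = new Thread("event-loop-sketch") {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {                 // blocks until an event arrives
          case JobSubmitted(id)  => println(s"handle job $id")
          case TaskCompleted(id) => println(s"handle task $id")
          case Stop              => running = false
        }
      }
    }
  }

  def start(): Unit = consumer.start()
  def post(event: SchedulerEvent): Unit = queue.put(event)  // producer side
  def stop(): Unit = { queue.put(Stop); consumer.join() }
}
```

Posting a JobSubmitted here plays the role of DAGScheduler.submitJob posting to eventProcessLoop, and the consumer thread plays the role of doOnReceive above.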