1. Preface
We already know that the SparkContext, during its preparation, sends a RegisterApplication message to the Master to register the Spark application.
Let's move on to the cluster submission process:
- After the task is submitted, the Master (resource manager) finds a Worker (node) to start the Driver process, and the Driver then registers the application with the Master
- The Master selects the appropriate Workers according to the resource configuration in the submit script
- The selected Workers start the Executors, and each Executor registers back with the Driver
- The Driver starts executing the main function; when an Action operator is executed, it divides the job into stages, each stage generates a TaskSet, and the tasks are then distributed to the Executors
Now that the source code for Step 1 has been fully traced, let's move on to the Master's resource allocation in Step 2
2. Master resource allocation 01
We know from the earlier resource-layer source code analysis that when the Master receives a message, it matches the processing logic in its receive() method. Let's go straight to the point: open Master.receive() and find the handling logic for the RegisterApplication message
Let’s briefly explain the code logic
if (state == RecoveryState.STANDBY) {
  // The current Master is in standby state and will not process the message
  // ignore, don't send response
} else {
  // Encapsulate the app and driver information into an ApplicationInfo
  val app = createApplication(description, driver)
  // Add the encapsulated app info to the waiting collection maintained by the Master
  registerApplication(app)
  persistenceEngine.addApplication(app)
  // Send a RegisteredApplication message back to the Driver
  driver.send(RegisteredApplication(app.id, self))
  // Allocate resources to the waiting applications
  schedule()
}
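For orientation, here is a toy model of how this logic hangs off the pattern match in a receive()-style dispatcher and of the standby check. The classes below are simplified stand-ins, not Spark's DeployMessages, so treat it as a sketch of the shape rather than the real source.
object ReceiveDispatchSketch extends App {
  sealed trait Message
  case class RegisterApplication(appName: String) extends Message

  sealed trait RecoveryState
  case object ALIVE extends RecoveryState
  case object STANDBY extends RecoveryState

  var state: RecoveryState = ALIVE

  val receive: PartialFunction[Message, Unit] = {
    case RegisterApplication(appName) =>
      if (state == STANDBY) {
        // a standby Master ignores the registration and sends no response
      } else {
        val appId = s"app-$appName"   // createApplication(...)
        println(s"registered $appId") // registerApplication(app) + persistenceEngine.addApplication(app)
        // in the real Master: driver.send(RegisteredApplication(app.id, self)), then schedule()
      }
  }

  receive(RegisterApplication("SparkPi"))
}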
The schedule() code has been explained in the previous article, Spark source code 03-Submit submission process and Driver start process
private def schedule(): Unit = {
  // Find the Workers that are still alive
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // 1. Driver scheduling: iterate over the Drivers in the waiting set
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    // Find a suitable Worker
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // 1.3 Start the Driver
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // 2. Worker-related scheduling: start the Executors
  startExecutorsOnWorkers()
}
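Before moving on, the Driver placement loop above is easy to see end to end with toy data. Below is a minimal, self-contained sketch assuming three Workers with the resources used later in this article and a Driver that needs 1 core and 1g of memory; the Worker and DriverReq classes are ours, not Spark's WorkerInfo/DriverInfo.
object DriverPlacementSketch extends App {
  case class Worker(id: String, var coresFree: Int, var memoryFreeMb: Int)
  case class DriverReq(id: String, cores: Int, memMb: Int)

  // Shuffle the alive Workers, just like Random.shuffle(...) in schedule()
  val workers = scala.util.Random.shuffle(Seq(
    Worker("worker-0", 8, 12288), Worker("worker-1", 12, 32768), Worker("worker-2", 12, 20480)))
  var waitingDrivers = List(DriverReq("driver-0", cores = 1, memMb = 1024))

  var curPos = 0
  for (driver <- waitingDrivers) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < workers.size && !launched) {
      val worker = workers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFreeMb >= driver.memMb && worker.coresFree >= driver.cores) {
        // "launchDriver": reserve the resources on this Worker
        worker.coresFree -= driver.cores
        worker.memoryFreeMb -= driver.memMb
        waitingDrivers = waitingDrivers.filterNot(_ == driver)
        launched = true
        println(s"${driver.id} launched on ${worker.id}")
      }
      curPos = (curPos + 1) % workers.size
    }
  }
}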
Since we’ve already looked at Driver scheduling, let’s go straight to the startExecutorsOnWorkers() method. You can see that this method is relatively simple because the comments clearly explain what the code does
Let’s continue with the logic of the code
private def startExecutorsOnWorkers(): Unit = {
  // Walk through the apps waiting for allocation recorded in the Master
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) // The number of cores per Executor
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out unavailable Workers, keeping those whose remaining cores and memory
      // meet the requirements, then sort them by remaining cores in descending order
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      // Calculate how many cores to allocate on each usable Worker
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      // Notify the Workers to allocate resources to the Executors
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
From the above code logic, the method we need to focus on is scheduleExecutorsOnWorkers(), which computes the resource allocation for the app. Let's look at scheduleExecutorsOnWorkers() below; because the method is rather long, we'll ignore the internally defined canLaunchExecutor() method for now
As we can see from the screenshot, the scheduleExecutorsOnWorkers() method defines the following properties:
val coresPerExecutor = app.desc.coresPerExecutor // The number of cores per Executor, as specified by the user
val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // Defaults to 1 when not specified
val oneExecutorPerWorker = coresPerExecutor.isEmpty // Whether a single Executor per Worker takes all of that Worker's cores
val memoryPerExecutor = app.desc.memoryPerExecutorMB // The amount of memory each Executor needs
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable)
val assignedExecutors = new Array[Int](numUsable)
// If the Workers have enough cores, take the total cores the app needs; otherwise take all remaining free cores
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
There is also a piece of code with three nested loops:
// Filter the available Workers with the canLaunchExecutor() method
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) { // first loop layer
  freeWorkers.foreach { pos => // second loop layer
    var keepScheduling = true
    // Allocate on this Worker
    while (keepScheduling && canLaunchExecutor(pos)) { // third loop layer
      coresToAssign -= minCoresPerExecutor
      assignedCores(pos) += minCoresPerExecutor
      // One Executor takes all of the Worker's cores
      if (oneExecutorPerWorker) {
        assignedExecutors(pos) = 1
      } else {
        // Each Executor takes part of the Worker's cores
        assignedExecutors(pos) += 1
      }
      // Horizontal or vertical allocation
      if (spreadOutApps) {
        keepScheduling = false
      }
    }
  }
  freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
Now let's take a look at the internally defined canLaunchExecutor() method. We can see from its comment what it does: it returns whether the specified Worker can launch an Executor for this application. Note that the method evaluates one Worker at a time, identified by its position pos
As can be seen from canLaunchExecutor(), it determines whether the Worker’s memory and cores meet the requirements. Let’s explain the code logic below
def canLaunchExecutor(pos: Int): Boolean = {
  // Whether the number of cores left to assign is at least the minimum cores per Executor
  val keepScheduling = coresToAssign >= minCoresPerExecutor
  // Whether the Worker's free cores minus the already assigned cores still cover the minimum
  val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
  // We are launching a new Executor when each Executor has its own core count,
  // or when this Worker has not been assigned an Executor yet
  val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
  if (launchingNewExecutor) {
    // Memory already assigned on this Worker
    val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    // Is there enough memory left to allocate
    val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
    // Whether we are still under the Executor limit for this app
    val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
    keepScheduling && enoughCores && enoughMemory && underLimit
  } else {
    // We're adding cores to an existing executor, so no need
    // to check memory and executor limits
    keepScheduling && enoughCores
  }
}
3. Master resource allocation 02
After reading the code explanation above, I suspect many readers are, like the author, still somewhat confused about how the Master actually divides up resources. Let's walk through a practical example of Master resource allocation
spark-submit --master spark://127.0.0.1:7077 \
  --deploy-mode cluster \
  --driver-memory 1g \
  --class org.apache.spark.examples.SparkPi \
  # This article focuses on the following three parameters
  --executor-cores 4 \          # Each Executor uses 4 cores
  --total-executor-cores 12 \   # Indirectly specifies 12 / 4 = 3 Executors
  --executor-memory 4g \        # Each Executor occupies 4g of memory
  ${SPARK_HOME}/examples/jars/spark-examples.jar
In the shell submit script above, we focus on the three parameters associated with the Executor. Let's map these values onto the properties in the source code above:
val coresPerExecutor = 4 // The number of cores per Executor, from --executor-cores
val minCoresPerExecutor = 4 // coresPerExecutor.getOrElse(1) = 4
val oneExecutorPerWorker = false // false, because the user specified --executor-cores
val memoryPerExecutor = 4g // The amount of memory each Executor needs, from --executor-memory
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable)
val assignedExecutors = new Array[Int](numUsable)
// If the Workers have enough cores, take the total cores we need; otherwise take all remaining free cores
var coresToAssign = 12 // min(--total-executor-cores, total free cores) = 12
In the example below, we assume there are 3 available Workers, each with a different number of cores and amount of memory:
val numUsable = 3
val assignedCores = new Array[Int](numUsable)
val assignedExecutors = new Array[Int](numUsable)
Now let's prepare the property diagram and use it to walk through the Master's resource allocation process
We can see from the resources of the three Workers that all of them will pass the canLaunchExecutor() filter
Take the first Worker (8 cores, 12G) at index 0, then return to the three-layer nested loop above:
// Filter the available Workers with the canLaunchExecutor() method
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) { // first loop layer
  freeWorkers.foreach { pos => // second loop layer
    var keepScheduling = true
    // Allocate on this Worker
    while (keepScheduling && canLaunchExecutor(pos)) { // third loop layer
      // Step 1
      coresToAssign -= minCoresPerExecutor
      assignedCores(pos) += minCoresPerExecutor
      // Step 2: one Executor takes all of the Worker's cores
      if (oneExecutorPerWorker) {
        assignedExecutors(pos) = 1
      } else {
        // Each Executor takes part of the Worker's cores
        assignedExecutors(pos) += 1
      }
      // Step 3: horizontal or vertical allocation
      if (spreadOutApps) {
        keepScheduling = false
      }
    }
  }
  freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
After the Worker at index 0 goes through Steps 1 and 2, coresToAssign = 8, and the allocation is shown in the following figure
As the figure shows, this Worker's first round of allocation is completed after Steps 1 and 2, which leaves Step 3. Let's look at it carefully. There are two conditions in the third loop layer, and keepScheduling is the one that can be modified in Step 3:
- If keepScheduling becomes false, the third loop layer ends and the Worker is allocated resources only once in this round
- If keepScheduling stays true (its initial value), the third loop layer continues and the Worker keeps being allocated resources until it has no more room
From the above analysis we know that keepScheduling controls whether a Worker receives one allocation per round (so that allocation spreads horizontally across Workers) or is filled up vertically in one go. The value that drives keepScheduling, however, is another parameter, spreadOutApps, which is defined in Master and defaults to true. In other words, by default keepScheduling is set to false after each assignment, and resources are spread horizontally across the Workers
while (keepScheduling && canLaunchExecutor(pos)) { // third loop layer
  // Step 3: horizontal or vertical allocation
  if (spreadOutApps) {
    keepScheduling = false
  }
}
Next, the Worker at index 1 (12 cores, 32G) is allocated, leaving coresToAssign = 4; the result is as follows
The check in canLaunchExecutor() still holds:
coresToAssign >= minCoresPerExecutor
Allocation then continues with the Worker at index 2 (12 cores, 20G), with the following result
Now coresToAssign = 0 no longer satisfies canLaunchExecutor(), and the Master's resource allocation is complete
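To double-check the walk-through, here is a self-contained sketch that re-implements the core of scheduleExecutorsOnWorkers() with toy data structures. The Worker class, the schedule() signature, and the memory-in-MB values are ours, and the executorLimit check is dropped for brevity; the Worker resources and submit parameters are the ones from the example above.
object ScheduleSketch extends App {
  case class Worker(coresFree: Int, memoryFreeMb: Int)

  // Mirrors the logic of scheduleExecutorsOnWorkers(); the app/executorLimit check is omitted
  def schedule(workers: IndexedSeq[Worker], coresLeft: Int,
               coresPerExecutor: Option[Int], memoryPerExecutorMb: Int,
               spreadOutApps: Boolean): Array[Int] = {
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val numUsable = workers.length
    val assignedCores = new Array[Int](numUsable)
    val assignedExecutors = new Array[Int](numUsable)
    var coresToAssign = math.min(coresLeft, workers.map(_.coresFree).sum)

    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = workers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutorMb
        val enoughMemory = workers(pos).memoryFreeMb - assignedMemory >= memoryPerExecutorMb
        keepScheduling && enoughCores && enoughMemory
      } else {
        keepScheduling && enoughCores
      }
    }

    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor
          if (oneExecutorPerWorker) assignedExecutors(pos) = 1 else assignedExecutors(pos) += 1
          if (spreadOutApps) keepScheduling = false // spread horizontally across Workers
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }

  // Worker resources and submit parameters from the example:
  // 8C/12G, 12C/32G, 12C/20G; --executor-cores 4, --total-executor-cores 12, --executor-memory 4g
  val workers = IndexedSeq(Worker(8, 12288), Worker(12, 32768), Worker(12, 20480))
  val spread   = schedule(workers, 12, Some(4), 4096, spreadOutApps = true)
  val vertical = schedule(workers, 12, Some(4), 4096, spreadOutApps = false)
  println(spread.mkString("[", ", ", "]"))   // [4, 4, 4] -> one 4-core Executor per Worker
  println(vertical.mkString("[", ", ", "]")) // [8, 4, 0] -> Workers filled up one at a time
}
Running it prints [4, 4, 4] with the default spreadOutApps = true and [8, 4, 0] with spreading disabled, which matches the horizontal versus vertical behaviour described above.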
4. Master resource allocation 03
As we know from Spark's submission process, after partitioning the resources the Master tells the Workers to start the corresponding Executors. Let's continue tracing the source code to confirm this. Back in the startExecutorsOnWorkers() method, look at the final for loop: using the assignedCores we just computed, it iterates over the usable Workers and calls allocateWorkerResourceToExecutors() for each Worker that was assigned cores
private def startExecutorsOnWorkers(): Unit = {
  // Walk through the apps waiting for allocation recorded in the Master
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) // The number of cores per Executor
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out unavailable Workers, keeping those whose remaining cores and memory
      // meet the requirements, then sort them by remaining cores in descending order
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      // Calculate how many cores to allocate on each usable Worker
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      // Notify the Workers to allocate resources to the Executors
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
Now let's look at allocateWorkerResourceToExecutors(). From the method name we can tell that it allocates a Worker's resources to Executors
The code logic of this method is simple, so we won’t explain it too much
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // How many Executors to start on this Worker. In the example above: 4 (assigned cores) / 4 (cores per Executor) = 1
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    // Send a message to the Worker
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
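As a quick sanity check of the arithmetic in the comment above, here is a throwaway snippet (not Spark code) that evaluates the same expression with the values from our example:
object ExecutorCountCheck extends App {
  val coresPerExecutor: Option[Int] = Some(4) // from --executor-cores 4
  val assignedCores = 4                       // cores this Worker received in our example
  val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  println(numExecutors) // 1 Executor on this Worker; with 3 such Workers => 3 Executors in total
}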
Next we look at launchExecutor(). At line 755 we can see that the Master sends a LaunchExecutor message to the Worker, telling the Worker to start the Executor
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
Now that the Master allocation is complete, let’s summarize the details of the Master allocation
Summary:
- The core method for the Master's resource allocation is startExecutorsOnWorkers()
- By default (spreadOutApps = true), the Master spreads resources horizontally across the Workers
5. Executor start process
After receiving the LaunchExecutor message, the Worker starts the Executor process. Since the author is working in a Windows environment and the Java process-startup implementation differs between the Windows and Linux platforms, we won't dig into the details of how the Executor process itself is launched; what we care about is what the Executor does once it has started
Regarding Executor startup, we need to recall a property from the article Spark source code 04 - Submit submission process and SparkContext preparation process: org.apache.spark.executor.CoarseGrainedExecutorBackend. This is the fully qualified class name of the Executor we actually launch
Let's look at the CoarseGrainedExecutorBackend class. Since this class has a companion object, we can start directly from its main() method, which encapsulates the parameters and then calls the run() method
Now let’s look at the run() method
Because the method is long, we skip part of the code and jump straight to line 231 and the familiar rpcEnv.setupEndpoint() method, which registers the CoarseGrainedExecutorBackend endpoint with the rpcEnv environment; a background thread then invokes its onStart() method
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
Looking at the onStart() method, we can see that line 63 sends a RegisterExecutor message to the Driver. This corresponds exactly to the step in the submission process where the Executor, once started successfully, registers back with the Driver
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
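The ask/reply handshake here boils down to a request whose reply completes a future on the caller's side. Below is a toy sketch with plain Scala Futures, not Spark's RpcEnv API, just to show the shape of the exchange; all names in it are made up.
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

object AskReplySketch extends App {
  // Stand-in for the RegisterExecutor message
  case class RegisterExecutor(executorId: String)

  def ask(msg: RegisterExecutor): Future[Boolean] = {
    val promise = Promise[Boolean]()
    // "Driver side": process the registration and reply; here we always accept it
    Future {
      println(s"driver: registering ${msg.executorId}")
      promise.success(true) // plays the role of context.reply(true)
    }
    promise.future
  }

  // "Executor side": send RegisterExecutor and react to the Boolean reply
  ask(RegisterExecutor("executor-0")).foreach { accepted =>
    println(s"executor: registration accepted = $accepted")
  }
  Thread.sleep(500) // keep the JVM alive long enough to see the output
}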
Next we go back to the CoarseGrainedSchedulerBackend class and look at DriverEndpoint's receiveAndReply() method to find the handling logic for the RegisterExecutor message. We can see there are three branches; because of the length of the code, we will only introduce them briefly
if (executorDataMap.contains(executorId)) {
  // Branch 1: the Executor is already registered
  executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
    scheduler.nodeBlacklist.contains(hostname)) {
  // Branch 2: the Executor's node is blacklisted
  executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
  context.reply(true)
} else {
  // Branch 3: normal registration; the Driver records the Executor information (details omitted)
  executorRef.send(RegisteredExecutor)
  context.reply(true)
}
From the source above we know that after the Executor registers successfully, the Driver sends a RegisteredExecutor message back to the Executor. So let's go back to CoarseGrainedExecutorBackend.receive() and look at the handling logic for the RegisteredExecutor message; you can see there is essentially only one simple line of code
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
This executor is a property of CoarseGrainedExecutorBackend, and an important one
Now let's look at the Executor class itself, focusing on the first line of its comment: "Spark executor, backed by a threadpool to run tasks". Running tasks via a thread pool is an important hint: one of the Executor's most important properties is its thread pool
At line 89 of the Executor source we find the threadPool property. In other words, tasks are really executed by the threadPool inside Executor, while the Executor itself is held as a property of CoarseGrainedExecutorBackend
Since there is a threadPool, there must be a method that submits work to it. Looking at launchTask() at line 174 of Executor, we can see that executing a task means wrapping it up and putting it into the threadPool
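The wrap-and-submit pattern is easy to reproduce outside Spark. Below is only a sketch of the idea with a plain Java thread pool; Spark's actual task wrapper (TaskRunner) does far more, such as deserialization, metrics, and status updates.
import java.util.concurrent.Executors

object LaunchTaskSketch extends App {
  // A cached thread pool standing in for the Executor's threadPool
  private val threadPool = Executors.newCachedThreadPool()

  // "launchTask": wrap the work in a Runnable and submit it to the pool
  def launchTask(taskId: Long, work: () => Unit): Unit = {
    val runner = new Runnable {
      override def run(): Unit = {
        println(s"task $taskId running on ${Thread.currentThread().getName}")
        work()
      }
    }
    threadPool.submit(runner)
  }

  (1L to 3L).foreach(id => launchTask(id, () => Thread.sleep(100)))
  threadPool.shutdown() // let the submitted tasks finish, then release the threads
}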
Let’s graph what we know about the Master resource allocation and Executor startup process
Conclusion:
- CoarseGrainedExecutorBackend is what we call the Executor role; when it starts, it registers back with the Driver
- CoarseGrainedExecutorBackend holds an executor property, and the threads in the executor's thread pool do the real work
At this point we have traced the entire SparkSubmit submission process