This post was published on Tencent Cloud + Community by @edwinhzhang.
FairScheduler is a commonly used YARN scheduler, but the official documentation leaves many of its parameters and concepts underspecified, even though they clearly affect the normal operation of a cluster. The main purpose of this article is to clarify what the key parameters do by walking through the code. The commonly used parameters from the official documentation are listed below:
Parameter | Official description
---|---
yarn.scheduler.fair.preemption.cluster-utilization-threshold | The utilization threshold after which preemption kicks in. The utilization is computed as the maximum ratio of usage to capacity among all resources. Defaults to 0.8f.
yarn.scheduler.fair.update-interval-ms | The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms.
maxAMShare | The fraction of the queue's fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. A value of -1.0f disables this check. The default value is 0.5f.
minSharePreemptionTimeout | Number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues. If not set, the queue inherits the value from its parent queue.
fairSharePreemptionTimeout | Number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues. If not set, the queue inherits the value from its parent queue.
fairSharePreemptionThreshold | If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold * fairShare resources, it is allowed to preempt containers to take resources from other queues. If not set, the queue inherits the value from its parent queue.
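To make the two preemption parameters concrete, here is a minimal, self-contained sketch of the condition they describe: a queue becomes eligible to preempt once its usage has stayed below fairSharePreemptionThreshold * fairShare for at least fairSharePreemptionTimeout. This is an illustration only, with hypothetical names and plain long values for memory in MB; it is not scheduler code.

public class PreemptionConditionSketch {
  // Returns true when the queue has been starved below threshold * fairShare
  // for at least the configured timeout (times in milliseconds).
  static boolean eligibleToPreempt(long usageMb, long fairShareMb,
      double fairSharePreemptionThreshold,
      long belowThresholdSinceMs, long fairSharePreemptionTimeoutMs,
      long nowMs) {
    boolean starved = usageMb < fairSharePreemptionThreshold * fairShareMb;
    boolean waitedLongEnough =
        nowMs - belowThresholdSinceMs >= fairSharePreemptionTimeoutMs;
    return starved && waitedLongEnough;
  }

  public static void main(String[] args) {
    // A queue with a 100 GB fair share that has used only 30 GB for 40 s,
    // with threshold 0.5 and a 30 s timeout, is allowed to preempt.
    System.out.println(eligibleToPreempt(30 * 1024, 100 * 1024, 0.5,
        0L, 30_000L, 40_000L)); // prints true
  }
}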
The descriptions above give no default values for the timeout parameters and do not say what happens when they are left unset. Concepts such as minShare and fairShare are also left vague, which easily causes confusion. The following analysis explains these parameters and concepts in detail.
The overall structure of FairScheduler
When the fair scheduler runs, the RM starts two services, FairScheduler and SchedulerEventDispatcher, which are responsible for the Update thread and the Handle thread respectively.
The Update thread has two jobs: (1) update the resources of each queue (the Instantaneous Fair Share), and (2) decide whether leaf queues need to preempt resources (if preemption is enabled).
The Handle thread handles events such as adding a node to the cluster, adding an APP to a queue, removing an APP from a queue, and updating a container.
FairScheduler class diagram
Queue inheritance module: YARN manages queues in a tree structure. From the resource-management point of view, the root queue (FSParentQueue), non-root parent queues (FSParentQueue), leaf queues (FSLeafQueue), and APP tasks (FSAppAttempt, which is how the fair scheduler sees an APP) are all abstractions of resources. They all implement the Schedulable interface, so each of them is a schedulable resource object: each has its own fairShare methods as well as weight, minShare, maxShare, priority, resourceUsage, and demand attributes, and each implements preemptContainer (to preempt resources) and assignContainer (which, among other things, assigns an AM container to an ACCEPTED APP).
public interface Schedulable {
/** * Name of job/queue, used for debugging as well as for breaking ties in * scheduling order deterministically. */
public String getName();
/** * Maximum number of resources required by this Schedulable. This is defined as * number of currently utilized resources + number of unlaunched resources (that * are either not yet launched or need to be speculated). */
public Resource getDemand();
/** Get the aggregate amount of resources consumed by the schedulable. */
public Resource getResourceUsage();
/** Minimum Resource share assigned to the schedulable. */
public Resource getMinShare();
/** Maximum Resource share assigned to the schedulable. */
public Resource getMaxShare();
/** Job/queue weight in fair sharing. */
public ResourceWeights getWeights();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public long getStartTime();
/** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
public Priority getPriority();
/** Refresh the Schedulable's demand and those of its children if any. */
public void updateDemand();
/** * Assign a container on this node if possible, and return the amount of * resources assigned. */
public Resource assignContainer(FSSchedulerNode node);
/** * Preempt a container from this Schedulable if possible. */
public RMContainer preemptContainer();
/** Get the fair share assigned to this Schedulable. */
public Resource getFairShare();
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare);
}
Queue running module: the class diagram describes how fair scheduling works. The SchedulerEventDispatcher class manages the Handle thread, while the FairScheduler class manages the Update thread and obtains all queue information through QueueManager.
We start the code analysis with two basic YARN concepts: Instantaneous Fair Share and Steady Fair Share.
Instantaneous Fair Share & Steady Fair Share
Fair Share is the amount of resources that YARN calculates for each queue, based on the queue's weight, maximum resources, and minimum resources, as the resources that can be allocated to that queue. This article only covers fair scheduling with the default FairSharePolicy, which is single-resource: it only considers memory.
Steady Fair Share: a fixed, theoretical amount of memory for each queue. It does not change easily after the RM's initial work and is recalculated only when nodes are added later (addNode). The RM's initial work here also consists of the Handle thread adding every node of the cluster to the scheduler (addNode).
Instantaneous Fair Share: the actual amount of memory for each queue, which changes dynamically. When YARN refers simply to fair share, it always means the Instantaneous Fair Share.
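A hypothetical example of the difference: with a 300 GB cluster and three queues of equal weight, each queue's Steady Fair Share is 100 GB. If only two of those queues have running applications, the Update thread computes an Instantaneous Fair Share of roughly 150 GB for each active queue and 0 for the empty one, because an empty queue is treated as fixed and does not share resources (see the handleFixedFairShares analysis below).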
1 Steady Fair Share
The Handle thread calls the addNode method if it receives the NODE_ADDED event.
private synchronized void addNode(RMNode node) {
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
nodes.put(node.getNodeID(), schedulerNode);
// Add the memory of the node to the total cluster resources
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
// Update available resources
updateRootQueueMetrics();
// Update the maximum allocation of a single container (this should be the max shown in the UI)
updateMaximumAllocation(schedulerNode, true);
// Set the root queue's steadyFairShare to clusterResource (the total cluster resource)
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
// recalculate SteadyShares
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() +
" cluster capacity: " + clusterResource);
}
recomputeSteadyShares walks the queue tree breadth-first and computes how much memory each queue should get, all the way down to the leaf queues.
public void recomputeSteadyShares() {
  // Breadth-first traversal of the whole queue tree
  // For the root queue, getSteadyFairShare() is clusterResource
  policy.computeSteadyShares(childQueues, getSteadyFairShare());
  for (FSQueue childQueue : childQueues) {
    childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
    if (childQueue instanceof FSParentQueue) {
      ((FSParentQueue) childQueue).recomputeSteadyShares();
    }
  }
}
The computeSteadyShares method calculates how much memory each queue should be allocated. In general the allocation follows the queue weights: queues with larger weights get more resources and queues with smaller weights get less. The actual result is also affected by each queue's minResources and maxResources, which bound its resources from below and above. computeSteadyShares eventually calls the computeSharesInternal method. Here is an example:
The numbers in the figure are queue weights. With 600 GB of total resources, parent = 300 GB, leaf1 = 300 GB, leaf2 = 210 GB, leaf3 = 70 GB.
The computeSharesInternal method performs a binary search for a weight-to-resource ratio R (weight-to-slots) and uses this R to allocate resources to each queue (inside this method a queue appears as a Schedulable, which again shows that a queue is a resource object). The formula is steadyFairShare = R * QueueWeight.
computeSharesInternal is shared by the Steady Fair Share and the Instantaneous Fair Share calculations; the isSteadyShare flag selects which one to compute.
The calculation is complicated because resources are not allocated purely in proportion to weight (if they were, maxR and minR would not be needed; maxR defaults to 0x7fffffff and minR defaults to 0). When maxR and minR are set, a queue whose proportional share falls below minR must still receive at least minR, and a queue whose proportional share exceeds maxR is capped at maxR. So we want to find an R (weight-to-resource ratio) that satisfies, as far as possible:
- R * (Queue1Weight + Queue2Weight + ... + QueueNWeight) <= totalResource
- R * QueueWeight >= minShare
- R * QueueWeight <= maxShare
Note: QueueNWeight is the weight of queue N; minShare and maxShare are each queue's minResources and maxResources.
In detail, computeSharesInternal works in four steps:
- Determine the available resources: totalResources = min(totalResources - takenResources(fixedShare), totalMaxShare)
- Determine the upper and lower bounds of R
- Approximate R by binary search
- Set the fair share of each queue using R
private static void computeSharesInternal(
    Collection<? extends Schedulable> allSchedulables,
    Resource totalResources, ResourceType type, boolean isSteadyShare) {
  Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
  // Step 1:
  // Remove queues whose share is fixed and cannot change, and sum up their fixed memory
  int takenResources = handleFixedFairShares(
      allSchedulables, schedulables, isSteadyShare, type);
  if (schedulables.isEmpty()) {
    return;
  }
  // Find an upper bound on R that we can use in our binary search. We start
  // at R = 1 and double it until we have either used all the resources or we
  // have met all Schedulables' max shares.
  int totalMaxShare = 0;
  // Iterate over the (non-fixed) schedulables and add up each queue's maxShare to get totalMaxShare
  for (Schedulable sched : schedulables) {
    int maxShare = getResourceValue(sched.getMaxShare(), type);
    totalMaxShare = (int) Math.min((long) maxShare + (long) totalMaxShare,
        Integer.MAX_VALUE);
    if (totalMaxShare == Integer.MAX_VALUE) {
      break;
    }
  }
  // Total resources minus the fixed share
  int totalResource = Math.max((getResourceValue(totalResources, type) -
      takenResources), 0);
  // The resources the queues can share are limited by both the total cluster
  // resources and the maxResources of each queue
  totalResource = Math.min(totalMaxShare, totalResource);
  // Step 2: set the upper and lower bounds of R
  double rMax = 1.0;
  while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
      < totalResource) {
    rMax *= 2.0;
  }
  // Step 3: approximate a reasonable value of R by binary search
  // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
  double left = 0;
  double right = rMax;
  for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
    double mid = (left + right) / 2.0;
    int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
        mid, schedulables, type);
    if (plannedResourceUsed == totalResource) {
      right = mid;
      break;
    } else if (plannedResourceUsed < totalResource) {
      left = mid;
    } else {
      right = mid;
    }
  }
  // Step 4: use R to set the fair share of each non-fixed queue,
  // which means only active queues share the resources
  // Set the fair shares based on the value of R we've converged to
  for (Schedulable sched : schedulables) {
    if (isSteadyShare) {
      setResourceValue(computeShare(sched, right, type),
          ((FSQueue) sched).getSteadyFairShare(), type);
    } else {
      setResourceValue(computeShare(sched, right, type),
          sched.getFairShare(), type);
    }
  }
}
(1) Determine the available resources
The handleFixedFairShares method sums up the fixed memory (fixedShare) of all fixed queues and **excludes the fixed queues, which do not take part in dividing the cluster resources.** YARN decides whether a queue is fixed according to the following criteria:
private static int getFairShareIfFixed(Schedulable sched,
    boolean isSteadyShare, ResourceType type) {
  // If maxShare <= 0, the queue is fixed and fixedShare = 0
  if (getResourceValue(sched.getMaxShare(), type) <= 0) {
    return 0;
  }
  // When computing the Instantaneous Fair Share, a queue with no APP running
  // on it is a fixed queue with fixedShare = 0
  if (!isSteadyShare &&
      (sched instanceof FSQueue) && !((FSQueue) sched).isActive()) {
    return 0;
  }
  // If the queue's weight <= 0, the queue is fixed:
  // if its minShare <= 0, fixedShare = 0, otherwise fixedShare = minShare
  if (sched.getWeights().getWeight(type) <= 0) {
    int minShare = getResourceValue(sched.getMinShare(), type);
    return (minShare <= 0) ? 0 : minShare;
  }
  return -1;
}
(2) Determine the upper and lower bounds of R
The lower bound of R is 0 and the upper bound starts at 1.0; the upper bound is determined by the resourceUsedWithWeightToResourceRatio method. If W is the resource amount this method computes for a given R, the upper bound rMax keeps doubling from 1.0 until W >= T, where T is the available resource amount determined in step 1.
// Calculate the resource allocation for each queue based on the R value
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection<? extends Schedulable> schedulables, ResourceType type) {
int resourcesTaken = 0;
for (Schedulable sched : schedulables) {
int share = computeShare(sched, w2rRatio, type);
resourcesTaken += share;
}
return resourcesTaken;
}
private static int computeShare(Schedulable sched, double w2rRatio,
ResourceType type) {
// share = R * weight (here type is memory)
double share = sched.getWeights().getWeight(type) * w2rRatio;
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
return (int) share;
}
(3) Approximate R by binary search
The binary search terminates when either of the following two conditions is met (a standalone numeric sketch follows the list):
- W == T (the W and T from step 2)
- 25 iterations have been performed (COMPUTE_FAIR_SHARES_ITERATIONS)
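The following self-contained sketch, which is not the Hadoop source, mimics these steps for three hypothetical queues with weights 1.0, 2.0 and 1.0, minShare 0, unlimited maxShare, and 600 GB (614400 MB) of memory to divide; the shares converge to roughly 150 GB / 300 GB / 150 GB.

public class WeightToResourceDemo {
  static final double[] WEIGHTS = {1.0, 2.0, 1.0};
  static final long TOTAL_MB = 600L * 1024;

  // share = R * weight, clamped by minShare/maxShare (here min = 0, max = unlimited)
  static long share(double weight, double r) {
    return (long) (weight * r);
  }

  // Same role as resourceUsedWithWeightToResourceRatio: total planned usage for a given R
  static long usedWithRatio(double r) {
    long used = 0;
    for (double w : WEIGHTS) {
      used += share(w, r);
    }
    return used;
  }

  public static void main(String[] args) {
    // Grow the upper bound until the planned usage covers the total resource
    double rMax = 1.0;
    while (usedWithRatio(rMax) < TOTAL_MB) {
      rMax *= 2.0;
    }
    // Binary search between 0 and rMax, capped at 25 iterations like the scheduler
    double left = 0, right = rMax;
    for (int i = 0; i < 25; i++) {
      double mid = (left + right) / 2.0;
      long planned = usedWithRatio(mid);
      if (planned == TOTAL_MB) {
        right = mid;
        break;
      } else if (planned < TOTAL_MB) {
        left = mid;
      } else {
        right = mid;
      }
    }
    // With weights 1 : 2 : 1 the shares come out to about 150 GB / 300 GB / 150 GB
    for (double w : WEIGHTS) {
      System.out.println("weight " + w + " -> " + share(w, right) + " MB");
    }
  }
}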
(4) Set the fair share using R
When the fair share is finally set, you can see that the Steady Fair Share and the Instantaneous Fair Share are set separately:
for (Schedulable sched : schedulables) {
  if (isSteadyShare) {
    setResourceValue(computeShare(sched, right, type),
        ((FSQueue) sched).getSteadyFairShare(), type);
  } else {
    setResourceValue(computeShare(sched, right, type),
        sched.getFairShare(), type);
  }
}
2 Instantaneous Fair Share
The call stack of this calculation is the same as that of the Steady Fair Share, and it also ends in the computeSharesInternal method. The only difference is when the calculation happens: the Steady Fair Share is recalculated only on addNode, whereas the Instantaneous Fair Share is updated periodically by the Update thread.
The point to emphasize, as analyzed above, is that when the Instantaneous Fair Share is calculated, an empty queue is treated as fixed (inactive), so it does not get a share of the cluster's memory.
The Update thread's update interval is controlled by yarn.scheduler.fair.update-interval-ms.
private class UpdateThread extends Thread {
  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // yarn.scheduler.fair.update-interval-ms
        Thread.sleep(updateInterval);
        long start = getClock().getTime();
        // Update the Instantaneous Fair Share
        update();
        // Preempt resources if necessary
        preemptTasksIfNecessary();
        long duration = getClock().getTime() - start;
        fsOpDurations.addUpdateThreadRunDuration(duration);
      } catch (InterruptedException ie) {
        LOG.warn("Update thread interrupted. Exiting.");
        return;
      } catch (Exception e) {
        LOG.error("Exception in fair scheduler UpdateThread", e);
      }
    }
  }
}
3 The meaning of maxAMShare
When the Handle thread receives a NODE_UPDATE event, it allocates a container for the AM of an APP waiting to run, provided that (1) the node has enough memory and (2) the application is in the ACCEPTED state, so that the APP can start running in its queue. Before the allocation, however, one more check, canRunAppAM, must pass; canRunAppAM is constrained by the maxAMShare parameter.
public boolean canRunAppAM(Resource amResource) {
  // The default is 0.5f
  float maxAMShare =
      scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName());
  // maxAMShare = -1.0f disables the check
  if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
    return true;
  }
  // maxAMResource = maxAMShare * fair share (the Instantaneous Fair Share)
  Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare);
  // amResourceUsage is the sum of the AM resources of the apps already running in this queue
  Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
  // Check whether ifRunAMResource would exceed maxAMResource
  return !policy
      .checkIfAMResourceUsageOverLimit(ifRunAMResource, maxAMResource);
}
The code above can be described with the following formula (a numeric sketch follows the list):
- The APPs already running in the queue are A1 ... An, and the AM of APP Ai occupies Ai.R resources
- The AM size of the ACCEPTED APP is R1
- The fair share of the queue is QueFS
- maxAMResource = maxAMShare * QueFS
- ifRunAMResource = A1.R + A2.R + ... + An.R + R1
- If ifRunAMResource > maxAMResource, the queue cannot accept the APP waiting to run
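As a minimal numeric sketch with hypothetical numbers (not scheduler code): suppose the queue's Instantaneous Fair Share is 100 GB of memory, maxAMShare is the default 0.5, three running apps each have a 4 GB AM, and a newly ACCEPTED app asks for an 8 GB AM. The check then passes:

public class MaxAMShareDemo {
  public static void main(String[] args) {
    long fairShareMb = 100L * 1024;                             // instantaneous fair share
    double maxAMShare = 0.5;                                    // default value
    long maxAMResourceMb = (long) (fairShareMb * maxAMShare);   // 51200 MB
    long runningAMsMb = 3 * 4L * 1024;                          // AMs already running: 12288 MB
    long newAmMb = 8L * 1024;                                   // AM of the ACCEPTED app
    long ifRunAMResourceMb = runningAMsMb + newAmMb;            // 20480 MB
    boolean canRunAppAM = ifRunAMResourceMb <= maxAMResourceMb; // true: the new AM may start
    System.out.println("canRunAppAM = " + canRunAppAM);
  }
}

If the running AMs already occupied 48 GB instead, adding an 8 GB AM would exceed the 50 GB cap, and the APP would stay in the ACCEPTED state, which is exactly the blocking situation described below.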
The reason this parameter deserves attention is that many EMR customers report that, when using fair queues, the cluster's total resources are not used up yet some apps are still queued and not running, as shown in the figure below:
By default, fair scheduling only considers Memory, not Cores. In the figure, 292 GB of memory is used and 53.6 GB is still unused, yet an APP is blocked. The reason is that the sum of the AM resources of all running apps in the default queue has exceeded 345.6 GB * 0.5 = 172.8 GB, so the new AM cannot start and the APP is blocked.
Conclusion
Analyzing the Fair Share calculation clarifies YARN's basic concepts and several of its parameters, which, as the table below shows, are hard to grasp from the official documentation alone. The remaining parameters are analyzed in part 2, which covers preemption in fair scheduling.
Concept / parameter | Official description | Conclusion
---|---|---
Steady Fair Share | The queue's steady fair share of resources. These shares consider all the queues irrespective of whether they are active (have running applications) or not. These are computed less frequently and change only when the configuration or capacity changes. They are meant to provide visibility into resources the user can expect, and hence displayed in the Web UI. | A fixed, theoretical amount of memory for each non-fixed queue. It does not change easily after the RM's initial work and is recalculated only when a node is added later (addNode). The RM's initial work also consists of the Handle thread adding each node of the cluster to the scheduler (addNode).
Instantaneous Fair Share | The queue's instantaneous fair share of resources. These shares consider only active queues (those with running applications), and are used for scheduling decisions. Queues may be allocated resources beyond their shares when other queues aren't using them. A queue whose resource consumption lies at or below its instantaneous fair share will never have its containers preempted. | The actual amount of memory for each non-fixed queue; it changes dynamically, and the Update thread refreshes each queue's fair share periodically. When YARN refers simply to fair share, it means the Instantaneous Fair Share.
yarn.scheduler.fair.update-interval-ms | The interval at which to lock the scheduler and recalculate fair shares, recalculate demand, and check whether anything is due for preemption. Defaults to 500 ms. | The interval of the Update thread, whose job is (1) to update fair shares and (2) to check whether resources need to be preempted.
maxAMShare | The fraction of the queue's fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. A value of -1.0f disables this check. The default value is 0.5f. | The total AM resources of all running apps in the queue must not exceed maxAMShare * fair share.