Preface
This article mainly looks at the mkAssignments flow of Storm Nimbus.
Nimbus.mkAssignments
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
void doRebalance(String topoId, StormBase stormBase) throws Exception {
    RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
    StormBase updated = new StormBase();
    updated.set_topology_action_options(null);
    updated.set_component_debug(Collections.emptyMap());

    if (rbo.is_set_num_executors()) {
        updated.set_component_executors(rbo.get_num_executors());
    }

    if (rbo.is_set_num_workers()) {
        updated.set_num_workers(rbo.get_num_workers());
    }
    stormClusterState.updateStorm(topoId, updated);
    updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal()));
    idToExecutors.getAndUpdate(new Dissoc<>(topoId)); // remove the executors cache to let it recompute.
    mkAssignments(topoId);
}

private void mkAssignments() throws Exception {
    mkAssignments(null);
}
- Here stormClusterState.updateStorm(topoId, updated) is called to update the topology data in ZooKeeper
- Both doRebalance and the no-arg mkAssignments delegate to mkAssignments(String scratchTopoId); a rebalance request like the sketch below is what ultimately drives this path
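For context, a minimal sketch of how a rebalance is typically triggered from the client side, assuming a reachable Nimbus; the topology name "word-count" and the component name "split-bolt" are made up, but the RebalanceOptions fields are the same ones doRebalance later reads back:

import java.util.Collections;

import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        RebalanceOptions opts = new RebalanceOptions();
        opts.set_num_workers(4);                                           // read back via rbo.get_num_workers()
        opts.set_num_executors(Collections.singletonMap("split-bolt", 8)); // read back via rbo.get_num_executors()
        opts.set_wait_secs(30);

        // Ask Nimbus to rebalance; Nimbus eventually calls doRebalance -> mkAssignments(topoId)
        NimbusClient nimbus = NimbusClient.getConfiguredClient(Utils.readStormConfig());
        try {
            nimbus.getClient().rebalance("word-count", opts);
        } finally {
            nimbus.close();
        }
    }
}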
mkAssignments(String scratchTopoId)
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private void mkAssignments(String scratchTopoId) throws Exception {
    try {
        if (!isReadyForMKAssignments()) {
            return;
        }
        // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
        // filter out ones which have a executor timeout
        // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
        // should be in each slot (e.g., 4, 4, 4, 5)
        // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
        // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
        // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
        IStormClusterState state = stormClusterState;
        //read all the topologies
        Map<String, StormBase> bases;
        Map<String, TopologyDetails> tds = new HashMap<>();
        synchronized (submitLock) {
            // should promote: only fetch storm bases of topologies that need scheduling.
            bases = state.topologyBases();

            for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
                Entry<String, StormBase> entry = it.next();
                String id = entry.getKey();
                try {
                    tds.put(id, readTopologyDetails(id, entry.getValue()));
                } catch (KeyNotFoundException e) {
                    //A race happened and it is probably not running
                    it.remove();
                }
            }
        }
        List<String> assignedTopologyIds = state.assignments(null);
        Map<String, Assignment> existingAssignments = new HashMap<>();
        for (String id : assignedTopologyIds) {
            //for the topology which wants rebalance (specified by the scratchTopoId)
            // we exclude its assignment, meaning that all the slots occupied by its assignment
            // will be treated as free slot in the scheduler code.
            if (!id.equals(scratchTopoId)) {
                Assignment currentAssignment = state.assignmentInfo(id, null);
                if (!currentAssignment.is_set_owner()) {
                    TopologyDetails td = tds.get(id);
                    if (td != null) {
                        currentAssignment.set_owner(td.getTopologySubmitter());
                        state.setAssignment(id, currentAssignment, td.getConf());
                    }
                }
                existingAssignments.put(id, currentAssignment);
            }
        }
        // make the new assignments for topologies
        lockingMkAssignments(existingAssignments, bases, scratchTopoId, assignedTopologyIds, state, tds);
    } catch (Exception e) {
        this.mkAssignmentsErrors.mark();
        throw e;
    }
}
- readTopologyDetails reads the TopologyDetails of each topology
- state.assignments(Runnable callback) and state.assignmentInfo(String stormId, Runnable callback) fetch the existing assignments; the topology being rebalanced (scratchTopoId) is excluded so that its slots appear free to the scheduler (see the toy example below)
- lockingMkAssignments is then called to compute and store the new assignments for the topologies
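A toy, stand-alone illustration (not Storm code) of why the scratchTopoId exclusion matters: leaving the rebalancing topology out of existingAssignments makes every slot it currently occupies look free, so the scheduler can redistribute it from scratch.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ScratchTopoIdExample {
    public static void main(String[] args) {
        List<String> assignedTopologyIds = Arrays.asList("topo-1", "topo-2", "topo-3");
        String scratchTopoId = "topo-2"; // the topology being rebalanced

        Map<String, String> existingAssignments = new LinkedHashMap<>();
        for (String id : assignedTopologyIds) {
            if (!id.equals(scratchTopoId)) {          // same check as in mkAssignments
                existingAssignments.put(id, "assignment of " + id);
            }
        }
        // topo-2 is absent, so all of its slots look free to the scheduler
        System.out.println(existingAssignments.keySet()); // [topo-1, topo-3]
    }
}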
lockingMkAssignments
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                  String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                  Map<String, TopologyDetails> tds) throws Exception {
    Topologies topologies = new Topologies(tds);

    synchronized (schedLock) {
        Map<String, SchedulerAssignment> newSchedulerAssignments =
            computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);

        Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
            computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
        Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
            computeTopoToNodePortToResources(newSchedulerAssignments);
        int nowSecs = Time.currentTimeSecs();
        Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
        //construct the final Assignments by adding start-times etc into it
        Map<String, Assignment> newAssignments = new HashMap<>();
        //......

        //tasks figure out what tasks to talk to by looking at topology at runtime
        //only log/set when there's been a change to the assignment
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            TopologyDetails td = topologies.getById(topoId);
            if (assignment.equals(existingAssignment)) {
                LOG.debug("Assignment for {} hasn't changed", topoId);
            } else {
                LOG.info("Setting new assignment for topology id {}: {}", topoId, assignment);
                state.setAssignment(topoId, assignment, td.getConf());
            }
        }

        //grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
        //because the number of existing assignments is small for every scheduling round,
        //we expect to notify supervisors at almost the same time
        Map<String, String> totalAssignmentsChangedNodes = new HashMap<>();
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
        }
        notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
                                     basicSupervisorDetailsMap);

        Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
        for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
            String topoId = entry.getKey();
            Assignment assignment = entry.getValue();
            Assignment existingAssignment = existingAssignments.get(topoId);
            if (existingAssignment == null) {
                existingAssignment = new Assignment();
                existingAssignment.set_executor_node_port(new HashMap<>());
                existingAssignment.set_executor_start_time_secs(new HashMap<>());
            }
            Set<WorkerSlot> newSlots = newlyAddedSlots(existingAssignment, assignment);
            addedSlots.put(topoId, newSlots);
        }
        inimbus.assignSlots(topologies, addedSlots);
    }
}
- computeNewSchedulerAssignments is called first to calculate newSchedulerAssignments
- newAssignments is then traversed; whenever an assignment has changed, state.setAssignment(topoId, assignment, td.getConf()) is called to write the assignment information to ZooKeeper
- totalAssignmentsChangedNodes is then computed and notifySupervisorsAssignments is called, which uses the AssignmentDistributionService to push the assignments to the supervisors (the node-diff idea is sketched below)
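A simplified, stand-alone sketch of the node-diff idea; Storm's assignmentChangedNodes works on the real Assignment structures, while here plain node-to-host maps stand in for them:

import java.util.HashMap;
import java.util.Map;

public class AssignmentDiffExample {
    // Return only the nodes whose part of the assignment changed; only these need a notification.
    static Map<String, String> changedNodes(Map<String, String> oldNodeToHost, Map<String, String> newNodeToHost) {
        Map<String, String> changed = new HashMap<>();
        for (Map.Entry<String, String> e : newNodeToHost.entrySet()) {
            if (!e.getValue().equals(oldNodeToHost.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("node-1", "host-a");
        before.put("node-2", "host-b");

        Map<String, String> after = new HashMap<>(before);
        after.put("node-3", "host-c"); // a new node joined the assignment

        System.out.println(changedNodes(before, after)); // {node-3=host-c}
    }
}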
computeNewSchedulerAssignments
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
                                                                        Topologies topologies, Map<String, StormBase> bases,
                                                                        String scratchTopologyId)
    throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {

    Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);

    Set<String> zkHeartbeatTopologies = topologies.getTopologies().stream()
                                                  .filter(topo -> !supportRpcHeartbeat(topo))
                                                  .map(TopologyDetails::getId)
                                                  .collect(Collectors.toSet());

    updateAllHeartbeats(existingAssignments, topoToExec, zkHeartbeatTopologies);

    Map<String, Set<List<Integer>>> topoToAliveExecutors =
        computeTopologyToAliveExecutors(existingAssignments, topologies, topoToExec, scratchTopologyId);
    Map<String, Set<Long>> supervisorToDeadPorts =
        computeSupervisorToDeadPorts(existingAssignments, topoToExec, topoToAliveExecutors);
    Map<String, SchedulerAssignmentImpl> topoToSchedAssignment =
        computeTopologyToSchedulerAssignment(existingAssignments, topoToAliveExecutors);

    Set<String> missingAssignmentTopologies = new HashSet<>();
    for (TopologyDetails topo : topologies.getTopologies()) {
        String id = topo.getId();
        Set<List<Integer>> allExecs = topoToExec.get(id);
        Set<List<Integer>> aliveExecs = topoToAliveExecutors.get(id);
        int numDesiredWorkers = topo.getNumWorkers();
        int numAssignedWorkers = numUsedWorkers(topoToSchedAssignment.get(id));
        if (allExecs == null || allExecs.isEmpty() || !allExecs.equals(aliveExecs) || numDesiredWorkers > numAssignedWorkers) {
            //We have something to schedule...
            missingAssignmentTopologies.add(id);
        }
    }

    Map<String, SupervisorDetails> supervisors =
        readAllSupervisorDetails(supervisorToDeadPorts, topologies, missingAssignmentTopologies);
    Cluster cluster = new Cluster(inimbus, resourceMetrics, supervisors, topoToSchedAssignment, topologies, conf);
    cluster.setStatusMap(idToSchedStatus.get());

    schedulingStartTimeNs.set(Time.nanoTime());
    scheduler.schedule(topologies, cluster);
    //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge
    //......
    return cluster.getAssignments();
}
- computeTopologyToExecutors returns Map<String, Set<List<Integer>>> topoToExec; the key is the topologyId and each List<Integer> in the value set identifies one executor (the data shape is illustrated after this list)
- computeTopologyToAliveExecutors returns Map<String, Set<List<Integer>>> topoToAliveExecutors; the key is the topologyId and the value is the set of executors that are still alive
- computeSupervisorToDeadPorts returns Map<String, Set<Long>> supervisorToDeadPorts; the key is the supervisorId and the value is the set of dead ports
- computeTopologyToSchedulerAssignment returns Map<String, SchedulerAssignmentImpl> topoToSchedAssignment; the key is the topologyId and the value is a SchedulerAssignmentImpl
- missingAssignmentTopologies is computed by comparing the number of workers and executors required by the topology configuration against the existing assignments
- readAllSupervisorDetails returns Map<String, SupervisorDetails> supervisors, the supervisors that are alive and have slots available for assignment
- Finally a Cluster is created and scheduler.schedule(topologies, cluster) is called to do the actual scheduling
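To make the topoToExec shape concrete, here is a tiny example with made-up task numbers; each executor is encoded as a two-element list [startTask, endTask]:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopoToExecExample {
    public static void main(String[] args) {
        // topologyId -> set of executors, each executor being [startTask, endTask]
        Map<String, Set<List<Integer>>> topoToExec = new HashMap<>();
        topoToExec.put("topo-1", new HashSet<>(Arrays.asList(
            Arrays.asList(1, 2),
            Arrays.asList(3, 4),
            Arrays.asList(5, 6))));
        System.out.println(topoToExec);
    }
}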
DefaultScheduler.schedule
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
public void schedule(Topologies topologies, Cluster cluster) {
    defaultSchedule(topologies, cluster);
}

public static void defaultSchedule(Topologies topologies, Cluster cluster) {
    for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
        Set<ExecutorDetails> allExecutors = topology.getExecutors();

        Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
            EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
        Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
        for (List<ExecutorDetails> list : aliveAssigned.values()) {
            aliveExecutors.addAll(list);
        }

        Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
        int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size());

        Set<WorkerSlot> badSlots = null;
        if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
            badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
        }
        if (badSlots != null) {
            cluster.freeSlots(badSlots);
        }

        EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
    }
}
- cluster.needsSchedulingTopologies() is traversed so that every TopologyDetails that needs scheduling is processed
- cluster.getAvailableSlots() obtains the available slots
- EvenScheduler.getAliveAssignedWorkerSlotExecutors obtains aliveAssigned, from which slotsCanReassign computes canReassignSlots
- totalSlotsToUse is computed as Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size()), badSlots are determined, and those slots are freed (the even-split idea behind badSlots is sketched below)
- Finally EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster) is called
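The comments in mkAssignments describe working out how many executors should end up in each slot (e.g. 4, 4, 4, 5). A toy calculation of that even split (not Storm's badSlots implementation, just the idea) looks like this:

import java.util.ArrayList;
import java.util.List;

public class EvenSplitExample {
    // Split numExecutors as evenly as possible across numSlots, e.g. 17 executors over 4 slots -> [5, 4, 4, 4].
    static List<Integer> splitEvenly(int numExecutors, int numSlots) {
        List<Integer> counts = new ArrayList<>();
        int base = numExecutors / numSlots;
        int remainder = numExecutors % numSlots;
        for (int i = 0; i < numSlots; i++) {
            counts.add(base + (i < remainder ? 1 : 0));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(splitEvenly(17, 4)); // [5, 4, 4, 4]
    }
}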
EvenScheduler.scheduleTopologiesEvenly
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
    for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
        String topologyId = topology.getId();
        Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
        Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);

        for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
            WorkerSlot nodePort = entry.getKey();
            List<ExecutorDetails> executors = entry.getValue();
            cluster.assign(nodePort, topologyId, executors);
        }
    }
}
- scheduleTopology(topology, cluster) computes the newAssignment, which is then reversed with Utils.reverseMap into Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors (see the sketch below)
- cluster.assign(nodePort, topologyId, executors) then records the assignment for each slot
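A small stand-alone sketch of what that reversal does; the generic helper is written here only for illustration, Storm provides it as Utils.reverseMap:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReverseMapExample {
    // executor -> slot becomes slot -> list of executors, so each worker slot can be assigned in one call
    static <K, V> Map<V, List<K>> reverse(Map<K, V> map) {
        Map<V, List<K>> reversed = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : map.entrySet()) {
            reversed.computeIfAbsent(e.getValue(), v -> new ArrayList<>()).add(e.getKey());
        }
        return reversed;
    }

    public static void main(String[] args) {
        Map<String, String> executorToSlot = new LinkedHashMap<>();
        executorToSlot.put("executor[1-2]", "node1:6700");
        executorToSlot.put("executor[3-4]", "node1:6700");
        executorToSlot.put("executor[5-6]", "node2:6701");
        // {node1:6700=[executor[1-2], executor[3-4]], node2:6701=[executor[5-6]]}
        System.out.println(reverse(executorToSlot));
    }
}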
EvenScheduler.scheduleTopology
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
    List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
    Set<ExecutorDetails> allExecutors = topology.getExecutors();
    Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
    int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());

    List<WorkerSlot> sortedList = sortSlots(availableSlots);
    if (sortedList == null) {
        LOG.error("No available slots for topology: {}", topology.getName());
        return new HashMap<ExecutorDetails, WorkerSlot>();
    }

    //allow requesting slots number bigger than available slots
    int toIndex = (totalSlotsToUse - aliveAssigned.size())
                  > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
    List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);

    Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
    for (List<ExecutorDetails> list : aliveAssigned.values()) {
        aliveExecutors.addAll(list);
    }
    Set<ExecutorDetails> reassignExecutors = Sets.difference(allExecutors, aliveExecutors);
    Map<ExecutorDetails, WorkerSlot> reassignment = new HashMap<ExecutorDetails, WorkerSlot>();
    if (reassignSlots.size() == 0) {
        return reassignment;
    }

    List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>(reassignExecutors);
    Collections.sort(executors, new Comparator<ExecutorDetails>() {
        @Override
        public int compare(ExecutorDetails o1, ExecutorDetails o2) {
            return o1.getStartTask() - o2.getStartTask();
        }
    });

    for (int i = 0; i < executors.size(); i++) {
        reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size()));
    }

    if (reassignment.size() != 0) {
        LOG.info("Available slots: {}", availableSlots.toString());
    }
    return reassignment;
}
- List<WorkerSlot> reassignSlots is computed from availableSlots
- Sets.difference(allExecutors, aliveExecutors) computes reassignExecutors, which is then sorted by startTask
- Iterating over reassignExecutors, each executor is given a slot via reassignSlots.get(i % reassignSlots.size()), i.e. slots are handed out round-robin (worked example below)
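A worked example of that round-robin step with made-up executors and slots:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinExample {
    public static void main(String[] args) {
        List<String> reassignSlots = Arrays.asList("node1:6700", "node2:6701");
        List<String> executors = Arrays.asList("[1-1]", "[2-2]", "[3-3]", "[4-4]", "[5-5]");

        Map<String, String> reassignment = new LinkedHashMap<>();
        for (int i = 0; i < executors.size(); i++) {
            // same indexing as EvenScheduler: i % reassignSlots.size()
            reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size()));
        }
        // [1-1], [3-3], [5-5] land on node1:6700; [2-2], [4-4] land on node2:6701
        System.out.println(reassignment);
    }
}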
ExecutorDetails
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
public class ExecutorDetails {
    public final int startTask;
    public final int endTask;

    //......

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof ExecutorDetails)) {
            return false;
        }
        ExecutorDetails executor = (ExecutorDetails) other;
        return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
    }
}
- ExecutorDetails, despite the name, encapsulates startTask and endTask information
- Scheduling an executor therefore assigns the tasks in the range [startTask, endTask] to a slot for processing
- Note that equals is overridden to compare only startTask and endTask (see the small example below)
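A minimal sketch of that equality semantics, assuming the two-argument ExecutorDetails constructor from the scheduler package:

import org.apache.storm.scheduler.ExecutorDetails;

public class ExecutorDetailsExample {
    public static void main(String[] args) {
        ExecutorDetails a = new ExecutorDetails(1, 2); // the executor responsible for tasks 1..2
        ExecutorDetails b = new ExecutorDetails(1, 2);

        // equal because only startTask/endTask are compared, so two instances describing
        // the same task range are interchangeable as keys in the scheduler's maps and sets
        System.out.println(a.equals(b)); // true
    }
}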
Cluster.assign
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
/**
* Assign the slot to the executors for this topology.
*
* @throws RuntimeException if the specified slot is already occupied.
*/
public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
    assertValidTopologyForModification(topologyId);
    if (isSlotOccupied(slot)) {
        throw new RuntimeException(
            "slot: [" + slot.getNodeId() + "," + slot.getPort() + "] is already occupied.");
    }
    TopologyDetails td = topologies.getById(topologyId);
    if (td == null) {
        throw new IllegalArgumentException(
            "Trying to schedule for topo "
            + topologyId
            + " but that is not a known topology "
            + topologies.getAllIds());
    }
    WorkerResources resources = calculateWorkerResources(td, executors);
    SchedulerAssignmentImpl assignment = assignments.get(topologyId);
    if (assignment == null) {
        assignment = new SchedulerAssignmentImpl(topologyId);
        assignments.put(topologyId, assignment);
    } else {
        for (ExecutorDetails executor : executors) {
            if (assignment.isExecutorAssigned(executor)) {
                throw new RuntimeException(
                    "Attempting to assign executor: "
                    + executor
                    + " of topology: "
                    + topologyId
                    + " to workerslot: "
                    + slot
                    + ". The executor is already assigned to workerslot: "
                    + assignment.getExecutorToSlot().get(executor)
                    + ". The executor must unassigned before it can be assigned to another slot!");
            }
        }
    }
    assignment.assign(slot, executors, resources);
    String nodeId = slot.getNodeId();
    double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
    assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
    updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
    totalResourcesPerNodeCache.remove(slot.getNodeId());
}
- assignment.assign(slot, executors, resources) is then called
SchedulerAssignmentImpl.assign
storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
/**
 * Assign the slot to executors.
 */
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors, WorkerResources slotResources) {
    assert slot != null;
    for (ExecutorDetails executor : executors) {
        this.executorToSlot.put(executor, slot);
    }
    slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
                   .addAll(executors);
    if (slotResources != null) {
        resources.put(slot, slotResources);
    } else {
        resources.remove(slot);
    }
}
- This records the slot assignment for each ExecutorDetails (i.e. its startTask/endTask range)
summary
- Nimbus.mkAssignments computes newSchedulerAssignments (a Map<String, SchedulerAssignmentImpl> keyed by topologyId), with the actual scheduling done by scheduler.schedule(topologies, cluster)
- EvenScheduler.scheduleTopology is the core of the scheduling operation: it distributes ExecutorDetails over the slots that need assignment using a round-robin strategy, giving each reassignExecutor a slot via reassignSlots.get(i % reassignSlots.size())
- ExecutorDetails has two properties, startTask and endTask; assigning an executor to a slot effectively hands that task range to the slot for processing
- After the assignments are computed, lockingMkAssignments does two things: it writes each changed assignment to ZooKeeper via state.setAssignment(topoId, assignment, td.getConf()), and it calls notifySupervisorsAssignments to notify the supervisors, which update their local in-memory copy when they receive it
doc
- Understanding the Parallelism of a Storm Topology