Preface

This article focuses on the mkAssignments flow of Storm Nimbus.

Nimbus.mkAssignments

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    void doRebalance(String topoId, StormBase stormBase) throws Exception {
        RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
        StormBase updated = new StormBase();
        updated.set_topology_action_options(null);
        updated.set_component_debug(Collections.emptyMap());

        if (rbo.is_set_num_executors()) {
            updated.set_component_executors(rbo.get_num_executors());
        }

        if (rbo.is_set_num_workers()) {
            updated.set_num_workers(rbo.get_num_workers());
        }
        stormClusterState.updateStorm(topoId, updated);
        updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal()));
        idToExecutors.getAndUpdate(new Dissoc<>(topoId)); // remove the executors cache to let it recompute.
        mkAssignments(topoId);
    }

    private void mkAssignments() throws Exception {
        mkAssignments(null);
    }
  • Here stormClusterState.updateStorm(topoId, updated) is called to update the topology data in ZooKeeper
  • Both doRebalance and the no-argument mkAssignments() delegate to mkAssignments(String scratchTopoId); a client-side sketch of what triggers doRebalance is shown below
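
For context, a minimal client-side sketch of what ends up triggering doRebalance. This is not part of the Nimbus code above; it assumes the standard thrift client API (NimbusClient, RebalanceOptions), and "my-topology" is a placeholder topology name:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.storm.generated.RebalanceOptions;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class RebalanceClientSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = Utils.readStormConfig();
            NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
            try {
                RebalanceOptions options = new RebalanceOptions();
                options.set_num_workers(5);                                       // new worker count
                options.set_num_executors(Collections.singletonMap("spout", 3));  // component -> executor count
                options.set_wait_secs(10);                                        // seconds to wait before rebalancing
                nimbus.getClient().rebalance("my-topology", options);             // placeholder topology name
            } finally {
                nimbus.close();
            }
        }
    }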

mkAssignments(String scratchTopoId)

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private void mkAssignments(String scratchTopoId) throws Exception {
        try {
            if (!isReadyForMKAssignments()) {
                return;
            }
            // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
            // filter out ones which have a executor timeout
            // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
            // should be in each slot (e.g., 4, 4, 4, 5)
            // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
            // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
            // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
            IStormClusterState state = stormClusterState;
            //read all the topologies
            Map<String, StormBase> bases;
            Map<String, TopologyDetails> tds = new HashMap<>();
            synchronized (submitLock) {
                // should promote: only fetch storm bases of topologies that need scheduling.
                bases = state.topologyBases();
                for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
                    Entry<String, StormBase> entry = it.next();
                    String id = entry.getKey();
                    try {
                        tds.put(id, readTopologyDetails(id, entry.getValue()));
                    } catch (KeyNotFoundException e) {
                        //A race happened and it is probably not running
                        it.remove();
                    }
                }
            }
            List<String> assignedTopologyIds = state.assignments(null);
            Map<String, Assignment> existingAssignments = new HashMap<>();
            for (String id : assignedTopologyIds) {
                //for the topology which wants rebalance (specified by the scratchTopoId)
                // we exclude its assignment, meaning that all the slots occupied by its assignment
                // will be treated as free slot in the scheduler code.
                if (!id.equals(scratchTopoId)) {
                    Assignment currentAssignment = state.assignmentInfo(id, null);
                    if (!currentAssignment.is_set_owner()) {
                        TopologyDetails td = tds.get(id);
                        if (td != null) {
                            currentAssignment.set_owner(td.getTopologySubmitter());
                            state.setAssignment(id, currentAssignment, td.getConf());
                        }
                    }
                    existingAssignments.put(id, currentAssignment);
                }
            }
            // make the new assignments for topologies
            lockingMkAssignments(existingAssignments, bases, scratchTopoId, assignedTopologyIds, state, tds);
        } catch (Exception e) {
            this.mkAssignmentsErrors.mark();
            throw e;
        }
    }
  • readTopologyDetails is used to read the TopologyDetails for each topology
  • state.assignments(Runnable callback) and state.assignmentInfo(String stormId, Runnable callback) fetch the existing assignments
  • lockingMkAssignments is then called to make the new assignments for the topologies
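
The comments at the top of mkAssignments mention figuring out how many executors should land in each slot (e.g. 4, 4, 4, 5). A minimal standalone sketch of that even split, independent of the Storm classes:

    import java.util.ArrayList;
    import java.util.List;

    public class EvenSplitSketch {
        // Distribute numExecutors executors over numSlots slots as evenly as possible.
        static List<Integer> executorsPerSlot(int numExecutors, int numSlots) {
            List<Integer> counts = new ArrayList<>();
            int base = numExecutors / numSlots;      // every slot gets at least this many
            int remainder = numExecutors % numSlots; // the first `remainder` slots get one extra
            for (int i = 0; i < numSlots; i++) {
                counts.add(base + (i < remainder ? 1 : 0));
            }
            return counts;
        }

        public static void main(String[] args) {
            System.out.println(executorsPerSlot(17, 4)); // [5, 4, 4, 4]
        }
    }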

lockingMkAssignments

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                      String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                      Map<String, TopologyDetails> tds) throws Exception {
        Topologies topologies = new Topologies(tds);

        synchronized (schedLock) {
            Map<String, SchedulerAssignment> newSchedulerAssignments =
                computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);

            Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
                computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
            Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
                computeTopoToNodePortToResources(newSchedulerAssignments);
            int nowSecs = Time.currentTimeSecs();
            Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
            //construct the final Assignments by adding start-times etc into it
            Map<String, Assignment> newAssignments = new HashMap<>();
            //......
            //tasks figure out what tasks to talk to by looking at topology at runtime
            // only log/set when there's been a change to the assignment
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                TopologyDetails td = topologies.getById(topoId);
                if (assignment.equals(existingAssignment)) {
                    LOG.debug("Assignment for {} hasn't changed", topoId);
                } else {
                    LOG.info("Setting new assignment for topology id {}: {}", topoId, assignment);
                    state.setAssignment(topoId, assignment, td.getConf());
                }
            }

            //grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
            //because the number of existing assignments is small for every scheduling round,
            //we expect to notify supervisors at almost the same time
            Map<String, String> totalAssignmentsChangedNodes = new HashMap<>();
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
            }
            notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
                                         basicSupervisorDetailsMap);

            Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                if (existingAssignment == null) {
                    existingAssignment = new Assignment();
                    existingAssignment.set_executor_node_port(new HashMap<>());
                    existingAssignment.set_executor_start_time_secs(new HashMap<>());
                }
                Set<WorkerSlot> newSlots = newlyAddedSlots(existingAssignment, assignment);
                addedSlots.put(topoId, newSlots);
            }
            inimbus.assignSlots(topologies, addedSlots);
        }
    }
  • computeNewSchedulerAssignments is called first to calculate newSchedulerAssignments
  • newAssignments is then traversed; if an assignment has changed, state.setAssignment(topoId, assignment, td.getConf()) is called to write the assignment information to ZooKeeper
  • Finally totalAssignmentsChangedNodes is computed and notifySupervisorsAssignments is called, which uses the AssignmentDistributionService to notify the supervisors (see the sketch below)
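
A rough sketch of the idea behind assignmentChangedNodes: diff the old and new executor-to-node mappings and collect only the nodes whose assignment changed, so just those supervisors need to be notified. The types here are simplified stand-ins, not the real Assignment structures:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ChangedNodesSketch {
        // executorToNode maps an executor id to the node it runs on.
        static Set<String> changedNodes(Map<Integer, String> oldExecutorToNode,
                                        Map<Integer, String> newExecutorToNode) {
            Set<String> changed = new HashSet<>();
            Set<Integer> allExecutors = new HashSet<>(oldExecutorToNode.keySet());
            allExecutors.addAll(newExecutorToNode.keySet());
            for (Integer executor : allExecutors) {
                String oldNode = oldExecutorToNode.get(executor);
                String newNode = newExecutorToNode.get(executor);
                if (oldNode == null ? newNode != null : !oldNode.equals(newNode)) {
                    if (oldNode != null) {
                        changed.add(oldNode); // this node lost the executor
                    }
                    if (newNode != null) {
                        changed.add(newNode); // this node gained the executor
                    }
                }
            }
            return changed;
        }
    }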

computeNewSchedulerAssignments

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
                                                                            Topologies topologies, Map<String, StormBase> bases,
                                                                            String scratchTopologyId)
        throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {

        Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);

        Set<String> zkHeartbeatTopologies = topologies.getTopologies().stream()
                                                      .filter(topo -> !supportRpcHeartbeat(topo))
                                                      .map(TopologyDetails::getId)
                                                      .collect(Collectors.toSet());

        updateAllHeartbeats(existingAssignments, topoToExec, zkHeartbeatTopologies);

        Map<String, Set<List<Integer>>> topoToAliveExecutors =
            computeTopologyToAliveExecutors(existingAssignments, topologies, topoToExec, scratchTopologyId);
        Map<String, Set<Long>> supervisorToDeadPorts =
            computeSupervisorToDeadPorts(existingAssignments, topoToExec, topoToAliveExecutors);
        Map<String, SchedulerAssignmentImpl> topoToSchedAssignment =
            computeTopologyToSchedulerAssignment(existingAssignments, topoToAliveExecutors);

        Set<String> missingAssignmentTopologies = new HashSet<>();
        for (TopologyDetails topo : topologies.getTopologies()) {
            String id = topo.getId();
            Set<List<Integer>> allExecs = topoToExec.get(id);
            Set<List<Integer>> aliveExecs = topoToAliveExecutors.get(id);
            int numDesiredWorkers = topo.getNumWorkers();
            int numAssignedWorkers = numUsedWorkers(topoToSchedAssignment.get(id));
            if (allExecs == null || allExecs.isEmpty() || !allExecs.equals(aliveExecs)
                || numDesiredWorkers > numAssignedWorkers) {
                //We have something to schedule...
                missingAssignmentTopologies.add(id);
            }
        }

        Map<String, SupervisorDetails> supervisors =
            readAllSupervisorDetails(supervisorToDeadPorts, topologies, missingAssignmentTopologies);
        Cluster cluster = new Cluster(inimbus, resourceMetrics, supervisors, topoToSchedAssignment, topologies, conf);
        cluster.setStatusMap(idToSchedStatus.get());

        schedulingStartTimeNs.set(Time.nanoTime());
        scheduler.schedule(topologies, cluster);
        //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge
        //......

        return cluster.getAssignments();
    }
  • computeTopologyToExecutors returns Map<String, Set<List<Integer>>> topoToExec: the key is the topologyId, the value is a Set<List<Integer>> where each List<Integer> identifies an executor
  • computeTopologyToAliveExecutors returns Map<String, Set<List<Integer>>> topoToAliveExecutors: the key is the topologyId, the value is the set of executors that are still alive
  • computeSupervisorToDeadPorts returns Map<String, Set<Long>> supervisorToDeadPorts: the key is the supervisorId, the value is the set of dead ports (Long)
  • computeTopologyToSchedulerAssignment returns Map<String, SchedulerAssignmentImpl> topoToSchedAssignment: the key is the topologyId, the value is the SchedulerAssignmentImpl
  • missingAssignmentTopologies is computed by comparing the worker and executor counts required by the topology configuration with the existing assignment (a standalone restatement of this check follows the list)
  • readAllSupervisorDetails returns Map<String, SupervisorDetails> supervisors, i.e. the supervisors that are alive and can accept assignments
  • Finally, a Cluster is created and scheduler.schedule(topologies, cluster) is called to do the scheduling
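
For clarity, the "does this topology need scheduling?" condition used to build missingAssignmentTopologies can be restated as a standalone predicate (the same logic as the loop above, just extracted):

    import java.util.List;
    import java.util.Set;

    public class NeedsSchedulingSketch {
        static boolean needsScheduling(Set<List<Integer>> allExecs, Set<List<Integer>> aliveExecs,
                                       int numDesiredWorkers, int numAssignedWorkers) {
            return allExecs == null                           // nothing recorded for the topology yet
                   || allExecs.isEmpty()
                   || !allExecs.equals(aliveExecs)            // some executors are no longer alive
                   || numDesiredWorkers > numAssignedWorkers; // fewer workers assigned than requested
        }
    }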

DefaultScheduler.schedule

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java

    public void schedule(Topologies topologies, Cluster cluster) {
        defaultSchedule(topologies, cluster);
    }

    public static void defaultSchedule(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
            Set<ExecutorDetails> allExecutors = topology.getExecutors();

            Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
                EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
            Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
            for (List<ExecutorDetails> list : aliveAssigned.values()) {
                aliveExecutors.addAll(list);
            }

            Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
            int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size());

            Set<WorkerSlot> badSlots = null;
            if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
                badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
            }
            if (badSlots != null) {
                cluster.freeSlots(badSlots);
            }

            EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
        }
    }
  • cluster.needsSchedulingTopologies() is traversed to process each TopologyDetails that needs scheduling
  • cluster.getAvailableSlots() is used to obtain the available slots
  • aliveAssigned is obtained via EvenScheduler.getAliveAssignedWorkerSlotExecutors, and slotsCanReassign is then called on aliveAssigned.keySet() to get canReassignSlots
  • totalSlotsToUse is calculated as Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size()), and the badSlots are freed (a sketch of the badSlots idea follows this list)
  • Finally EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster) is called
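
A hedged sketch of the idea behind badSlots (not the actual EvenScheduler code): compute the target executor counts for totalSlotsToUse slots, keep currently used slots whose executor count matches one of those targets, and mark the rest as bad so they get freed and rescheduled:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class BadSlotsSketch {
        // slotToExecutorCount: currently used slots and how many executors each hosts.
        // Assumes totalSlotsToUse > 0.
        static Set<String> badSlots(Map<String, Integer> slotToExecutorCount,
                                    int totalExecutors, int totalSlotsToUse) {
            // Target distribution, e.g. 13 executors over 4 slots -> one slot of 4 and three slots of 3.
            Map<Integer, Integer> targetCounts = new HashMap<>();
            int base = totalExecutors / totalSlotsToUse;
            int remainder = totalExecutors % totalSlotsToUse;
            targetCounts.merge(base + 1, remainder, Integer::sum);
            targetCounts.merge(base, totalSlotsToUse - remainder, Integer::sum);

            Set<String> bad = new HashSet<>();
            for (Map.Entry<String, Integer> e : slotToExecutorCount.entrySet()) {
                Integer remaining = targetCounts.get(e.getValue());
                if (remaining != null && remaining > 0) {
                    // This slot's executor count still fits one of the target buckets; keep it.
                    targetCounts.put(e.getValue(), remaining - 1);
                } else {
                    // Non-conforming (or surplus) slot: free it so its executors get rescheduled.
                    bad.add(e.getKey());
                }
            }
            return bad;
        }
    }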

EvenScheduler.scheduleTopologiesEvenly

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java

    public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            String topologyId = topology.getId();
            Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
            Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);

            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
                WorkerSlot nodePort = entry.getKey();
                List<ExecutorDetails> executors = entry.getValue();
                cluster.assign(nodePort, topologyId, executors);
            }
        }
    }
  • scheduleTopology(topology, cluster) is used to compute the newAssignment, which is then reversed into Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors via Utils.reverseMap (a stand-in sketch of this reversal follows the list)
  • The data is then assigned via cluster.assign(nodePort, topologyId, executors)
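
A minimal stand-in for the Utils.reverseMap call above: invert an executor-to-slot map into slot-to-executors, which is the shape cluster.assign expects:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReverseMapSketch {
        // Invert executor -> slot into slot -> executors (what Utils.reverseMap does conceptually).
        static <K, V> Map<V, List<K>> reverse(Map<K, V> assignment) {
            Map<V, List<K>> reversed = new HashMap<>();
            for (Map.Entry<K, V> e : assignment.entrySet()) {
                reversed.computeIfAbsent(e.getValue(), v -> new ArrayList<>()).add(e.getKey());
            }
            return reversed;
        }
    }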

EvenScheduler.scheduleTopology

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java

    private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
        Set<ExecutorDetails> allExecutors = topology.getExecutors();
        Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
        int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());

        List<WorkerSlot> sortedList = sortSlots(availableSlots);
        if (sortedList == null) {
            LOG.error("No available slots for topology: {}", topology.getName());
            return new HashMap<ExecutorDetails, WorkerSlot>();
        }

        //allow requesting slots number bigger than available slots
        int toIndex = (totalSlotsToUse - aliveAssigned.size())
                      > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
        List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);

        Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
        for (List<ExecutorDetails> list : aliveAssigned.values()) {
            aliveExecutors.addAll(list);
        }
        Set<ExecutorDetails> reassignExecutors = Sets.difference(allExecutors, aliveExecutors);

        Map<ExecutorDetails, WorkerSlot> reassignment = new HashMap<ExecutorDetails, WorkerSlot>();
        if (reassignSlots.size() == 0) {
            return reassignment;
        }

        List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>(reassignExecutors);
        Collections.sort(executors, new Comparator<ExecutorDetails>() {
            @Override
            public int compare(ExecutorDetails o1, ExecutorDetails o2) {
                return o1.getStartTask() - o2.getStartTask();
            }
        });

        for (int i = 0; i < executors.size(); i++) {
            reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size()));
        }

        if (reassignment.size() != 0) {
            LOG.info("Available slots: {}", availableSlots.toString());
        }
        return reassignment;
    }
  • List<WorkerSlot> reassignSlots is calculated from availableSlots
  • Sets.difference(allExecutors, aliveExecutors) is used to calculate reassignExecutors, which is then sorted by startTask
  • reassignExecutors is traversed and each executor is given a slot via reassignSlots.get(i % reassignSlots.size()), i.e. round-robin (see the sketch below)
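
The round-robin step can be illustrated with a small standalone sketch (plain strings instead of ExecutorDetails/WorkerSlot): the i-th executor that needs a home goes to slot i % slots.size():

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundRobinSketch {
        static Map<String, String> roundRobin(List<String> executors, List<String> slots) {
            Map<String, String> placement = new HashMap<>();
            for (int i = 0; i < executors.size(); i++) {
                // the i-th executor goes to slot (i % slots.size())
                placement.put(executors.get(i), slots.get(i % slots.size()));
            }
            return placement;
        }

        public static void main(String[] args) {
            // 5 executors over 2 slots -> e0, e2, e4 land on slot0 and e1, e3 on slot1
            System.out.println(roundRobin(Arrays.asList("e0", "e1", "e2", "e3", "e4"),
                                          Arrays.asList("slot0", "slot1")));
        }
    }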

ExecutorDetails

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java

public class ExecutorDetails {
    public final int startTask;
    public final int endTask;
    //......

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof ExecutorDetails)) {
            return false;
        }

        ExecutorDetails executor = (ExecutorDetails) other;
        return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
    }
}
  • ExecutorDetails, despite its name, encapsulates the startTask and endTask information
  • Scheduling essentially assigns the tasks in the [startTask, endTask] range to a slot for processing (see the sketch below)
  • Note that equals is overridden to compare startTask and endTask
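
A small illustration (not Storm code) of how an executor's [startTask, endTask] range expands into the task ids it runs, e.g. startTask=5, endTask=8 covers tasks 5..8:

    import java.util.ArrayList;
    import java.util.List;

    public class ExecutorTaskRangeSketch {
        // Expands an executor's inclusive [startTask, endTask] range into the task ids it runs.
        static List<Integer> tasksOf(int startTask, int endTask) {
            List<Integer> tasks = new ArrayList<>();
            for (int task = startTask; task <= endTask; task++) {
                tasks.add(task);
            }
            return tasks;
        }
    }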

Cluster.assign

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java

    /**
     * Assign the slot to the executors for this topology.
     *
     * @throws RuntimeException if the specified slot is already occupied.
     */
    public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
        assertValidTopologyForModification(topologyId);
        if (isSlotOccupied(slot)) {
            throw new RuntimeException(
                "slot: [" + slot.getNodeId() + "," + slot.getPort() + "] is already occupied.");
        }

        TopologyDetails td = topologies.getById(topologyId);
        if (td == null) {
            throw new IllegalArgumentException(
                "Trying to schedule for topo "
                + topologyId
                + " but that is not a known topology "
                + topologies.getAllIds());
        }
        WorkerResources resources = calculateWorkerResources(td, executors);
        SchedulerAssignmentImpl assignment = assignments.get(topologyId);
        if (assignment == null) {
            assignment = new SchedulerAssignmentImpl(topologyId);
            assignments.put(topologyId, assignment);
        } else {
            for (ExecutorDetails executor : executors) {
                if (assignment.isExecutorAssigned(executor)) {
                    throw new RuntimeException(
                        "Attempting to assign executor: "
                        + executor
                        + " of topology: "
                        + topologyId
                        + " to workerslot: "
                        + slot
                        + ". The executor is already assigned to workerslot: "
                        + assignment.getExecutorToSlot().get(executor)
                        + ". The executor must unassigned before it can be assigned to another slot!");
                }
            }
        }

        assignment.assign(slot, executors, resources);
        String nodeId = slot.getNodeId();
        double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
        assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
        updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
        totalResourcesPerNodeCache.remove(slot.getNodeId());
    }
  • Finally assignment.assign(slot, executors, resources) is called

SchedulerAssignmentImpl.assign

storm-2.0.0/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java

    /**
     * Assign the slot to executors.
     */
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors, WorkerResources slotResources) {
        assert slot != null;
        for (ExecutorDetails executor : executors) {
            this.executorToSlot.put(executor, slot);
        }
        slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
            .addAll(executors);
        if (slotResources != null) {
            resources.put(slot, slotResources);
        } else {
            resources.remove(slot);
        }
    }
  • Assigns the slot to the given ExecutorDetails (each identified by its startTask and endTask)

summary

  • Nimbus.mkAssignments computes newSchedulerAssignments (a Map<String, SchedulerAssignment> whose key is the topologyId); the actual scheduling happens in scheduler.schedule(topologies, cluster)
  • EvenScheduler.scheduleTopology is the core of the scheduling work: it distributes the ExecutorDetails over the slots that need to be allocated. EvenScheduler uses a round-robin strategy, assigning each reassignExecutor a slot via reassignSlots.get(i % reassignSlots.size())
  • ExecutorDetails has two properties, startTask and endTask; scheduling is essentially the process of assigning these tasks to slots
  • After the assignments are computed, lockingMkAssignments does two things: it writes the assignments to ZooKeeper via state.setAssignment(topoId, assignment, td.getConf()), and it calls notifySupervisorsAssignments to notify the supervisors, which update their local in-memory assignments upon receiving the notification

doc

  • Understanding the Parallelism of a Storm Topology