
Scheduler

Overview: The scheduler in Storm assigns the resources available in the current cluster to topologies. Storm defines the IScheduler interface, which lets users implement their own schedulers, and ships with four built-in ones: DefaultScheduler, IsolationScheduler, MultitenantScheduler, and ResourceAwareScheduler.

The topology submission process (a minimal client-side submission sketch follows the list):

  1. In non-local mode, the client invokes the Nimbus interface through Thrift to upload the code to Nimbus and trigger the submit operation.
  2. Nimbus assigns tasks and synchronizes the assignment information to ZooKeeper.
  3. The Supervisor periodically fetches the task assignment information. If the topology code is missing, the Supervisor downloads it from Nimbus and synchronizes its workers based on the assignment.
  4. Each worker starts multiple executor threads and instantiates spout, bolt, acker, and other components based on the assigned tasks. Once all connections (the network connections between workers on different machines) are established, the Storm cluster starts working.
  5. Components such as spouts and bolts run until kill topology is called.
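
As a reference for step 1, here is a minimal client-side submission sketch. MySpout and MyBolt are hypothetical user-defined components, and the topology name is illustrative:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class SubmitExample {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // MySpout and MyBolt are hypothetical components.
        builder.setSpout("my-spout", new MySpout(), 2);
        builder.setBolt("my-bolt", new MyBolt(), 4).shuffleGrouping("my-spout");

        Config conf = new Config();
        conf.setNumWorkers(2); // number of worker processes to request

        // Uploads the jar to Nimbus over Thrift and triggers scheduling (step 1 above).
        StormSubmitter.submitTopology("example-topology", conf, builder.createTopology());
    }
}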

DefaultScheduler (Storm's default scheduler):

Main source code analysis:

public static void defaultSchedule(Topologies topologies, Cluster cluster) {
    // Get the set of topologies that need scheduling.
    for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
        // Get the currently available worker slots in the cluster.
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
        Set<ExecutorDetails> allExecutors = topology.getExecutors();

        // Get the resources already assigned to the current topology.
        Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
            EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());

        // Collect the topology's alive executors into a set.
        Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
        for (List<ExecutorDetails> list : aliveAssigned.values()) {
            aliveExecutors.addAll(list);
        }

        // Check which of the already-assigned slots can be reassigned.
        Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());

        // The number of slots this topology may use: the minimum of the topology's
        // requested worker count and availableSlots plus canReassignSlots.
        int totalSlotsToUse = Math.min(topology.getNumWorkers(),
                                       canReassignSlots.size() + availableSlots.size());

        // If totalSlotsToUse exceeds the currently assigned slots, or some executors
        // are no longer alive, compute the "bad" slots that should be freed.
        Set<WorkerSlot> badSlots = null;
        if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
            badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
        }
        // Release the bad slots.
        if (badSlots != null) {
            cluster.freeSlots(badSlots);
        }

        // Let EvenScheduler distribute the topology evenly over the system's resources.
        EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
    }
}

Pluggable scheduler:

You can implement your own scheduler to replace the default scheduler for assigning executors to workers. To use it, set the storm.scheduler configuration property to your class in the storm.yaml file; your scheduler must implement the IScheduler interface.
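
A minimal skeleton of such a custom scheduler might look like the following. The class and package names are hypothetical, and the method signatures shown match the Storm 1.x IScheduler interface; newer versions may add methods, so check the interface for your Storm version:

package com.example.scheduler; // hypothetical package

import java.util.Map;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.Topologies;

public class MyScheduler implements IScheduler {
    @Override
    public void prepare(Map conf) {
        // One-time initialization; receives the full Storm configuration.
    }

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        // Custom logic goes here: inspect cluster.getAvailableSlots() and
        // assign executors to worker slots via cluster.assign(...).
    }
}

The corresponding storm.yaml entry would then be storm.scheduler: "com.example.scheduler.MyScheduler".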

Isolation Scheduler:

The Isolation Scheduler makes it easier and safer to share a cluster among multiple topologies. It allows you to designate certain topologies as "isolated", meaning they run on dedicated machines in the cluster on which no other topologies run. Isolated topologies have high priority, so resources are allocated to them first when they compete with non-isolated topologies, and if necessary resources are taken away from non-isolated topologies to satisfy them. Once all the resource needs of the isolated topologies are met, the remaining machines in the cluster are shared by the non-isolated topologies.

You can configure the isolation scheduler on the Nimbus node by setting storm.scheduler to org.apache.storm.scheduler.IsolationScheduler. Then use the isolation.scheduler.machines configuration to specify how many machines each topology gets. This configuration is a map whose keys are topology names and whose values are the number of isolated machines assigned to that topology. The following is an example:

storm.scheduler: "org.apache.storm.scheduler.IsolationScheduler"
isolation.scheduler.machines: 
    "my-topology": 8
    "tiny-topology": 1
    "some-other-topology": 3

Any topology submitted to the cluster that does not appear in the map above will not be isolated. Note that users cannot set the isolation property themselves; only the cluster administrator can set this configuration (this is intentional).

The isolation scheduler solves the multi-tenancy problem by providing full isolation between topologies and avoiding resource competition among them. The intended end state is that topologies "in production" are isolated, while topologies in testing or development are not. The remaining machines in the cluster serve the dual role of providing failover capacity for the isolated topologies and running the non-isolated ones.

MultitenantScheduler:

This scheduler builds a separate, isolated resource pool for each topology submitter (user), then iterates over the set of topologies and assigns each one to the resource pool associated with its owner.
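
For example, the per-user node pools are typically configured through the multitenant.scheduler.user.pools map in storm.yaml, where each key is a user name and each value is the number of nodes reserved for that user (the user names and counts below are illustrative):

storm.scheduler: "org.apache.storm.scheduler.multitenant.MultitenantScheduler"
multitenant.scheduler.user.pools:
    "alice": 5
    "bob": 3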

ResourceAwareScheduler:

The resource-aware scheduler can allocate resources on a per-user basis. Each user can be guaranteed a certain amount of resources to run their topologies, and the resource-aware scheduler tries to meet those guarantees. When the Storm cluster has extra free resources, the resource-aware scheduler allocates the additional resources to users in a fair manner.
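
When the resource-aware scheduler is enabled, a topology can declare per-component resource requirements through the Java API. A sketch, with hypothetical components and illustrative numbers:

TopologyBuilder builder = new TopologyBuilder();

// MySpout/MyBolt are hypothetical; memory is requested in MB, and
// CPU in points, where 100 points roughly corresponds to one core.
SpoutDeclarer spout = builder.setSpout("word-spout", new MySpout(), 2);
spout.setMemoryLoad(512.0);        // on-heap memory request
spout.setCPULoad(50.0);

BoltDeclarer bolt = builder.setBolt("count-bolt", new MyBolt(), 4)
                           .shuffleGrouping("word-spout");
bolt.setMemoryLoad(256.0, 128.0);  // on-heap and off-heap memory
bolt.setCPULoad(25.0);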

Configuration

Storm has a variety of configuration items that adjust the behavior of Nimbus, the supervisors, and running topologies. Some of them are system configuration items that cannot be changed per topology, while others can be changed for each individual topology.

Each configuration item has a default value in the defaults.yaml (①) file in the Storm codebase. You can override a default by defining a storm.yaml (②) on the classpath of Nimbus and the supervisors. Finally, you can define topology-specific configuration items when submitting a topology with StormSubmitter (③). However, topology-specific configuration can only override configuration items whose names start with "topology.".
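
For instance, a topology-specific override (③) might look like this sketch (the topology name and values are illustrative, and "topology" is assumed to be an already-built StormTopology):

Config conf = new Config();
conf.setDebug(true);                  // overrides topology.debug
conf.put(Config.TOPOLOGY_WORKERS, 4); // only "topology.*" keys can be overridden here
StormSubmitter.submitTopology("my-topology", conf, topology);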

Storm 0.7.0 and above let you override configuration on a per-bolt/per-spout basis. Only the following configuration items can be modified this way:

  1. “topology.debug”
  2. “topology.max.spout.pending”
  3. “topology.max.task.parallelism”
  4. "topology.kryo.register": this configuration behaves slightly differently from the others, because serialization is visible to all components in the topology. For details, see Serialization.

Storm's Java API supports two ways to customize a component's configuration (a sketch of both follows the list):

  1. Internal (④): override getComponentConfiguration in your spout/bolt and return a component-specific configuration map.
  2. External (⑤): the setSpout and setBolt methods of TopologyBuilder return a ComponentConfigurationDeclarer object, whose addConfiguration method can be used to override the configuration of the corresponding component.
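
A sketch of both approaches; the bolt class, component names, and values are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class MyBolt extends BaseBasicBolt {
    // Internal (④): return a component-specific configuration map.
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put("topology.max.task.parallelism", 4);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // ... processing logic ...
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output fields in this sketch
    }
}

Elsewhere, when building the topology:

// External (⑤): addConfiguration on the declarer returned by setBolt.
TopologyBuilder builder = new TopologyBuilder();
builder.setBolt("my-bolt", new MyBolt(), 2)
       .addConfiguration("topology.debug", true);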

The precedence of these sources, from lowest to highest, is: defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration; that is, ① < ② < ③ < ④ < ⑤.

Bolts, Spouts, and Plugins

In most cases, configuration of a bolt or spout should be done through setters on the bolt or spout, not through the topology configuration. In rare cases it may make sense to expose topology-wide configuration items that are not currently part of Config or DaemonConfig, such as when writing a custom scheduler or a plugin for some part of Storm. In those cases you can create your own class that, like Config, implements the Validated interface. Any public static final String field declared in that class is treated as a configuration item, and annotations from the org.apache.storm.validation.ConfigValidationAnnotations class can be used to specify what may be stored in that configuration item. To let the validator know about this class, you treat it as a service loaded through the ServiceLoader for the Validated class: include a META-INF/services/org.apache.storm.validation.Validated file in your jar that holds the name of your configuration class.
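
A sketch of such a class (the package, class, and key names are hypothetical, and the exact annotation names in ConfigValidationAnnotations differ slightly between Storm versions, so verify against your version):

package com.example.storm; // hypothetical package

import org.apache.storm.validation.ConfigValidationAnnotations.IsString;
import org.apache.storm.validation.Validated;

public class MyPluginConfig implements Validated {
    // Any public static final String field here is treated as a config key;
    // the annotation tells the validator what the stored value must look like.
    @IsString
    public static final String MY_SCHEDULER_TAG = "my.scheduler.tag";
}

The jar must then contain a META-INF/services/org.apache.storm.validation.Validated file whose contents are the single line com.example.storm.MyPluginConfig.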

Resources:

  • Config: a list of client configuration items, also useful for creating topology-specific configuration.
  • DaemonConfig: a list of Storm daemon configuration items.
  • defaults.yaml: the default values for all configuration items.
  • Setting up a Storm cluster: explains how to create and configure a Storm cluster.
  • Running Topologies on a production cluster: lists some useful configuration items for running a topology on a cluster.
  • Local mode: lists useful configuration items when using local mode.

This blog is just a beginner's self-study record; my understanding is shallow, so if anything is wrong, please kindly point it out. If anything is unclear, feel free to comment, and let's learn together!

References

  • Storm documentation -> Scheduler
  • Storm documentation -> Configuration