The five levels of Storm’s concurrency model

  • Cluster level: Cluster

  • Node level: Supervisor

  • Process level: Worker

    • conf.setNumWorkers(3); sets the topology to run with three worker processes.
  • Thread level: Executor

    • In TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint), parallelism_hint determines how many executors are started for the bolt.
    • @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
  • Object level: Task, the lowest level, is the individual object instance that an executor thread operates on.

    • setNumTasks(3) sets the number of tasks; by default each executor thread runs one task. A combined example follows this list.
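
Putting these knobs together, here is a minimal sketch of how they are set in code. The package names assume Storm 1.x (org.apache.storm), and BlueSpout/YellowBolt are placeholder component classes, not part of the original example.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Thread level: parallelism_hint = 2, so the spout gets 2 executors.
        builder.setSpout("blue-spout", new BlueSpout(), 2);

        // 2 executors but 4 tasks for the bolt, so each executor runs 2 tasks.
        builder.setBolt("yellow-bolt", new YellowBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");

        // Process level: run this topology in 3 worker processes (JVMs).
        Config conf = new Config();
        conf.setNumWorkers(3);

        StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
    }
}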

Worker, Executor, and Task relationships

Example from the official Storm website

Adjusting parallelism from the shell

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors, and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance mytopology -w 10 -n 5 -e blue-spout=3 -e yellow-bolt=10

Increase parallelism

worker(slots)

  • By default, a node running a supervisor can start four worker processes, as determined by the supervisor.slots.ports parameter. This is already configured in Storm’s configuration, by default in the defaults.yaml shipped inside the storm-core.jar package.
  • By default, a Storm topology uses only one worker process; you can request more workers in code.
  • Call conf.setNumWorkers(workers) to set the number of worker processes.
  • Calling conf.setNumAckers(0); disables the acker tasks (see the fragment after this list).
  • It is usually better for a topology to use only one worker per machine, mainly because this reduces data transfer between workers.
  • If all worker slots are already in use when a topology is submitted, it will not be executed and will stay in a waiting state.
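
As a minimal sketch of the two calls above (a fragment that goes on the Config later passed to StormSubmitter.submitTopology, as in the earlier example; the value 2 is just illustrative):

import org.apache.storm.Config;

Config conf = new Config();
conf.setNumWorkers(2);  // take 2 worker slots for this topology
conf.setNumAckers(0);   // start no acker executors; tuples will not be tracked/acked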

executor

  • By default, each executor runs one task. The number of executors is set in code via the parallelism_hint argument:

    • builder.setSpout(id, spout, parallelism_hint);
    • builder.setBolt(id, bolt, parallelism_hint);

task

  • Call boltDeclarer.setNumTasks(num); to set the number of task instances.
  • The number of executors will be less than or equal to the number of tasks, which leaves headroom for a later rebalance (see the example after this list).
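
For example (YellowBolt is again a placeholder class), declaring more tasks than initial executors lets a later rebalance scale the bolt up without resubmitting the topology:

// 2 executors to start with, but 8 tasks, so a later
//   storm rebalance mytopology -e yellow-bolt=8
// can raise the executor count up to 8.
builder.setBolt("yellow-bolt", new YellowBolt(), 2).setNumTasks(8);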

Data communication between workers


Workers communicate with each other through Netty

• 1. The executor’s main thread runs spout.nextTuple()/bolt.execute() and emits tuples to the executor’s transfer queue.

• 2. The executor’s transfer thread moves tuples from its transfer queue to the worker’s transfer queue.

• 3. The worker’s transfer thread serializes the tuples in the transfer queue and sends them to the remote worker.

• 4. The remote worker’s receive thread reads data from the network, deserializes it into tuples, and puts them into the corresponding executor’s receive queue.

• 5. The executor’s main thread takes tuples from its receive queue and processes them with bolt.execute(). (A toy sketch of this queue hand-off follows.)
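
As a rough mental model only, the queue hand-off in steps 1–4 can be sketched with plain blocking queues. Storm’s real intra-worker queues are LMAX Disruptor ring buffers and the network hop is Netty, so this illustrates the queue topology, not Storm internals.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WorkerTransferSketch {
    public static void main(String[] args) {
        BlockingQueue<String> executorTransferQueue = new ArrayBlockingQueue<>(1024);
        BlockingQueue<String> workerTransferQueue = new ArrayBlockingQueue<>(1024);

        // Step 1: the executor's main thread emits a tuple onto its own transfer queue.
        Thread executorMain = new Thread(() -> {
            try {
                executorTransferQueue.put("tuple-1");
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        // Step 2: the executor's transfer thread moves the tuple to the worker transfer queue.
        Thread executorTransfer = new Thread(() -> {
            try {
                workerTransferQueue.put(executorTransferQueue.take());
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        // Step 3: the worker's transfer thread serializes the tuple and would hand the
        // bytes to the network layer for delivery to the remote worker (step 4).
        Thread workerTransfer = new Thread(() -> {
            try {
                byte[] payload = workerTransferQueue.take().getBytes();
                System.out.println("send over the network: " + payload.length + " bytes");
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        executorMain.start();
        executorTransfer.start();
        workerTransfer.start();
    }
}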



Message communication within a worker

stateless

  • Stateless

    • The state of Nimbus, the supervisors, and the workers is stored in ZooKeeper, with a small amount of worker information kept in the local file system; the topology jar that Nimbus distributes is also stored on the local file system. The advantage is fast recovery: even if Nimbus fails, a new one can be started and simply reads the corresponding data back from ZooKeeper. Because of this, Storm is very stable and robust. ZooKeeper also provides good decoupling: any module can fail and the others still work well. In particular the workers keep running as usual, because there is no direct dependency between the supervisor and its workers; all of their state is exchanged through ZooKeeper or local files. (A minimal sketch for inspecting this ZooKeeper state follows.)
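
If you want to look at that state directly, a minimal sketch using Apache Curator can list what Storm keeps under its ZooKeeper root. The default root path is /storm (the storm.zookeeper.root setting); the connect string and the exact child znodes depend on your cluster and Storm version, so treat this as exploratory only.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class StormZkPeek {
    public static void main(String[] args) throws Exception {
        // "localhost:2181" is a placeholder for your ZooKeeper connect string.
        try (CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3))) {
            zk.start();
            for (String child : zk.getChildren().forPath("/storm")) {
                System.out.println("/storm/" + child);
            }
        }
    }
}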

A single center

  • Storm also uses a master/slave architecture: the master is Nimbus and the slaves are the supervisors. The advantage of a single center is that scheduling the whole topology becomes very simple.
  • The single point is not a big problem: when Nimbus is down and no new topology is being submitted, the topologies already running in the cluster are unaffected. In the extreme case where a supervisor machine also dies, nothing can be rescheduled without Nimbus, which does have some impact.
  • The other single-point problem is performance, because the topology jar is still distributed through Nimbus: the client submits it to Nimbus first, and each supervisor then downloads the jar from Nimbus. When the file is large and there are many supervisor machines, Nimbus becomes a bottleneck and its network card fills up quickly. Storm has some plans to address this, for example by making file distribution peer-to-peer.

Good isolation

Nimbus and the supervisors only control the topology’s processes; the real work is done by the executors and workers, and all data transfer happens between workers. The data path does not need Nimbus or the supervisors at all, and in particular does not need the supervisor, which is a very successful design decision. In Hadoop, by contrast, the TaskTracker sits on the data path and all shuffle traffic goes through it, so when a TaskTracker process dies, the map tasks on that node must be re-executed because nothing is left to serve their output. Storm is different: because all data transfer happens between workers, a supervisor going down does not affect its workers at all, and the workers can even keep working while both Nimbus and the supervisor are down, which would be unimaginable in Hadoop.

Storm UI

  • Emitted: the number of tuples emitted.
  • Transferred: the number of tuples transferred. The difference from Emitted: if one task emits a tuple to two tasks, Transferred is twice Emitted.
  • Complete Latency: the average time from when the spout emits a tuple until that tuple is fully acked.
  • Execute Latency: the average time a bolt takes to process a tuple, excluding the ack operation.