First, overview
The following figure shows the operation flow of Storm. When developing a Storm stream-processing program, we need to implement a Spout (data source) and a Bolt (processing unit), either built-in or custom, and wire them together with a TopologyBuilder to form the Topology.
Second, IComponent interface
The IComponent interface defines the methods common to all components (Spout/Bolt) in a Topology. A custom Spout or Bolt must implement this interface, directly or indirectly.
public interface IComponent extends Serializable {

    /**
     * Declare the output schema for all streams of this component.
     *
     * @param declarer used to declare the output stream ids, output fields, and whether each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Declare the configuration of this component.
     */
    Map<String, Object> getComponentConfiguration();
}
Third, Spout
3.1 ISpout interface
A custom Spout needs to implement the ISpout interface, which defines all the methods available to a spout:
public interface ISpout extends Serializable {

    /**
     * Called when the component is initialized.
     *
     * @param conf      the configuration for this ISpout
     * @param context   the application context, from which you can obtain task ids, component ids, and input/output information
     * @param collector used to emit tuples from this spout; it is thread-safe and should be saved as an instance variable of the spout
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when the ISpout is about to be shut down. It is not guaranteed to run; for example,
     * it will not run if the process is killed with kill -9 in a cluster environment.
     */
    void close();

    /**
     * Called when the ISpout is activated from the deactivated state.
     */
    void activate();

    /**
     * Called when the ISpout is deactivated.
     */
    void deactivate();

    /**
     * The core method: emit tuples to the next receiver by calling the collector inside this method.
     * It must be non-blocking. nextTuple, ack, and fail are all called on the same thread, so there is
     * no need to worry about thread safety. Let nextTuple sleep when there are no tuples to emit, to
     * avoid wasting CPU.
     */
    void nextTuple();

    /**
     * Acknowledge the tuple identified by msgId. The tuple will not be re-emitted.
     */
    void ack(Object msgId);

    /**
     * Mark the tuple identified by msgId as failed. The tuple will be re-emitted for processing.
     */
    void fail(Object msgId);
}
3.2 BaseRichSpout abstract class
Usually a custom Spout does not implement the ISpout interface directly, but instead extends BaseRichSpout. BaseRichSpout extends BaseComponent and implements the IRichSpout interface.
The IRichSpout interface inherits from ISpout and IComponent and does not define any methods of its own:
public interface IRichSpout extends ISpout, IComponent {

}
The BaseComponent abstract class provides an empty implementation of IComponent's getComponentConfiguration method:
public abstract class BaseComponent implements IComponent {

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
BaseRichSpout extends the BaseComponent class, implements the IRichSpout interface, and provides empty implementations for some of its methods:
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {

    @Override
    public void close() {}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}
}
With this design, when we implement a custom Spout by extending BaseRichSpout, there are only three methods we must implement (a minimal skeleton follows the list):

- open: comes from ISpout; here you can obtain the SpoutOutputCollector, the object used to emit tuples;
- nextTuple: comes from ISpout; tuples must be emitted inside this method;
- declareOutputFields: comes from IComponent; declares the field names of the emitted tuples so that the next component knows how to receive them.
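For illustration, here is a minimal sketch of such a Spout that emits a fixed sentence once per second. The class name, field name, and emitted text are illustrative assumptions, not part of the case study below:

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Save the collector so that nextTuple can use it to emit tuples
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit a single-field tuple, then sleep briefly to avoid busy-spinning
        collector.emit(new Values("the quick brown fox"));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Name the emitted field so downstream bolts can read it by field name
        declarer.declare(new Fields("sentence"));
    }
}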
Fourth, Bolt
Bolt's interface design is similar to Spout's:
4.1 IBolt interface
/**
 * An IBolt object is created on the client machine, serialized into the topology (using Java
 * serialization), and submitted to the cluster's master (Nimbus). Nimbus then starts the workers,
 * which deserialize the object, call prepare, and begin processing tuples.
 */
public interface IBolt extends Serializable {

    /**
     * Called when the component is initialized.
     *
     * @param stormConf the configuration of this Bolt as defined in the Storm configuration
     * @param context   the application context, from which you can obtain task ids, component ids, and input/output information
     * @param collector used to emit tuples from this bolt; it is thread-safe and should be saved as an instance variable of the bolt
     */
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Process a single tuple of input.
     *
     * @param input the Tuple object, which also carries metadata about itself (such as which component/stream/task it came from)
     */
    void execute(Tuple input);

    /**
     * Called when the IBolt is about to be shut down. It is not guaranteed to run; for example,
     * it will not run if the process is killed with kill -9 in a cluster environment.
     */
    void cleanup();
}
4.2 BaseRichBolt Abstract class
Similarly, when implementing a custom Bolt, you usually inherit from the BaseRichBolt abstract class. BaseRichBolt inherits from the BaseComponent abstract class and implements the IRichBolt interface.
The IRichBolt interface inherits from IBolt and IComponent and does not define any methods of its own:
public interface IRichBolt extends IBolt, IComponent {
}
With this design, when we implement a custom Bolt by extending BaseRichBolt, there are only three methods we must implement (a matching skeleton follows the list):

- prepare: comes from IBolt; here you can obtain the OutputCollector, the object used to emit tuples;
- execute: comes from IBolt; processes each tuple and emits the processed result;
- declareOutputFields: comes from IComponent; declares the field names of the emitted tuples so that the next component knows how to receive them.
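A matching minimal sketch of a Bolt is shown below. It assumes the upstream component declared a field named "sentence"; again, the class name and field names are illustrative:

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class UpperCaseBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Save the collector so that execute can emit processed tuples
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Read the field declared by the upstream component, transform it, and emit the result
        String sentence = input.getStringByField("sentence");
        collector.emit(new Values(sentence.toUpperCase()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper"));
    }
}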
Fifth, word frequency statistics case
5.1 Case Introduction
Here we use a custom DataSourceSpout to produce simulated sentence data, then use a custom SplitBolt and CountBolt to perform the word frequency statistics.
5.2 Code Implementation
1. Project dependencies
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
</dependency>
2. DataSourceSpout
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Generate simulated data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Generate simulated data
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
The above class uses the productData method to generate simulated data in the following format:
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
3. SplitBolt
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
4. CountBolt
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Output the current counts
        System.out.print("Current real-time statistics: ");
        counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
        System.out.println();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
5. LocalWordCountApp
The components defined above are wired together with a TopologyBuilder to form the Topology, which is then submitted to a LocalCluster to run. During development you typically test in local mode first, and submit to a server cluster only after the tests pass.
public class LocalWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        // Send the DataSourceSpout data to SplitBolt for processing
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        // Send the SplitBolt data to CountBolt for processing
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        // Create a local cluster for testing. In this mode you don't need to install Storm locally; just run the main method
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology());
    }
}
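A side note on stopping the local test: the main method above keeps running until the process is killed. If you prefer to stop it programmatically, the tail of the main method can be written as in the sketch below, which assumes you only want the topology to run for a fixed time (killTopology and shutdown are methods of LocalCluster):

// Sketch: let the topology run locally for 50 seconds, then stop it and the embedded cluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology());
Utils.sleep(50 * 1000);
cluster.killTopology("LocalWordCountApp");
cluster.shutdown();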
6. Running result
Start the main method of LocalWordCountApp. In local mode Storm automatically sets up an embedded cluster, so startup is a little slow; once it has started successfully you can see the statistics in the log output.
Sixth, submit to a server cluster for execution
6.1 Code Changes
The code submitted to the server differs only slightly from the local-mode code: StormSubmitter is used to submit the Topology to the server cluster. The main code is as follows:
For clarity, we create a new ClusterWordCountApp class to demonstrate cluster-mode submission. In practice you can put the code for both modes in the same class and choose which mode to start by passing in an external argument.
public class ClusterWordCountApp {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        // Send the DataSourceSpout data to SplitBolt for processing
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        // Send the SplitBolt data to CountBolt for processing
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        // Use StormSubmitter to submit the Topology to the server cluster
        try {
            StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }
}
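As mentioned in 6.1, both modes can also live in a single class and be selected by a program argument. A minimal sketch is shown below; the class name and the argument value "cluster" are just assumed conventions for illustration:

public class WordCountApp {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        if (args.length > 0 && "cluster".equals(args[0])) {
            // Submit to the server cluster when the first argument is "cluster"
            StormSubmitter.submitTopology("WordCountApp", new Config(), builder.createTopology());
        } else {
            // Otherwise run in local mode for testing
            new LocalCluster().submitTopology("WordCountApp", new Config(), builder.createTopology());
        }
    }
}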
6.2 Packaging and Uploading
Package the project and upload it to any location on the server. In this case the packaged JAR is named storm-word-count-1.0.jar.
# mvn clean package -Dmaven.test.skip=true
6.3 Submit the Topology
Submit the Topology to the cluster using the following command:
# Command format: storm jar <path to jar> <full name of main class> [optional arguments]
storm jar /usr/appjar/storm-word-count-1.0.jar com.heibaiying.wordcount.ClusterWordCountApp
If the message "successfully" is displayed, the submission succeeded.
6.4 Viewing and Stopping a Topology (command line)
# View all running topologies
storm list

# Stop a topology: storm kill topology-name [-w wait-time-secs]
storm kill ClusterWordCountApp -w 3
6.5 Viewing and Stopping a Topology (Web UI)
You can also stop a Topology from the UI. On the Web UI (port 8080), click the topology under Topology Summary to go to its details screen.
Seventh, extended instructions on project packaging
Limitations of mvn package
In the steps above we packaged the project with mvn package directly, without configuring any plug-in in the POM. That works for projects with no external dependencies. However, if the project uses third-party JARs there will be a problem, because the JAR produced by package does not contain the dependencies; if you submit it to the server, you will get an exception saying the third-party classes cannot be found.
At this point you might ask: didn't we already use the storm-core dependency in the project? That JAR is provided by the Storm cluster environment itself, in the lib directory of the installation directory.
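Because the cluster already ships this JAR, a common practice (an assumption here, not something the sample project configures) is to give the storm-core dependency the provided scope, so it is available at compile time but never bundled into your own JAR:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <!-- provided: compiled against locally, but supplied at runtime by the cluster's lib directory -->
    <scope>provided</scope>
</dependency>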
To illustrate this, I introduced a third-party JAR package in Maven and modified the method of generating the data:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
StringUtils.join() is available in both commons-lang3 and storm-core. Nothing in the original code needs to change except the import, which must point to commons-lang3.
import org.apache.commons.lang3.StringUtils;

private String productData() {
    Collections.shuffle(list);
    Random random = new Random();
    int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
    return StringUtils.join(list.toArray(), "\t", 0, endIndex);
}
If you now run mvn clean package directly and submit the resulting JAR as before, the exception shown below is thrown. This direct packaging approach is therefore not suitable for real development, where third-party JARs are almost always required.
To add dependencies to the final JAR, Maven provides two plugins: maven-assembly-plugin and maven-shade-plugin. Since this article is already quite long and there is a lot to say about Storm packaging, the packaging approaches will be covered separately in the next article:
Storm: Comparison and analysis of three packaging methods
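For reference, a minimal sketch of one of those two options is shown below (the plugin version is an assumption; the follow-up article compares the approaches in detail). maven-shade-plugin can be bound to the package phase so that mvn package produces an uber JAR containing the third-party dependencies:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <!-- bind the shade goal to the package phase so mvn package builds an uber JAR -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>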