Local mode
In local mode, the Storm topology runs inside a single JVM process on the local machine. This mode is used for development, testing, and debugging, because it is the easiest way to see all the components working together; it also lets us adjust parameters and observe how the topology behaves under different Storm configurations. To run in local mode, we only need the Storm development dependency on the classpath. Once we create our first Storm project, we will quickly get comfortable with local mode. NOTE: Running in local mode feels much like running against a real cluster, but it is important to make sure that all components are thread-safe, because when the topology is deployed in remote mode they may run in different JVM processes, or even on different physical machines, with no direct communication or shared memory between them.
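Before the full examples later in this article, here is a minimal, self-contained sketch of the local-mode pattern (the HelloSpout class, the names, and the ten-second run window are illustrative, not part of the later examples): build a topology, submit it to an in-process LocalCluster, let it run, then shut everything down so the JVM can exit.

package com.qxw.demo;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class LocalModeDemo {

    // Trivial illustrative spout: emits the word "hello" once a second.
    public static class HelloSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            this.collector.emit(new Values("hello"));
            try { Thread.sleep(1000); } catch (InterruptedException e) { }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("greeting"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("hello", new HelloSpout());

        LocalCluster cluster = new LocalCluster();   // in-process pseudo-cluster
        cluster.submitTopology("hello-topo", new Config(), builder.createTopology());

        Thread.sleep(10000);                         // let the topology run for ten seconds
        cluster.killTopology("hello-topo");          // then stop the topology...
        cluster.shutdown();                          // ...and the embedded cluster
    }
}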
Remote mode
In remote mode, we submit the topology to a Storm cluster, which is typically composed of many processes running on different machines. Remote mode produces no local debugging output, which is why it is also called production mode. It is still a good idea to stand up a Storm cluster on a single development machine and verify that the topology works in a cluster environment before deploying it to production.
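The only code difference from local mode is the submission call; the spouts, bolts, and wiring stay the same. A hedged sketch (the class name, topology name, and elided builder wiring are placeholders):

package com.qxw.demo;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class RemoteModeDemo {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... the same setSpout/setBolt wiring used in local mode ...

        Config cfg = new Config();
        cfg.setNumWorkers(2); // worker JVMs; honored only on a real cluster

        // Instead of LocalCluster.submitTopology(...), hand the topology to Nimbus:
        StormSubmitter.submitTopology("my-topo", cfg, builder.createTopology());
    }
}

The class is packaged into a jar and submitted from a machine with the Storm client installed, typically with something like: storm jar my-topology.jar com.qxw.demo.RemoteModeDemo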
Commonly used Java API
1) Basic interfaces
(1) IComponent
(2) ISpout
(3) IRichSpout
(4) IStateSpout
(5) IRichStateSpout
(6) IBolt
(7) IRichBolt
(8) IBasicBolt
2) Basic abstract classes
(1) BaseComponent
(2) BaseRichSpout
(3) BaseRichBolt
(4) BaseTransactionalBolt
(5) BaseBasicBolt
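The Base* abstract classes are convenience implementations of the corresponding interfaces: they supply empty defaults for the lifecycle methods, so a component only overrides what it actually needs. As a small illustrative sketch (the class name is made up), extending BaseBasicBolt leaves only two methods to write, and the framework acks each tuple automatically after execute() returns; implementing IRichBolt directly would additionally force empty prepare(), cleanup(), and getComponentConfiguration() bodies, as the word-count example later in this article shows.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// Illustrative bolt: only the two abstract methods need overriding.
public class NoOpBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // process the incoming tuple here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare emitted field names here, if the bolt emits anything
    }
}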
Create Spouts
package com.qxw.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Data source spout.
 * To declare a data source, extend the BaseRichSpout class and override the required methods.
 * @author qxw
 */
public class DataSource extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "java");
        map.put(1, "php");
        map.put(2, "groovy");
        map.put(3, "python");
        map.put(4, "ruby");
    }

    /**
     * Called once when the spout is initialized.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Polled in a loop to emit tuples.
     */
    @Override
    public void nextTuple() {
        Random r = new Random();
        int num = r.nextInt(5);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Send the word to the next node in the topology
        this.collector.emit(new Values(map.get(num)));
    }

    /**
     * Declares the field names of the tuples this spout emits.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The name the next component uses to look up the field value
        declarer.declare(new Fields("data"));
    }

    /**
     * Executed before the spout shuts down, although execution is not guaranteed.
     */
    @Override
    public void close() {
        System.out.println("Executed before the spout closes");
    }

    /**
     * Called when the spout is activated from the deactivated state;
     * nextTuple() will be called shortly afterwards.
     */
    @Override
    public void activate() {
        System.out.println("Called when the spout is activated from the deactivated state");
    }

    /**
     * Called when the spout is deactivated. nextTuple() is not called while the
     * spout is deactivated; it may or may not be reactivated later.
     */
    @Override
    public void deactivate() {
        System.out.println("Called when the spout is deactivated");
    }

    /**
     * Callback when a tuple has been processed successfully.
     */
    @Override
    public void ack(Object paramObject) {
        System.out.println("Tuple processed successfully: " + paramObject);
    }

    /**
     * Callback when a tuple fails to be processed.
     */
    @Override
    public void fail(Object paramObject) {
        System.out.println("Tuple processing failed: " + paramObject);
    }
}
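The ack() and fail() callbacks above only fire for tuples that were emitted with a message ID; the plain emit(Values) call opts out of tracking. A hedged, drop-in variant of nextTuple() for the spout above (the message-ID scheme is illustrative):

    /**
     * Illustrative variant of nextTuple(): emitting with a message ID asks
     * Storm to track the tuple, so ack(msgId) / fail(msgId) are called back.
     */
    @Override
    public void nextTuple() {
        int num = new Random().nextInt(5);
        // Any serializable, unique object works as the message ID; a sequence
        // value or a key into a pending-replay cache is typical (illustrative).
        Object msgId = num + ":" + System.nanoTime();
        this.collector.emit(new Values(map.get(num)), msgId);
    }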
Data flow processing component
package com.qxw.bolt;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that prints the data it receives and forwards it downstream.
 */
public class OutBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    /**
     * Accepts a tuple and processes it.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Look up the field by the name declared upstream
        String value = input.getStringByField("data");
        System.out.println("Data sent by the data source: " + value);
        // Send it on to the next component
        collector.emit(new Values(value));
    }

    /**
     * Declares the field names of the tuples this bolt emits.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Multiple fields may be declared here
        declarer.declare(new Fields("outdata"));
    }
}
package com.qxw.bolt;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Bolt that prints the data it receives; as the last node it emits nothing.
 */
public class OutBolt2 extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    /**
     * Accepts a tuple and processes it.
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String value = input.getStringByField("outdata");
        System.out.println("Value received from the OutBolt stream processing component: " + value);
    }

    /**
     * Last node in the topology: no output fields to declare.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
Construct the topology
package com.qxw.topology;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

import com.qxw.bolt.OutBolt;
import com.qxw.bolt.OutBolt2;
import com.qxw.spout.DataSource;

/**
 * Topology main class.
 */
public class TopologyTest {
    public static void main(String[] args) throws Exception {
        // Configuration
        Config cfg = new Config();
        // Number of worker processes (JVMs); only meaningful in a distributed
        // environment, the setting has no effect in local mode
        cfg.setNumWorkers(2);
        cfg.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        // Set the data source
        builder.setSpout("dataSource", new DataSource());
        // Wire up the bolts; shuffleGrouping distributes tuples randomly
        builder.setBolt("out-bolt", new OutBolt()).shuffleGrouping("dataSource");
        builder.setBolt("out-bol2", new OutBolt2()).shuffleGrouping("out-bolt");

        // 1. Local mode: once submitted, the topology polls forever
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("topo", cfg, builder.createTopology());

        // 2. Cluster mode
        // StormSubmitter.submitTopology("topo", cfg, builder.createTopology());
    }
}
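shuffleGrouping is only one of the stream groupings a bolt can subscribe with; the word-count example below also uses fieldsGrouping and globalGrouping. An illustrative comparison, reusing the wiring above (exactly one grouping is chosen per subscription):

// Alternative groupings for the same subscription (illustrative; pick one):
builder.setBolt("out-bolt", new OutBolt())
       .shuffleGrouping("dataSource");                   // random, load-balanced
//     .fieldsGrouping("dataSource", new Fields("data")) // same field value -> same task
//     .allGrouping("dataSource")                        // every task gets every tuple
//     .globalGrouping("dataSource")                     // all tuples go to one task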
Implementing a word counter with Storm
The data source Spout
package com.qxw.wordCount;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Word-count data source spout.
 * @author qxw
 */
public class WordSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    private int index = 0;

    private final String[] lines = {
            "long long ago I like playing with cat",
            "playing with cat make me happy",
            "I feel happy to be with you",
            "you give me courage",
            "I like to be together with you",
            "long long ago I like you"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        this.collector.emit(new Values(lines[index]));
        index++;
        if (index >= lines.length) {
            index = 0;
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Word-splitting component
package com.qxw.wordCount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Splits each line into individual words.
 */
public class WordSplitBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Also executed continuously, but its input comes from the upstream component.
     */
    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
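The split in execute() assumes exactly one space between words. A hedged, drop-in refinement (the regex choice is illustrative) that also tolerates leading, trailing, and repeated whitespace:

        // Illustrative replacement for the split in execute():
        // trim() drops leading/trailing whitespace and \s+ splits on any
        // run of spaces or tabs rather than exactly one space.
        String[] words = line.trim().split("\\s+");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }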
Statistical component
package com.qxw.wordCount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Counts how many times each word has been seen.
 */
public class WordCountBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    // Per-task, in-memory state; initialized in prepare() so each bolt
    // task gets its own map after deserialization.
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = 1L;
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        System.out.println("Word: " + word + ", occurrences: " + count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Output component
package com.qxw.wordCount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * Prints the running word counts; the last node in the topology.
 */
public class WordReportBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        System.out.printf("Real-time word count: %s\t%d%n", word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Last node: no output fields to declare
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Topology main class
package com.qxw.wordCount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Word-count topology main class.
 */
public class WordTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WordSpout", new WordSpout());
        builder.setBolt("WordSplitBolt", new WordSplitBolt(), 5).shuffleGrouping("WordSpout");
        // fieldsGrouping: tuples with the same "word" value always go to the same task
        builder.setBolt("WordCountBolt", new WordCountBolt(), 5).fieldsGrouping("WordSplitBolt", new Fields("word"));
        builder.setBolt("WordReportBolt", new WordReportBolt(), 10).globalGrouping("WordCountBolt");

        Config cfg = new Config();
        cfg.setDebug(false);

        // Once submitted, the topology keeps polling forever
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordcount-topo", cfg, builder.createTopology());
    }
}
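With a parallelism hint of 5 on WordCountBolt, fieldsGrouping on "word" is what keeps the counts correct: Storm routes each tuple by hashing the grouped field value, so every occurrence of the same word reaches the same task and increments the same in-memory map. A conceptual sketch of that routing (not Storm's actual internal code):

// Conceptual illustration of fields grouping, not Storm's real implementation:
// hashing the field value modulo the task count means identical values always
// land on the same task index.
int numTasks = 5;                  // matches the WordCountBolt parallelism hint
String word = "cat";
int task = Math.abs(word.hashCode() % numTasks);
System.out.println("\"" + word + "\" always routes to task " + task);

globalGrouping, by contrast, delivers every tuple to a single WordReportBolt task, so most of the ten reporter tasks sit idle; a parallelism hint of 1 would behave identically.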