Preface

This article looks at direct grouping in Storm.

direct grouping

Direct grouping is a special kind of grouping in which the producer of a tuple decides which task of the downstream consumer receives it. Using direct grouping involves the following steps:

1. In the prepare method, save the list of taskIds of the downstream bolt

    import java.util.List;
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SentenceDirectBolt extends BaseRichBolt {
        private static final Logger LOGGER = LoggerFactory.getLogger(SentenceDirectBolt.class);

        private OutputCollector collector;
        private List<Integer> taskIds;
        private int numCounterTasks;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // NOTE 1: fetch the taskIds of the downstream bolt; emitDirect later targets one of them
            this.taskIds = context.getComponentTasks("count-bolt");
            this.numCounterTasks = taskIds.size();
        }

        //......
    }

This saves the taskIds of the downstream bolt so that emitDirect can later target one of them.
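The article does not show getWordCountTaskId, the helper that picks one of the saved taskIds. One plausible approach is to hash the word, sketched here in plain Java with no Storm dependency. TaskChooser and chooseTaskId are hypothetical names, and in the bolt the list would be the taskIds field filled in prepare.

```java
import java.util.List;

// Hypothetical stand-in for the article's getWordCountTaskId (its body is not shown):
// map a word onto one of the taskIds saved in prepare, so the same word
// always lands on the same downstream task.
final class TaskChooser {
    static int chooseTaskId(String word, List<Integer> taskIds) {
        // floorMod keeps the index in [0, size) even when hashCode() is negative.
        int index = Math.floorMod(word.hashCode(), taskIds.size());
        return taskIds.get(index);
    }
}
```

Math.floorMod is used instead of Math.abs(word.hashCode()) % size because Math.abs(Integer.MIN_VALUE) is still negative and would produce a negative index.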

2. In the upstream bolt's declareOutputFields, declare the streamIds with declareStream

    public class SentenceDirectBolt extends BaseRichBolt {
        //......
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
            // NOTE 2: declare each direct stream with declareStream; the second argument (true) marks it as direct
            declarer.declareStream("directStreamDemo1", true, new Fields("word"));
            declarer.declareStream("directStreamDemo2", true, new Fields("word"));
        }
    }

Two direct streams are declared here, with the streamIds directStreamDemo1 and directStreamDemo2.
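Storm's Concepts documentation notes that direct groupings can only be declared on streams that were declared as direct, which is what the true argument to declareStream does. The toy registry below is plain Java, and StreamRegistry and its methods are hypothetical. It records that flag per streamId. Its declare() stands in for declare(new Fields("word")), which declares Storm's regular default stream, named "default".

```java
import java.util.HashMap;
import java.util.Map;

// Toy registry mirroring declareStream(streamId, direct, fields):
// it only records, per streamId, whether the stream was declared direct.
final class StreamRegistry {
    private final Map<String, Boolean> directByStream = new HashMap<>();

    // Plays the role of declare(fields), which targets the non-direct default stream.
    void declare() {
        directByStream.put("default", false);
    }

    void declareStream(String streamId, boolean direct) {
        directByStream.put(streamId, direct);
    }

    // Direct grouping is allowed only on streams declared as direct;
    // undeclared streams are treated as not allowed in this sketch.
    boolean allowsDirectGrouping(String streamId) {
        return directByStream.getOrDefault(streamId, false);
    }
}
```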

3. In the upstream bolt's execute method, call emitDirect with the downstream taskId and the streamId

    public class SentenceDirectBolt extends BaseRichBolt {
        //......
        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                int targetTaskId = getWordCountTaskId(word);
                LOGGER.info("word:{} choose taskId:{}", word, targetTaskId);
                // NOTE 3: specify both the downstream taskId and the streamId
                if (targetTaskId % 2 == 0) {
                    this.collector.emitDirect(targetTaskId, "directStreamDemo1", new Values(word));
                } else {
                    this.collector.emitDirect(targetTaskId, "directStreamDemo2", new Values(word));
                }
            }
            this.collector.ack(tuple);
        }
    }

Here emitDirect(int taskId, String streamId, List<Object> tuple) specifies both the downstream taskId and the streamId the tuple is sent on.
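The routing decision made in execute can be exercised without a cluster. The sketch below is plain Java with hypothetical names (DirectRouting, Emit, route). The task chooser is passed in as a function because getWordCountTaskId is not shown. It splits on a single space. In Java, split("") would break the sentence into single characters instead of words.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Plain-Java sketch of the routing in execute (no Storm classes):
// split on spaces, pick a task per word, then pick the stream by taskId parity.
final class DirectRouting {
    // Hypothetical record of the arguments one emitDirect call would receive.
    static final class Emit {
        final String word;
        final int taskId;
        final String streamId;

        Emit(String word, int taskId, String streamId) {
            this.word = word;
            this.taskId = taskId;
            this.streamId = streamId;
        }
    }

    // Same parity rule as in execute: even taskIds use directStreamDemo1, odd ones directStreamDemo2.
    static String streamFor(int taskId) {
        return taskId % 2 == 0 ? "directStreamDemo1" : "directStreamDemo2";
    }

    static List<Emit> route(String sentence, ToIntFunction<String> chooseTask) {
        List<Emit> emits = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            int taskId = chooseTask.applyAsInt(word);
            emits.add(new Emit(word, taskId, streamFor(taskId)));
        }
        return emits;
    }
}
```

For example, route("the cow jumped", String::length) picks tasks 3, 3 and 6, so "the" and "cow" go to directStreamDemo2 and "jumped" goes to directStreamDemo1.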

4. In the downstream bolt, use directGrouping to subscribe to the upstream bolt and streamId

    @Test
    public void testDirectGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("sentence-spout", new SentenceSpout());
        // SentenceSpout --> SentenceDirectBolt
        builder.setBolt("split-bolt", new SentenceDirectBolt()).shuffleGrouping("sentence-spout");
        // SentenceDirectBolt --> WordCountBolt
        builder.setBolt("count-bolt", new WordCountBolt(), 5).directGrouping("split-bolt", "directStreamDemo1");
        // WordCountBolt --> ReportBolt
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");

        submitRemote(builder);
    }

count-bolt, the downstream of split-bolt, uses directGrouping and specifies directStreamDemo1 as the stream it receives.
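Because count-bolt subscribes only to directStreamDemo1, the words that split-bolt routes to directStreamDemo2 are not consumed by any bolt in this topology. The toy filter below is plain Java, and Subscriptions and delivered are hypothetical names. It shows which emitted words reach a consumer subscribed to a given stream.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of the wiring in the test: a consumer subscribed to one streamId
// receives only the tuples emitted on that stream.
final class Subscriptions {
    // Each entry is (word, streamId) as emitted upstream.
    static List<String> delivered(List<Map.Entry<String, String>> emits, String subscribedStream) {
        return emits.stream()
                .filter(e -> e.getValue().equals(subscribedStream))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```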

summary

  • Direct grouping is a special grouping where an upstream producer directly assigns a downstream task to receive a tuple.
  • Upstream, the bolt saves the downstream taskIds in prepare, declares the direct streams with declareStream in declareOutputFields, and calls emitDirect(int taskId, String streamId, List<Object> tuple) in execute to choose the downstream taskId and streamId. Downstream, the bolt subscribes to the upstream component with directGrouping and names the streamId it consumes.
