Preface
This article focuses on how a Storm TridentTopology is constructed.
Example
@Test
public void testDebugTopologyBuild() {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
            new Values("nickt1", 4),
            new Values("nickt2", 7),
            new Values("nickt3", 8),
            new Values("nickt4", 9),
            new Values("nickt5", 7),
            new Values("nickt6", 11),
            new Values("nickt7", 5));
    spout.setCycle(false);
    TridentTopology topology = new TridentTopology();
    Stream stream1 = topology.newStream("spout1", spout)
            .each(new Fields("user", "score"), new BaseFunction() {
                @Override
                public void execute(TridentTuple tuple, TridentCollector collector) {
                    System.out.println("tuple:" + tuple);
                }
            }, new Fields());
    topology.build();
}
- For simplicity, much of the subsequent analysis is based on this example; a sketch for actually running it locally follows
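The test above only builds the topology; nothing runs. For completeness, here is a minimal sketch, appended at the end of testDebugTopologyBuild(), that submits the topology to an in-process cluster so the debug each() actually prints tuples (Config, LocalCluster, and Utils are the standard storm-core classes; the topology name is arbitrary):

// Run the example in-process; requires org.apache.storm.Config,
// org.apache.storm.LocalCluster and org.apache.storm.utils.Utils.
Config conf = new Config();
conf.setMaxSpoutPending(3);                  // cap the number of in-flight batches
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-debug", conf, topology.build());
Utils.sleep(5000);                           // give the debug each() time to print
cluster.shutdown();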
TridentTopology.newStream
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public Stream newStream(String txId, IRichSpout spout) {
return newStream(txId, new RichSpoutBatchExecutor(spout));
}
public Stream newStream(String txId, IPartitionedTridentSpout spout) {
return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}
public Stream newStream(String txId, ITridentDataSource dataSource) {
if (dataSource instanceof IBatchSpout) {
return newStream(txId, (IBatchSpout) dataSource);
} else if (dataSource instanceof ITridentSpout) {
return newStream(txId, (ITridentSpout) dataSource);
} else if (dataSource instanceof IPartitionedTridentSpout) {
return newStream(txId, (IPartitionedTridentSpout) dataSource);
} else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
} else {
throw new UnsupportedOperationException("Unsupported stream");
}
}
public Stream newStream(String txId, IBatchSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}
public Stream newStream(String txId, ITridentSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}
protected Stream addNode(Node n) {
registerNode(n);
return new Stream(this, n.name, n);
}
protected void registerNode(Node n) {
_graph.addVertex(n);
    if(n.stateInfo != null) {
        String id = n.stateInfo.id;
        if(!_colocate.containsKey(id)) {
            _colocate.put(id, new ArrayList());
        }
        _colocate.get(id).add(n);
    }
}
- The first argument to newStream is txId; the second is an ITridentDataSource
- ITridentDataSource comes in several variants: IBatchSpout, ITridentSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout (a plain IRichSpout is first wrapped in a RichSpoutBatchExecutor)
- A SpoutNode is created, and registerNode adds it to _graph (if the node's stateInfo is not null it is also added to _colocate, but for a SpoutNode it is null); note that the SpoutNode's type is SpoutNode.SpoutType.BATCH. A sketch of the IBatchSpout contract follows
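Since the example uses FixedBatchSpout, which is an IBatchSpout, here is a minimal hedged sketch of that contract (CountBatchSpout is a hypothetical class; a production implementation should also cache emitted batches so the same batchId can be replayed after a failure, as FixedBatchSpout does):

import java.util.Map;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class CountBatchSpout implements IBatchSpout {
    private final int batchSize = 3;

    @Override
    public void open(Map conf, TopologyContext context) {
        // called once per executor before any batch is emitted
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        // called once per batch; a real spout should re-emit the same tuples
        // if the same batchId is requested again after a failure
        for (int i = 0; i < batchSize; i++) {
            collector.emit(new Values("user" + i, i));
        }
    }

    @Override
    public void ack(long batchId) {
        // batch batchId is fully processed; its cached tuples can be dropped
    }

    @Override
    public void close() { }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("user", "score");
    }
}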
Node
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/Node.java
public class Node extends DefaultResourceDeclarer<Node> implements Serializable {
private static final AtomicInteger INDEX = new AtomicInteger(0);
private String nodeId;
public String name = null;
public Fields allOutputFields;
public String streamId;
public Integer parallelismHint = null;
public NodeStateInfo stateInfo = null;
public int creationIndex;
public Node(String streamId, String name, Fields allOutputFields) {
this.nodeId = UUID.randomUUID().toString();
this.allOutputFields = allOutputFields;
this.streamId = streamId;
this.name = name;
this.creationIndex = INDEX.incrementAndGet();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return nodeId.equals(((Node) o).nodeId);
}
@Override
public int hashCode() {
return nodeId.hashCode();
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
public String shortString() {
return "nodeId: " + nodeId + ", allOutputFields: "+ allOutputFields; }}Copy the code
- Node extends DefaultResourceDeclarer, which implements the resource-related interfaces ResourceDeclarer and ITridentResource
- Node has several subclasses: SpoutNode, ProcessorNode, and PartitionNode
- SpoutNode describes a spout; ProcessorNode describes Trident operations such as each, map, aggregate, reduce, and project; PartitionNode describes a partitioning step. A small sketch of Node's identity semantics follows
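Worth noting: equals/hashCode are based solely on the randomly generated nodeId, and creationIndex records creation order. A minimal sketch using the constructor shown above:

// Two nodes with identical fields are still distinct vertices in _graph,
// because identity is the random UUID assigned in the constructor.
Node a = new Node("s1", null, new Fields("user", "score"));
Node b = new Node("s1", null, new Fields("user", "score"));
System.out.println(a.equals(b));                        // false: different nodeIds
System.out.println(a.creationIndex < b.creationIndex);  // true: INDEX increases monotonically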
TridentTopology.build
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java
public StormTopology build() {
    DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();

    //...

    List<SpoutNode> spoutNodes = new ArrayList<>();

    // can be regular nodes (static state) or processor nodes
    Set<Node> boltNodes = new LinkedHashSet<>();
    for(Node n: graph.vertexSet()) {
        if(n instanceof SpoutNode) {
            spoutNodes.add((SpoutNode) n);
        } else if(!(n instanceof PartitionNode)) {
            boltNodes.add(n);
        }
    }

    Set<Group> initialGroups = new LinkedHashSet<>();

    //...

    for(Node n: boltNodes) {
        initialGroups.add(new Group(graph, n));
    }

    GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
    grouper.mergeFully();
    Collection<Group> mergedGroups = grouper.getAllGroups();

    // add identity partitions between groups
    for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
        if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
            Group g1 = grouper.nodeGroup(e.source);
            Group g2 = grouper.nodeGroup(e.target);
            // g1 being null means the source is a spout node
            if(g1==null && !(e.source instanceof SpoutNode))
                throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
            if(g1==null || !g1.equals(g2)) {
                graph.removeEdge(e);
                PartitionNode pNode = makeIdentityPartition(e.source);
                graph.addVertex(pNode);
                graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));
            }
        }
    }

    //...

    // add in spouts as groups so we can get parallelisms
    for(Node n: spoutNodes) {
        grouper.addGroup(new Group(graph, n));
    }

    grouper.reindex();
    mergedGroups = grouper.getAllGroups();

    Map<Node, String> batchGroupMap = new HashMap<>();
    List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
    for(int i=0; i<connectedComponents.size(); i++) {
        String groupId = "bg" + i;
        for(Node n: connectedComponents.get(i)) {
            batchGroupMap.put(n, groupId);
        }
    }

    // System.out.println("GRAPH:");
    // System.out.println(graph);

    Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);

    TridentTopologyBuilder builder = new TridentTopologyBuilder();

    Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
    Map<Group, String> boltIds = genBoltIds(mergedGroups);

    for(SpoutNode sn: spoutNodes) {
        Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));

        Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
        spoutRes.putAll(sn.getResources());

        Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

        SpoutDeclarer spoutDeclarer = null;

        if(sn.type == SpoutNode.SpoutType.DRPC) {
            spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                    (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
        } else {
            ITridentSpout s;
            if(sn.spout instanceof IBatchSpout) {
                s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
            } else if(sn.spout instanceof ITridentSpout) {
                s = (ITridentSpout) sn.spout;
            } else {
                throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
            }
            spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
        }

        if(onHeap != null) {
            if(offHeap != null) {
                spoutDeclarer.setMemoryLoad(onHeap, offHeap);
            } else {
                spoutDeclarer.setMemoryLoad(onHeap);
            }
        }

        if(cpuLoad != null) {
            spoutDeclarer.setCPULoad(cpuLoad);
        }
    }

    for(Group g: mergedGroups) {
        if(!isSpoutGroup(g)) {
            Integer p = parallelisms.get(g);
            Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
            Map<String, Number> groupRes = g.getResources(_resourceDefaults);

            Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

            BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                    committerBatches(g, batchGroupMap), streamToGroup);

            if(onHeap != null) {
                if(offHeap != null) {
                    d.setMemoryLoad(onHeap, offHeap);
                } else {
                    d.setMemoryLoad(onHeap);
                }
            }

            if(cpuLoad != null) {
                d.setCPULoad(cpuLoad);
            }

            Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
            for(PartitionNode n: inputs) {
                Node parent = TridentUtils.getParent(graph, n);
                String componentId = parent instanceof SpoutNode ?
                        spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
            }
        }
    }

    HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
    combinedMasterCoordResources.putAll(_masterCoordResources);
    return builder.buildTopology(combinedMasterCoordResources);
}
- Here a TridentTopologyBuilder is created, and for each SpoutNode, TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) is called to add the spout
- Spouts of type IBatchSpout are wrapped into an ITridentSpout by BatchSpoutExecutor
- streamName here is the streamId, generated by UniqueIdGen.getUniqueStreamId: the prefix "s" followed by the _streamCounter count (e.g. 1), giving "s1". txStateId is the txId passed in by the user. batchGroup is the prefix "bg" followed by the index of the connectedComponents element (e.g. 0), giving "bg0". parallelism is whatever the user set when building the topology
- After the spout is set up, its resource configuration (e.g. memoryLoad and cpuLoad) is applied; then the bolts are set up using SubtopologyBolt, and their resource configuration is applied likewise
- Finally, TridentTopologyBuilder.buildTopology is called. (A standalone sketch of the batch-group derivation follows.)
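The batchGroupMap step above is plain JGraphT, the same graph library TridentTopology uses for _graph: every weakly connected component of the node graph becomes one batch group. A standalone sketch under that assumption, with String vertices instead of Nodes for brevity:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.jgrapht.alg.ConnectivityInspector;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.DefaultEdge;

public class BatchGroupDemo {
    public static void main(String[] args) {
        DefaultDirectedGraph<String, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class);
        graph.addVertex("spout1"); graph.addVertex("b-0");
        graph.addVertex("spout2"); graph.addVertex("b-1");
        graph.addEdge("spout1", "b-0");  // first connected component
        graph.addEdge("spout2", "b-1");  // second, independent component

        List<Set<String>> components = new ConnectivityInspector<>(graph).connectedSets();
        Map<String, String> batchGroupMap = new HashMap<>();
        for (int i = 0; i < components.size(); i++) {
            for (String n : components.get(i)) {
                batchGroupMap.put(n, "bg" + i);   // one "bg<i>" id per component
            }
        }
        System.out.println(batchGroupMap);  // e.g. {spout1=bg0, b-0=bg0, spout2=bg1, b-1=bg1}
    }
}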
TridentTopologyBuilder.setSpout
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, TransactionalSpoutComponent> _spouts = new HashMap();
public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
Map<String, String> batchGroups = new HashMap();
batchGroups.put(streamName, batchGroup);
markBatchGroups(id, batchGroups);
TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
_spouts.put(id, c);
return new SpoutDeclarerImpl(c);
}
private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}
- markBatchGroups is called to record the new component in _batchIds, and the component itself is stored in _spouts. For the running example, the call looks roughly as sketched below
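To make the parameters concrete, here is a hypothetical reconstruction of what build() passes for the running example; the ids are inferred from the topology layout shown later in this article, so treat them as illustrative rather than authoritative:

// Illustrative reconstruction of the setSpout call for the example topology:
builder.setSpout(
        "spout-spout1",                 // id generated by genSpoutIds
        "s1",                           // streamName: the SpoutNode's unique stream id
        "spout1",                       // txStateId: the txId passed to newStream
        new BatchSpoutExecutor(spout),  // the IBatchSpout wrapped as ITridentSpout
        1,                              // parallelism resolved by getGroupParallelisms
        "bg0");                         // batchGroup: connected component index 0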
TridentTopologyBuilder.setBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
Map<GlobalStreamId, String> _batchIds = new HashMap();
Map<String, Component> _bolts = new HashMap();
// map from stream name to batch id
public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
markBatchGroups(id, batchGroups);
Component c = new Component(bolt, parallelism, committerBatches);
_bolts.put(id, c);
return new BoltDeclarerImpl(c);
}
private void markBatchGroups(String component, Map<String, String> batchGroups) {
    for(Map.Entry<String, String> entry: batchGroups.entrySet()) {
        _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue());
    }
}
- Here markBatchGroups likewise records the new component in _batchIds, and the component is stored in _bolts; for Trident this component is a group of ProcessorNodes (possibly together with PartitionNodes). The corresponding call for the example is sketched below
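Again as a hypothetical reconstruction for the example (the SubtopologyBolt wraps the whole ProcessorNode group; the values in the comments are what the build() code shown earlier computes, so treat them as illustrative):

// Illustrative reconstruction of the setBolt call for the example topology:
builder.setBolt(
        "b-0",                                   // id generated by genBoltIds
        new SubtopologyBolt(graph, g.nodes, batchGroupMap),
        1,                                       // group parallelism
        committerBatches(g, batchGroupMap),      // batch groups needing a commit stream
        streamToGroup);                          // output stream -> batch group, e.g. {s2=bg0}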
TridentTopologyBuilder.buildTopology
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java
public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
    TopologyBuilder builder = new TopologyBuilder();
    Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
    Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

    Map<String, List<String>> batchesToCommitIds = new HashMap<>();
    Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();

    for(String id: _spouts.keySet()) {
        TransactionalSpoutComponent c = _spouts.get(id);
        if(c.spout instanceof IRichSpout) {
            //TODO: wrap this to set the stream name
            builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
        } else {
            String batchGroup = c.batchGroupId;
            if(!batchesToCommitIds.containsKey(batchGroup)) {
                batchesToCommitIds.put(batchGroup, new ArrayList<String>());
            }
            batchesToCommitIds.get(batchGroup).add(c.commitStateId);

            if(!batchesToSpouts.containsKey(batchGroup)) {
                batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
            }
            batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);

            BoltDeclarer scd =
                    builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                            .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                            .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);

            for(Map<String, Object> m: c.componentConfs) {
                scd.addConfigurations(m);
            }

            Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
            specs.put(c.batchGroupId, new CoordSpec());
            BoltDeclarer bd = builder.setBolt(id,
                    new TridentBoltExecutor(
                            new TridentSpoutExecutor(
                                    c.commitStateId,
                                    c.streamName,
                                    ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                    c.parallelism);

            bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
            bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);

            if(c.spout instanceof ICommitterTridentSpout) {
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }

            for(Map<String, Object> m: c.componentConfs) {
                bd.addConfigurations(m);
            }
        }
    }

    //...

    Number onHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
    Number offHeap = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
    Number cpuLoad = masterCoordResources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

    for(String batch: batchesToCommitIds.keySet()) {
        List<String> commitIds = batchesToCommitIds.get(batch);
        SpoutDeclarer masterCoord = builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));

        if(onHeap != null) {
            if(offHeap != null) {
                masterCoord.setMemoryLoad(onHeap, offHeap);
            } else {
                masterCoord.setMemoryLoad(onHeap);
            }
        }

        if(cpuLoad != null) {
            masterCoord.setCPULoad(cpuLoad);
        }
    }

    for(String id: _bolts.keySet()) {
        Component c = _bolts.get(id);

        Map<String, CoordSpec> specs = new HashMap<>();

        for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
            String batch = batchIdsForBolts.get(s);
            if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
            CoordSpec spec = specs.get(batch);
            CoordType ct;
            if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
                ct = CoordType.single();
            } else {
                ct = CoordType.all();
            }
            spec.coords.put(s.get_componentId(), ct);
        }

        for(String b: c.committerBatches) {
            specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
        }

        BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
        for(Map<String, Object> conf: c.componentConfs) {
            d.addConfigurations(conf);
        }

        for(InputDeclaration inputDecl: c.declarations) {
            inputDecl.declare(d);
        }

        Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
        for(Map.Entry<String, Set<String>> entry: batchToComponents.entrySet()) {
            for(String comp: entry.getValue()) {
                d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey()));
            }
        }

        for(String b: c.committerBatches) {
            d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
        }
    }

    return builder.createTopology();
}
- For each spout that is not an IRichSpout, buildTopology creates a TridentSpoutCoordinator bolt in the topology, which globalGroupings the MasterBatchCoordinator.BATCH_STREAM_ID ($batch) and MasterBatchCoordinator.SUCCESS_STREAM_ID ($success) streams of the master coordinator. It also creates a TridentBoltExecutor bolt, which allGroupings the spout coordinator's $batch stream and the master coordinator's $success stream; for spouts of type ICommitterTridentSpout it additionally allGroupings MasterBatchCoordinator.COMMIT_STREAM_ID ($commit). Note that a spout that is not an IRichSpout is thereby converted into a bolt
- Then, for each batch in batchesToCommitIds, a MasterBatchCoordinator spout is created; it is what drives the TridentSpoutCoordinator and TridentBoltExecutor
- For the bolts (SubtopologyBolts wrapping ProcessorNodes), a TridentBoltExecutor bolt is set up, which directGroupings TridentBoltExecutor.COORD_STREAM ($coord-) and also allGroupings MasterBatchCoordinator.COMMIT_STREAM_ID ($commit) for its committer batches
TopologyBuilder.createTopology
storm-core-1.2.2-sources.jar!/org/apache/storm/topology/TopologyBuilder.java
public StormTopology createTopology() {
    Map<String, Bolt> boltSpecs = new HashMap<>();
    Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
    maybeAddCheckpointSpout();
    for(String boltId: _bolts.keySet()) {
        IRichBolt bolt = _bolts.get(boltId);
        bolt = maybeAddCheckpointTupleForwarder(bolt);
        ComponentCommon common = getComponentCommon(boltId, bolt);
        try {
            maybeAddCheckpointInputs(common);
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        } catch(RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException("Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    for(String spoutId: _spouts.keySet()) {
        IRichSpout spout = _spouts.get(spoutId);
        ComponentCommon common = getComponentCommon(spoutId, spout);
        try {
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
        } catch(RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException("Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                        "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                        "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }

    StormTopology stormTopology = new StormTopology(spoutSpecs,
            boltSpecs,
            new HashMap<String, StateSpoutSpec>());

    stormTopology.set_worker_hooks(_workerHooks);

    return Utils.addVersions(stormTopology);
}

/**
 * If the topology has at least one stateful bolt
 * add a {@link CheckpointSpout} component to the topology.
 */
private void maybeAddCheckpointSpout() {
    if (hasStatefulBolt) {
        setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
    }
}
- createTopology calls maybeAddCheckpointSpout, which adds a CheckpointSpout if the topology has at least one stateful bolt; in that case every bolt that is not a StatefulBoltExecutor is wrapped in a CheckpointTupleForwarder
- After the series of buildTopology settings, by the time createTopology runs there are three bolts: the TridentBoltExecutor wrapping the ProcessorNode, the TridentSpoutCoordinator, and the TridentBoltExecutor wrapping the original spout
- There is only one spout, the MasterBatchCoordinator: during buildTopology, every spout that is not an IRichSpout is converted into bolts (its TridentBoltExecutor plus a TridentSpoutCoordinator). A plain-Java sketch of the serialization check follows
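The try/catch around Utils.javaSerialize is why every field of a spout or bolt must be serializable at build time, with the wrapped error suggesting the field be instantiated in prepare instead. A plain-Java sketch of the underlying failure mode (the class names here are hypothetical):

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheckDemo {
    static class NonSerializableDep { }                      // does not implement Serializable

    static class MyComponent implements Serializable {
        NonSerializableDep dep = new NonSerializableDep();   // created before topology build
    }

    public static void main(String[] args) throws Exception {
        ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream());
        oos.writeObject(new MyComponent());                  // throws NotSerializableException
    }
}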
topology
- In the previous example, after TridentTopologyBuilder.createTopology, the resulting topology has one spout, the MasterBatchCoordinator ($mastercoord-bg0), and three bolts: the TridentSpoutCoordinator ($spoutcoord-spout-spout1), the TridentBoltExecutor wrapping the original spout (spout-spout1), and the TridentBoltExecutor wrapping the ProcessorNode (b-0)
- Several streams are involved: MasterBatchCoordinator.SUCCESS_STREAM_ID ($success), MasterBatchCoordinator.COMMIT_STREAM_ID ($commit), MasterBatchCoordinator.BATCH_STREAM_ID ($batch), TridentBoltExecutor.COORD_STREAM ($coord-bg0), plus s1 and s2
- $mastercoord-bg0 declares the $success, $commit, and $batch streams, each with [tx] as its output fields
- $spoutcoord-spout-spout1 receives the $success and $batch streams of $mastercoord-bg0, and itself declares a $batch stream with output fields [tx, metadata]
- spout-spout1 receives (via allGrouping) the $success stream of $mastercoord-bg0 and the $batch stream of $spoutcoord-spout-spout1; it emits [id, count] data to $coord-bg0 and emits tuple data to stream s1
- b-0 receives the $coord-bg0 and s1 streams of spout-spout1, sends data to stream s2 (output_fields: [$batchId, user, score]), and also emits [id, count] data to $coord-bg0. (A quick way to verify this layout is sketched below.)
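A quick, hedged way to check this layout yourself is to print the component ids of the built StormTopology via its thrift-generated getters, starting from the test at the top of this article:

// Sketch: list the components of the generated StormTopology.
StormTopology st = topology.build();
System.out.println(st.get_spouts().keySet()); // expect something like [$mastercoord-bg0]
System.out.println(st.get_bolts().keySet());  // expect [$spoutcoord-spout-spout1, spout-spout1, b-0]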
summary
- During buildTopology, TridentTopologyBuilder converts every spout that is not an IRichSpout into a TridentBoltExecutor bolt, adds a TridentSpoutCoordinator bolt for it, and wraps each ProcessorNode group into a TridentBoltExecutor bolt; that is, TridentTopology wraps the user-supplied spout as a bolt for management purposes, and creates the MasterBatchCoordinator as the real spout
- TridentBoltExecutor.COORD_STREAM ($coord-) is used to pass [id, count] data between components, so that each component can verify it received its tuples completely; both the spout-wrapping bolts and the ordinary bolts emit [id, count] data to this stream
- The relationship between the MasterBatchCoordinator, the TridentSpoutCoordinator, and the TridentBoltExecutor (spout-spout1) is as follows: the master sends $success data (tuples acting as instructions) to spout-spout1, and sends $success and $batch data (tuples acting as instructions) to the coordinator; the coordinator in turn sends $batch data (tuples acting as instructions) to spout-spout1. The stream ids involved can be checked with the short sketch below
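As a small sanity check, the stream-id constants mentioned throughout this article can be printed directly (a sketch assuming the storm-core 1.2.2 classes; COORD_STREAM is the static helper shown in buildTopology above):

import org.apache.storm.trident.topology.MasterBatchCoordinator;
import org.apache.storm.trident.topology.TridentBoltExecutor;

public class StreamIdDemo {
    public static void main(String[] args) {
        System.out.println(MasterBatchCoordinator.BATCH_STREAM_ID);   // $batch
        System.out.println(MasterBatchCoordinator.SUCCESS_STREAM_ID); // $success
        System.out.println(MasterBatchCoordinator.COMMIT_STREAM_ID);  // $commit
        System.out.println(TridentBoltExecutor.COORD_STREAM("bg0"));  // $coord-bg0
    }
}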
doc
- Trident API Overview
- Trident Spouts
- Talk about storm's LinearDRPCTopologyBuilder