Preface
This article mainly looks at how Storm Trident batches are split and aggregated.
Example
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))
        .partitionAggregate(new Fields("user", "score", "batchId"), new OriginUserCountAggregator(), new Fields("result", "aggBatchId"))
        .parallelismHint(3)
        .global()
        .aggregate(new Fields("result", "aggBatchId"), new AggAgg(), new Fields("agg"))
        .each(new Fields("agg"), new PrintEachFunc(), new Fields())
        ;
- Three bolts are eventually constructed here, namely b-0, b-1 and b-2
- b-0 mainly runs the partitionAggregate; its parallelismHint is 3
- b-1 mainly handles the CombinerAggregator's zero, init and combine operations; its parallelismHint is 1, and since its upstream bolt has 3 tasks, its TridentBoltExecutor's tracked.condition.expectedTaskReports is 3, so it has to wait until the aggregated data from all three tasks has arrived before it can finishBatch
- b-2 mainly handles the CombinerAggregator's combine as well as the each operation
- The whole data flow: a batch emitted by the spout is split into 3 sub-batches by partitionBy in b-0; b-1 aggregates the 3 sub-batches before its finishBatch; b-2 then performs the final aggregation on top of b-1's aggregation
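The split-then-merge flow above can be reproduced with a small stand-alone sketch. This is plain Java with no Storm dependency; the class and method names are assumptions that merely mirror what the hypothetical OriginUserCountAggregator and AggAgg from the example appear to do in the log below.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone model (an assumption, not the article's actual classes): each
// b-0 task builds a per-partition map {batchId -> {user -> count}}, and
// b-1/b-2 merge those maps the way a CombinerAggregator's combine would.
public class BatchAggSketch {

    // what one b-0 task produces for its share of a batch
    static Map<String, Map<String, Integer>> partitionAgg(String batchId, List<String> users) {
        Map<String, Map<String, Integer>> val = new HashMap<>();
        for (String user : users) {
            val.computeIfAbsent(batchId, k -> new HashMap<>()).merge(user, 1, Integer::sum);
        }
        return val;
    }

    // merge step used first by b-1 (across the 3 sub-batches) and then by b-2
    static Map<String, Map<String, Integer>> combine(Map<String, Map<String, Integer>> v1,
                                                     Map<String, Map<String, Integer>> v2) {
        Map<String, Map<String, Integer>> out = new HashMap<>();
        v1.forEach((b, m) -> out.put(b, new HashMap<>(m)));   // copy so inputs stay untouched
        v2.forEach((b, m) -> m.forEach((user, c) ->
                out.computeIfAbsent(b, k -> new HashMap<>()).merge(user, c, Integer::sum)));
        return out;
    }

    public static void main(String[] args) {
        // three sub-batches of batch "1", one per b-0 task
        Map<String, Map<String, Integer>> p1 = partitionAgg("1", List.of("nickt1"));
        Map<String, Map<String, Integer>> p2 = partitionAgg("1", List.of("nickt2"));
        Map<String, Map<String, Integer>> p3 = partitionAgg("1", List.of("nickt3"));
        // b-1 merges the three sub-batches; b-2 merges b-1's single result
        System.out.println(combine(combine(p1, p2), p3).get("1"));
    }
}
```

The merged result matches the final `{1={nickt3=1, nickt2=1, nickt1=1}}` value printed by PrintEachFunc in the log.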
Example log
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map,aggBatchId:1:0
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map,aggBatchId:1:0
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map,aggBatchId:1:0
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
- Storm names the bolt threads after the bolt names, such as b-0, b-1 and b-2
TridentBoltExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
public void execute(Tuple tuple) {
    if(TupleUtils.isTick(tuple)) {
        long now = System.currentTimeMillis();
        if(now - _lastRotate > _messageTimeoutMs) {
            _batches.rotate();
            _lastRotate = now;
        }
        return;
    }
    String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
    if(batchGroup==null) {
        // this is so we can do things like have simple DRPC that doesn't need to use batch processing
        _coordCollector.setCurrBatch(null);
        _bolt.execute(null, tuple);
        _collector.ack(tuple);
        return;
    }
    IBatchID id = (IBatchID) tuple.getValue(0);
    //get transaction id
    //if it already exists and attempt id is greater than the attempt there
    TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());

    // this code here ensures that only one attempt is ever tracked for a batch, so when
    // failures happen you don't get an explosion in memory usage in the tasks
    if(tracked!=null) {
        if(id.getAttemptId() > tracked.attemptId) {
            _batches.remove(id.getId());
            tracked = null;
        } else if(id.getAttemptId() < tracked.attemptId) {
            // no reason to try to execute a previous attempt than we've already seen
            return;
        }
    }

    if(tracked==null) {
        tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
        _batches.put(id.getId(), tracked);
    }
    _coordCollector.setCurrBatch(tracked);

    TupleType t = getTupleType(tuple, tracked);
    if(t==TupleType.COMMIT) {
        tracked.receivedCommit = true;
        checkFinish(tracked, tuple, t);
    } else if(t==TupleType.COORD) {
        int count = tuple.getInteger(1);
        tracked.reportedTasks++;
        tracked.expectedTupleCount+=count;
        checkFinish(tracked, tuple, t);
    } else {
        tracked.receivedTuples++;
        boolean success = true;
        try {
            _bolt.execute(tracked.info, tuple);
            if(tracked.condition.expectedTaskReports==0) {
                success = finishBatch(tracked, tuple);
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
        }
        if(success) {
            _collector.ack(tuple);
        } else {
            _collector.fail(tuple);
        }
    }
    _coordCollector.setCurrBatch(null);
}

private void failBatch(TrackedBatch tracked) {
    failBatch(tracked, null);
}

private void failBatch(TrackedBatch tracked, FailedException e) {
    if(e!=null && e instanceof ReportedFailedException) {
        _collector.reportError(e);
    }
    tracked.failed = true;
    if(tracked.delayedAck!=null) {
        _collector.fail(tracked.delayedAck);
        tracked.delayedAck = null;
    }
}

private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
    if(tracked.failed) {
        failBatch(tracked);
        _collector.fail(tuple);
        return;
    }
    CoordCondition cond = tracked.condition;
    boolean delayed = tracked.delayedAck==null &&
              (cond.commitStream!=null && type==TupleType.COMMIT
               || cond.commitStream==null);
    if(delayed) {
        tracked.delayedAck = tuple;
    }
    boolean failed = false;
    if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
        if(tracked.receivedTuples == tracked.expectedTupleCount) {
            finishBatch(tracked, tuple);
        } else {
            //TODO: add logging that not all tuples were received
            failBatch(tracked);
            _collector.fail(tuple);
            failed = true;
        }
    }
    if(!delayed && !failed) {
        _collector.ack(tuple);
    }
}
- The execute method creates a TrackedBatch if one does not already exist, calling _bolt.initBatchState at creation time
- It then calls _bolt.execute(tracked.info, tuple) and acks via _collector.ack(tuple) afterwards; if _bolt.execute throws a FailedException, failBatch marks tracked.failed as true, and when checkFinish runs at the end of the batch it finds tracked.failed to be true and calls _collector.fail
- There are two classes of _bolt: TridentSpoutExecutor and SubtopologyBolt. For a TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0, so each received tuple (which is actually an instruction to emit a batch) is followed immediately by finishBatch after _bolt.execute; for a SubtopologyBolt, tracked.condition.expectedTaskReports is not 0, so it has to wait for the last [id, count] instruction before checkFinish can complete the batch
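The coordination bookkeeping described above can be condensed into a stand-alone model. This is plain Java, not Storm code; only the field names are borrowed from TrackedBatch/CoordCondition, and the class itself is an assumption for illustration.

```java
// Minimal model (an assumption, not Storm code) of TridentBoltExecutor's
// batch coordination: a bolt whose upstream has expectedTaskReports tasks may
// only finishBatch once every task has reported its [id, count] and the
// reported counts match the tuples actually received.
public class CoordSketch {
    int expectedTaskReports;   // how many upstream tasks must report [id, count]
    int reportedTasks;         // [id, count] reports received so far
    int expectedTupleCount;    // sum of the reported counts
    int receivedTuples;        // batch tuples actually processed

    CoordSketch(int expectedTaskReports) { this.expectedTaskReports = expectedTaskReports; }

    void onBatchTuple() { receivedTuples++; }

    // returns true when the batch can be finished
    boolean onCoordTuple(int count) {
        reportedTasks++;
        expectedTupleCount += count;
        return reportedTasks == expectedTaskReports
                && receivedTuples == expectedTupleCount;
    }

    public static void main(String[] args) {
        CoordSketch b1 = new CoordSketch(3);       // like b-1, waiting on 3 b-0 tasks
        b1.onBatchTuple(); b1.onBatchTuple(); b1.onBatchTuple();
        System.out.println(b1.onCoordTuple(1));    // false: only 1 of 3 reports in
        System.out.println(b1.onCoordTuple(1));    // false: 2 of 3
        System.out.println(b1.onCoordTuple(1));    // true: 3 reports, 3 == 3 tuples
    }
}
```

This mirrors why b-1 in the example can only finishBatch after all three b-0 tasks have sent their [id, count] instructions.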
TridentSpoutExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
    _emitter = _spout.getEmitter(_txStateId, conf, context);
    _collector = new AddIdCollector(_streamName, collector);
}

@Override
public void execute(BatchInfo info, Tuple input) {
    // there won't be a BatchInfo for the success stream
    TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
    if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
        if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
            ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
            _activeBatches.remove(attempt.getTransactionId());
        } else {
            throw new FailedException("Received commit for different transaction attempt");
        }
    } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
        // valid to delete before what's been committed since
        // those batches will never be accessed again
        _activeBatches.headMap(attempt.getTransactionId()).clear();
        _emitter.success(attempt);
    } else {
        _collector.setBatch(info.batchId);
        _emitter.emitBatch(attempt, input.getValue(1), _collector);
        _activeBatches.put(attempt.getTransactionId(), attempt);
    }
}

@Override
public void finishBatch(BatchInfo batchInfo) {
}

@Override
public Object initBatchState(String batchGroup, Object batchId) {
    return null;
}
- TridentSpoutExecutor uses an AddIdCollector; its initBatchState and finishBatch methods are no-ops
- Its execute method distinguishes three kinds of streams: COMMIT_STREAM_ID, SUCCESS_STREAM_ID and the ordinary stream
- For the ordinary stream, it calls _emitter.emitBatch to emit the batch's tuples
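The three-way dispatch can be modeled without Storm. In this sketch the "$commit"/"$success" stream ids and the bookkeeping map are assumptions standing in for MasterBatchCoordinator.COMMIT_STREAM_ID / SUCCESS_STREAM_ID and _activeBatches.

```java
import java.util.TreeMap;

// Minimal model (an assumption, not Storm code) of the dispatch in
// TridentSpoutExecutor.execute: commit a tracked attempt, clear everything
// older than a succeeded txid, or emit a batch on the ordinary stream.
public class SpoutDispatchSketch {
    static final String COMMIT_STREAM = "$commit";     // placeholder stream ids
    static final String SUCCESS_STREAM = "$success";

    final TreeMap<Long, String> activeBatches = new TreeMap<>();   // txid -> tracked attempt

    String execute(String streamId, long txid) {
        if (COMMIT_STREAM.equals(streamId)) {
            activeBatches.remove(txid);            // commit, then stop tracking this attempt
            return "commit";
        } else if (SUCCESS_STREAM.equals(streamId)) {
            activeBatches.headMap(txid).clear();   // older batches will never be accessed again
            return "success";
        } else {
            activeBatches.put(txid, "attempt");    // ordinary stream: emit the batch
            return "emitBatch";
        }
    }

    public static void main(String[] args) {
        SpoutDispatchSketch s = new SpoutDispatchSketch();
        s.execute("s1", 1L);                       // batch 1 emitted
        s.execute("s1", 2L);                       // batch 2 emitted
        s.execute(SUCCESS_STREAM, 2L);             // txid 1 is dropped, txid 2 stays
        System.out.println(s.activeBatches.keySet());
    }
}
```

The headMap(txid).clear() call is the interesting part: success for txid 2 prunes every attempt strictly before it, matching the comment in the quoted source.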
SubtopologyBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
@Override
public Object initBatchState(String batchGroup, Object batchId) {
ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
p.startBatch(ret);
}
return ret;
}
@Override
public void execute(BatchInfo batchInfo, Tuple tuple) {
String sourceStream = tuple.getSourceStreamId();
InitialReceiver ir = _roots.get(sourceStream);
if(ir==null) {
throw new RuntimeException("Received unexpected tuple " + tuple.toString());
}
ir.receive((ProcessorContext) batchInfo.state, tuple);
}
@Override
public void finishBatch(BatchInfo batchInfo) {
for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
p.finishBatch((ProcessorContext) batchInfo.state);
}
}
protected static class InitialReceiver {
List<TridentProcessor> _receivers = new ArrayList<>();
RootFactory _factory;
ProjectionFactory _project;
String _stream;
public InitialReceiver(String stream, Fields allFields) {
    // TODO: don't want to project for non-batch bolts... ?????
    // how to distinguish "batch" streams from non-batch streams?
    _stream = stream;
    _factory = new RootFactory(allFields);
    List<String> projected = new ArrayList<>(allFields.toList());
    projected.remove(0);
    _project = new ProjectionFactory(_factory, new Fields(projected));
}

public void receive(ProcessorContext context, Tuple tuple) {
    TridentTuple t = _project.create(_factory.create(tuple));
    for(TridentProcessor r: _receivers) {
        r.execute(context, _stream, t);
    }
}

public void addReceiver(TridentProcessor p) {
    _receivers.add(p);
}

public Factory getOutputFactory() {
    return _project;
}
}
- Its initBatchState method creates a ProcessorContext and then calls the startBatch method of each TridentProcessor (such as AggregateProcessor and EachProcessor)
- The execute method calls InitialReceiver's receive, which in turn calls each TridentProcessor's execute method (such as AggregateProcessor's)
- finishBatch calls each TridentProcessor's finishBatch method (such as AggregateProcessor's and EachProcessor's)
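The startBatch / execute / finishBatch lifecycle that SubtopologyBolt drives can be sketched stand-alone. The interface and class below are assumptions that only model the calling pattern, with a shared per-batch state list playing the role of ProcessorContext.state.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone model (an assumption, not Storm code) of the batch lifecycle:
// startBatch initializes per-batch state, execute is called once per tuple,
// and finishBatch runs once when the whole batch is complete.
public class BatchLifecycleSketch {
    interface Processor {
        void startBatch(List<String> state);
        void execute(List<String> state, String tuple);
        void finishBatch(List<String> state);
    }

    static class CollectingProcessor implements Processor {
        final List<String> finished = new ArrayList<>();           // what finishBatch saw
        public void startBatch(List<String> state) { state.clear(); }
        public void execute(List<String> state, String tuple) { state.add(tuple); }
        public void finishBatch(List<String> state) { finished.addAll(state); }
    }

    public static void main(String[] args) {
        CollectingProcessor p = new CollectingProcessor();
        List<String> batchState = new ArrayList<>();   // plays the ProcessorContext.state role
        p.startBatch(batchState);
        p.execute(batchState, "nickt1");
        p.execute(batchState, "nickt2");
        p.finishBatch(batchState);
        System.out.println(p.finished);                // both tuples, delivered at batch end
    }
}
```

In the real SubtopologyBolt, the same three calls are fanned out to every TridentProcessor in _myTopologicallyOrdered for the batch group.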
WindowTridentProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
@Override
public void startBatch(ProcessorContext processorContext) {
// initialize state for batch
processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
}
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
// add tuple to the batch state
Object state = processorContext.state[tridentContext.getStateIndex()];
((List<TridentTuple>) state).add(projection.create(tuple));
}
@Override
public void finishBatch(ProcessorContext processorContext) {
Object batchId = processorContext.batchId;
Object batchTxnId = getBatchTxnId(batchId);
LOG.debug("Received finishBatch of : [{}] ", batchId);
// get all the tuples in a batch and add it to trident-window-manager
List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
tridentWindowManager.addTuplesBatch(batchId, tuples);
List<Integer> pendingTriggerIds = null;
List<String> triggerKeys = new ArrayList<>();
Iterable<Object> triggerValues = null;
if (retriedAttempt(batchId)) {
    pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
    if (pendingTriggerIds != null) {
        for (Integer pendingTriggerId : pendingTriggerIds) {
            triggerKeys.add(triggerKey(pendingTriggerId));
        }
        triggerValues = windowStore.get(triggerKeys);
    }
}

// if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
if(triggerValues == null) {
pendingTriggerIds = new ArrayList<>();
Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
try {
Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
List<Object> values = new ArrayList<>();
StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
while (pendingTriggersIter.hasNext()) {
triggerResult = pendingTriggersIter.next();
for (List<Object> aggregatedResult : triggerResult.result) {
String triggerKey = triggerKey(triggerResult.id);
triggerKeys.add(triggerKey);
values.add(aggregatedResult);
pendingTriggerIds.add(triggerResult.id);
}
pendingTriggersIter.remove();
}
triggerValues = values;
} finally {
    // store inprocess triggers of a batch in store for batch retries for any failures
    if (!pendingTriggerIds.isEmpty()) {
        windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
    }
}
}

collector.setContext(processorContext);
int i = 0;
for (Object resultValue : triggerValues) {
collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
}
collector.setContext(null);
}
- In startBatch, WindowTridentProcessor resets processorContext.state[tridentContext.getStateIndex()] to a new list
- In execute, it adds the received tuple to processorContext.state[tridentContext.getStateIndex()]
- In finishBatch, it adds the data in processorContext.state[tridentContext.getStateIndex()] to the windowStore and to the windowManager's ConcurrentLinkedQueue
- The window trigger removes window data from the ConcurrentLinkedQueue and adds it to pendingTriggers; during finishBatch, the WindowTridentProcessor removes the emitted data from pendingTriggers and emits it through the FreshCollector
- Data emitted by the FreshCollector is received and processed by its TupleReceivers (such as ProjectedProcessor and PartitionPersistProcessor); PartitionPersistProcessor saves the data as state, whereas ProjectedProcessor extracts fields according to the window's outputFields and passes the data on to downstream processors such as EachProcessor
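The hand-off between batches, the window manager and pendingTriggers can be modeled in a few lines. Everything below is an assumption for illustration, not Storm code; it only captures that finishBatch hands tuples to the window manager and drains whatever the independently firing trigger has produced so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal model (an assumption, not Storm code) of the window hand-off:
// finishBatch moves the batch's tuples into the window manager's queue and
// emits pending trigger results - which are empty until the trigger fires.
public class WindowHandoffSketch {
    final List<String> batchState = new ArrayList<>();                      // ProcessorContext.state slot
    final Queue<String> windowManagerQueue = new ConcurrentLinkedQueue<>(); // tuples awaiting a trigger
    final Queue<List<String>> pendingTriggers = new ConcurrentLinkedQueue<>();

    void startBatch() { batchState.clear(); }

    void execute(String tuple) { batchState.add(tuple); }

    // fires on the window's own schedule, independent of batch boundaries
    void onWindowTrigger() {
        List<String> windowData = new ArrayList<>();
        String t;
        while ((t = windowManagerQueue.poll()) != null) windowData.add(t);
        pendingTriggers.add(windowData);
    }

    // hand the batch over, then emit whatever trigger results are pending
    List<List<String>> finishBatch() {
        windowManagerQueue.addAll(batchState);
        List<List<String>> emitted = new ArrayList<>();
        List<String> r;
        while ((r = pendingTriggers.poll()) != null) emitted.add(r);
        return emitted;
    }

    public static void main(String[] args) {
        WindowHandoffSketch w = new WindowHandoffSketch();
        w.startBatch();
        w.execute("nickt1");
        w.execute("nickt2");
        System.out.println(w.finishBatch());   // empty: the trigger has not fired yet
        w.onWindowTrigger();
        w.startBatch();
        System.out.println(w.finishBatch());   // now the window result comes out
    }
}
```

Note how the first finishBatch returns nothing: that is the "empty window output during the first few batches" behavior the summary below warns about.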
Summary
- The trident spout emits data batch by batch, and waits for the downstream to finish the current batch (finishBatch) before emitting the next one. Within a batch, each tuple is acked individually once processed (TridentBoltExecutor acks automatically after the tuple has been processed), so the ack interval between tuples depends on each tuple's processing time. If processing takes too long, the topology's tuple timeout fires and the spout fails the batch, which re-triggers the batchId; if the spout is transactional, the tuples for that batchId remain unchanged on replay.
- The window operation scrambles the original batch. A batch's data is first accumulated in the ProcessorContext's state: WindowTridentProcessor resets the state on every startBatch, and in finishBatch copies the data into the windowStore and the windowManager's ConcurrentLinkedQueue, where it waits for the window trigger to compute the window data and put the results into pendingTriggers. On the bolt's finishBatch, the window data is removed from pendingTriggers, handed to the FreshCollector and on to the downstream processors; the downstream processors' startBatch and finishBatch still follow the rhythm of the original spout batches rather than being triggered by the window.
- The rate at which the spout emits batches is governed by Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS (topology.trident.batch.emit.interval.millis, 500 by default in defaults.yaml), and the window's interval is usually larger than this batch interval, so the window in effect aggregates data across batches. Also, since data is only added to the windowManager's ConcurrentLinkedQueue at finishBatch time, pendingTriggers is empty during the first several finishBatch calls and the downstream processors receive no window data; guard against null/empty values to avoid NullPointerExceptions.
- With parallelism 1, groupBy/partitionBy keeps the original batch intact; with parallelism greater than 1, the spout's original batch is distributed across multiple partitions/tasks and the batch's data stream is split. Each task runs its own finishBatch after processing its share (tuples are emitted in order; the last tuple, [id, count], is the end-of-batch instruction used to detect and trigger batch completion), then the next batch's data is sent followed by its own [id, count], and the downstream bolts finish the batches in turn.
- The global operation distributes data to the same partition/task. batchGlobal behaves like global when parallelism is 1; when parallelism is greater than 1, it distributes data to different partitions/tasks by batchId.
- aggregate is used to aggregate data; it is usually combined with groupBy or partitionBy so that the upstream batch is split and aggregated per partition, batch by batch. If parallelism is greater than 1, global().aggregate() can then re-aggregate across tasks. As long as there is no window operation in between, aggregation follows the original batch, because TridentBoltExecutor's tracked.condition.expectedTaskReports records how many tasks' [id, count] reports the bolt must wait for: on receiving an [id, count] tuple, it checks whether tracked.reportedTasks equals cond.expectedTaskReports, and only then whether tracked.receivedTuples equals tracked.expectedTupleCount, before it can finishBatch and send [id, count] downstream. This expectedTaskReports check is what allows a batch processed by multiple tasks to be re-assembled and aggregated as the original batch. Note, however, that the window operation scrambles the trident spout's original batches during the window phase.
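Since the summary warns that downstream processors see empty window output for the first several batches, a defensive pattern is worth spelling out. The helper below is a hypothetical illustration (not from the article's code) of summing window counts without risking a NullPointerException.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustrative guard (an assumption): an each() function downstream of a
// window may receive null or empty window output during the first few
// finishBatch calls, so aggregate defensively instead of dereferencing.
public class WindowOutputGuard {

    static int safeTotal(List<Map<String, Integer>> windowOutput) {
        if (windowOutput == null) return 0;            // no window result yet
        int total = 0;
        for (Map<String, Integer> counts : windowOutput) {
            if (counts == null) continue;              // skip null aggregates
            for (Integer c : counts.values()) total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(safeTotal(null));                                    // nothing emitted yet
        System.out.println(safeTotal(Collections.emptyList()));                 // trigger not fired yet
        System.out.println(safeTotal(List.of(Map.of("nickt1", 1, "nickt2", 2)))); // real window data
    }
}
```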
doc
- Talk about storm's window trigger
- Talk about storm WindowTridentProcessor's FreshCollector
- Talk about storm AggregateProcessor's execute and finishBatch methods
- Talk about storm TridentWindowManager's pendingTriggers
- Talk about storm TridentBoltExecutor's finishBatch method