sequence
In this paper, we study the storm OpaquePartitionedTridentSpoutExecutor
TridentTopology.newStream
Storm – core – 1.2.2 – sources jar! /org/apache/storm/trident/TridentTopology.java
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}
Copy the code
- TridentTopology newStream method, for IOpaquePartitionedTridentSpout types of spout will use OpaquePartitionedTridentSpoutExecutor to packing; While KafkaTridentSpoutOpaque IOpaquePartitionedTridentSpout interface is realized
TridentTopologyBuilder.buildTopology
Storm – core – 1.2.2 – sources jar! /org/apache/storm/trident/topology/TridentTopologyBuilder.java
public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
TopologyBuilder builder = new TopologyBuilder();
Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
Map<String, List<String>> batchesToCommitIds = new HashMap<>();
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
for(String id: _spouts.keySet()) {
TransactionalSpoutComponent c = _spouts.get(id);
if(c.spout instanceof IRichSpout) {
//TODO: wrap this to set the stream name
builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
} else {
String batchGroup = c.batchGroupId;
if(! batchesToCommitIds.containsKey(batchGroup)) { batchesToCommitIds.put(batchGroup, new ArrayList<String>()); } batchesToCommitIds.get(batchGroup).add(c.commitStateId);if(! batchesToSpouts.containsKey(batchGroup)) { batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>()); } batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout); BoltDeclarer scd = builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout)) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);for(Map<String, Object> m: c.componentConfs) {
scd.addConfigurations(m);
}
Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
specs.put(c.batchGroupId, new CoordSpec());
BoltDeclarer bd = builder.setBolt(id,
new TridentBoltExecutor(
new TridentSpoutExecutor(
c.commitStateId,
c.streamName,
((ITridentSpout) c.spout)),
batchIdsForSpouts,
specs),
c.parallelism);
bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
if(c.spout instanceof ICommitterTridentSpout) {
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
}
for(Map<String, Object> m: c.componentConfs) { bd.addConfigurations(m); }}} / /...return builder.createTopology();
}
Copy the code
- TridentTopologyBuilder. Will buildTopology IOpaquePartitionedTridentSpout (
OpaquePartitionedTridentSpoutExecutor
) wrapped with TridentSpoutExecutor, and then wrapped with TridentBoltExecutor as Bolt
OpaquePartitionedTridentSpoutExecutor
Storm – core – 1.2.2 – sources jar! /org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> { protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class); IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout; / /... public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) { _spout = spout; } @Override public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {return new Coordinator(conf, context);
}
@Override
public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
return new Emitter(txStateId, conf, context);
}
@Override
public Fields getOutputFields() {
return _spout.getOutputFields();
}
@Override
public Map<String, Object> getComponentConfiguration() {
return_spout.getComponentConfiguration(); }}Copy the code
- OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout getCoordinator return here is ITridentSpout BatchCoordinator, GetEmitter returns ICommitterTridentSpout. Emitter
ITridentSpout.BatchCoordinator
Storm – core – 1.2.2 – sources jar! /org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
IOpaquePartitionedTridentSpout.Coordinator _coordinator;
public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
}
@Override
public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata);
return _coordinator.getPartitionsForBatch();
}
@Override
public void close() {
LOG.debug("Closing");
_coordinator.close();
LOG.debug("Closed");
}
@Override
public void success(long txid) {
LOG.debug("Success [txid = {}]", txid);
}
@Override
public boolean isReady(long txid) {
boolean ready = _coordinator.isReady(txid);
LOG.debug("[isReady = {}], [txid = {}]", ready, txid);
returnready; }}Copy the code
- Packing the spout _coordinator, its type IOpaquePartitionedTridentSpout. The Coordinator, much here is just the debug logs
ICommitterTridentSpout.Emitter
Storm – core – 1.2.2 – sources jar! /org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
public class Emitter implements ICommitterTridentSpout.Emitter {
IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
TransactionalState _state;
TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
int _index;
int _numTasks;
public Emitter(String txStateId, Map conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
_state = TransactionalState.newUserState(conf, txStateId);
LOG.debug("Created {}", this);
}
Object _savedCoordinatorMeta = null;
boolean _changedMeta = false;
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
tx, coordinatorMeta, collector, this);
if(_savedCoordinatorMeta==null || ! _savedCoordinatorMeta.equals(coordinatorMeta)) { _partitionStates.clear(); final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);for (ISpoutPartition partition : taskPartitions) {
_partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
}
// refresh all partitions for backwards compatibility with old spout
_emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
_savedCoordinatorMeta = coordinatorMeta;
_changedMeta = true;
}
Map<String, Object> metas = new HashMap<>();
_cachedMetas.put(tx.getTransactionId(), metas);
Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
Map<String, Object> prevCached;
if(entry! =null) { prevCached = entry.getValue(); }else {
prevCached = new HashMap<>();
}
for(Entry<String, EmitterPartitionState> e: _partitionStates.entrySet()) {
String id = e.getKey();
EmitterPartitionState s = e.getValue();
s.rotatingState.removeState(tx.getTransactionId());
Object lastMeta = prevCached.get(id);
if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
metas.put(id, meta);
}
LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
tx, coordinatorMeta, collector, this);
}
@Override
public void success(TransactionAttempt tx) {
for(EmitterPartitionState state: _partitionStates.values()) {
state.rotatingState.cleanupBefore(tx.getTransactionId());
}
LOG.debug("Success transaction {}. [{}]", tx, this);
}
@Override
public void commit(TransactionAttempt attempt) {
LOG.debug("Committing transaction {}. [{}]", attempt, this);
// this code here handles a case where a previous commit failed, and the partitions
// changed since the last commit. This clears out any state for the removed partitions
// for this txid.
// we make sure only a single task ever does this. we're also guaranteed that // it's impossible for there to be another writer to the directory for that partition
// because only a single commit can be happening at once. this is because in order for
// another attempt of the batch to commit, the batch phase must have succeeded in between.
// hence, all tasks for the prior commit must have finished committing (whether successfully or not)
if(_changedMeta && _index==0) {
Set<String> validIds = new HashSet<>();
for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
validIds.add(p.getId());
}
for(String existingPartition: _state.list("")) {
if(! validIds.contains(existingPartition)) { RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition); s.removeState(attempt.getTransactionId()); } } _changedMeta =false;
}
Long txid = attempt.getTransactionId();
Map<String, Object> metas = _cachedMetas.remove(txid);
for(Entry<String, Object> entry: metas.entrySet()) {
_partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
}
LOG.debug("Exiting commit method for transaction {}. [{}]", attempt, this);
}
@Override
public void close() {
LOG.debug("Closing");
_emitter.close();
LOG.debug("Closed");
}
@Override
public String toString() {
return "Emitter{" +
", _state=" + _state +
", _cachedMetas=" + _cachedMetas +
", _partitionStates=" + _partitionStates +
", _index=" + _index +
", _numTasks=" + _numTasks +
", _savedCoordinatorMeta=" + _savedCoordinatorMeta +
", _changedMeta=" + _changedMeta +
'} '; } } static class EmitterPartitionState { public RotatingTransactionalState rotatingState; public ISpoutPartition partition; public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) { rotatingState = s; partition = p; }}Copy the code
- Here to spout IOpaquePartitionedTridentSpout. The Emitter encapsulation, _partitionStates EmitterPartitionState was used
- Calculation _partitionStates emitBatch method first, and then calculate prevCached, finally call _emitter. EmitPartitionBatch (tx, the collector, supachai panitchpakdi artition, lastMeta)
- Success method call state. RotatingState. CleanupBefore (tx) getTransactionId ()), empty before the txid state information; The commit method basically updates _partitionStates
KafkaTridentSpoutOpaque
Storm – kafka – the client – 1.2.2 – sources. The jar! /org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
KafkaTridentSpoutTopicPartition, Map<String, Object>> {
private static final long serialVersionUID = -8003272486566259640L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
private final KafkaTridentSpoutManager<K, V> kafkaManager;
public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
this(new KafkaTridentSpoutManager<>(conf));
}
public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
this.kafkaManager = kafkaManager;
LOG.debug("Created {}", this.toString());
}
@Override
public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
Map conf, TopologyContext context) {
return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
}
@Override
public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public Fields getOutputFields() {
final Fields outputFields = kafkaManager.getFields();
LOG.debug("OutputFields = {}", outputFields);
return outputFields;
}
@Override
public final String toString() {
return super.toString() +
"{kafkaManager=" + kafkaManager + '} '; }}Copy the code
- KafkaTridentSpoutOpaque getCoordinator returns KafkaTridentSpoutOpaqueCoordinator; GetEmitter returns KafkaTridentSpoutEmitter
KafkaTridentSpoutOpaqueCoordinator
Storm – kafka – the client – 1.2.2 – sources. The jar! /org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
private final KafkaTridentSpoutManager<K,V> kafkaManager;
public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
this.kafkaManager = kafkaManager;
LOG.debug("Created {}", this.toString());
}
@Override
public boolean isReady(long txid) {
LOG.debug("isReady = true");
return true; // the "old" trident kafka spout always returns true, like this
}
@Override
public List<Map<String, Object>> getPartitionsForBatch() {
final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
LOG.debug("TopicPartitions for batch {}", topicPartitions);
List<Map<String, Object>> tps = new ArrayList<>();
for(TopicPartition tp : topicPartitions) {
tps.add(tpSerializer.toMap(tp));
}
return tps;
}
@Override
public void close() {
LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
}
@Override
public final String toString() {
return super.toString() +
"{kafkaManager=" + kafkaManager +
'} '; }}Copy the code
- IsReady here always returns true, getPartitionsForBatch method mainly kafkaManager. GetTopicPartitions () structure of information into the map
KafkaTridentSpoutEmitter
Storm – kafka – the client – 1.2.2 – sources. The jar! /org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
List<Map<String, Object>>,
KafkaTridentSpoutTopicPartition,
Map<String, Object>>,
Serializable {
private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
// Kafka
private final KafkaConsumer<K, V> kafkaConsumer;
// Bookkeeping
private final KafkaTridentSpoutManager<K, V> kafkaManager;
// set of topic-partitions for which first poll has already occurred, and the first polled txid
private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>();
// Declare some KafkaTridentSpoutManager references forconvenience private final long pollTimeoutMs; private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; private final RecordTranslator<K, V> translator; private final Timer refreshSubscriptionTimer; private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); private TopologyContext topologyContext; /** * Create a new Kafka spout emitter. * @param kafkaManager The Kafka consumer manager to use * @param topologyContext The topology context * @param refreshSubscriptionTimer The timerfor deciding when to recheck the subscription
*/
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
this.kafkaManager = kafkaManager;
this.topologyContext = topologyContext;
this.refreshSubscriptionTimer = refreshSubscriptionTimer;
this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
LOG.debug("Created {}", this.toString()); } /** * Creates instance of this class with default 500 millisecond refresh subscription timer */ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) { this(kafkaManager, topologyContext, new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS)); } / /... @Override public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) { LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, lastBatch, collector);
final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
final Set<TopicPartition> assignments = kafkaConsumer.assignment();
KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
if(assignments == null || ! assignments.contains(currBatchPartition.getTopicPartition())) { LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
"[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
"of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
} else {
try {
// pause other topic-partitions to only poll from current topic-partition
pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
// poll
if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
}
final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
LOG.debug("Polled [{}] records from Kafka.", records.count());
if(! records.isEmpty()) { emitTuples(collector, records); // build new metadata currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records); } } finally { kafkaConsumer.resume(pausedTopicPartitions); LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
}
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
"[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
}
return currentBatch == null ? null : currentBatch.toMap();
}
private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
for (ConsumerRecord<K, V> record : records) {
final List<Object> tuple = translator.apply(record);
collector.emit(tuple);
LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
}
}
@Override
public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
"No action taken by this method for topic partitions {}", partitionResponsibilities);
}
/**
* Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
* for this task must be assigned to the Kafka consumer running on this task.
*
* @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
* @return ordered list of topic partitions for this task
*/
@Override
public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
List<TopicPartition> allTopicPartitions = new ArrayList<>();
for(Map<String, Object> map : allPartitionInfo) {
allTopicPartitions.add(tpSerializer.fromMap(map));
}
final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
return allPartitions;
}
@Override
public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
List<Map<String, Object>> allPartitionInfo) {
final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
return taskTps;
}
@Override
public void close() {
kafkaConsumer.close();
LOG.debug("Closed");
}
@Override
public final String toString() {
return super.toString() +
"{kafkaManager=" + kafkaManager +
'} '; }}Copy the code
- RefreshSubscriptionTimer interval take here is kafkaManager getKafkaSpoutConfig () getPartitionRefreshPeriodMs (), the default is 2000
- EmitPartitionBatch method calls once will judge refreshSubscriptionTimer. IsExpiredResetOnTrue (), if the time is up, Is called kafkaManager. GetKafkaSpoutConfig (.) getSubscription () refreshAssignment () refresh the assignment
- The emitPartitionBatch method mainly finds the partition associated with the batch, stops pulling messages from other Parition, and then according to firstPollOffsetStrategy and lastBatchMeta information, Call kafkaConsumer’s seek method seek to the specified position
- PollTimeoutMs (kafkaconsumer.poll) pull data, emitTuples; The emitTuples method will use the translator to convert the data and then emit it with a call to Collector.emit
- The refreshPartitions method currently only traces the log; GetOrderedPartitions method divides the data from the map structure of allPartitionInfo deserialized back, then converted into KafkaTridentSpoutTopicPartition return; GetPartitionsForTask method mainly through kafkaConsumer. The assignment () information into KafkaTridentSpoutTopicPartition returns
summary
- – Storm-kafka-client provides KafkaTridentSpoutOpaque as a trident kafka spout(
The older version is OpaqueTridentKafkaSpout, in the storm-Kafka library
), it implements the IOpaquePartitionedTridentSpout interface - TridentTopology newStream method, for IOpaquePartitionedTridentSpout types of spout will use OpaquePartitionedTridentSpoutExecutor to packing; TridentTopologyBuilder. Will buildTopology IOpaquePartitionedTridentSpout (
OpaquePartitionedTridentSpoutExecutor
) wrapped with TridentSpoutExecutor, and then wrapped with TridentBoltExecutor as Bolt - OpaquePartitionedTridentSpoutExecutor getCoordinator returns is ITridentSpout BatchCoordinator, GetEmitter returns ICommitterTridentSpout. Emitter; They investigated KafkaTridentSpoutOpaque the original spout returns KafkaTridentSpoutOpaqueCoordinator and KafkaTridentSpoutEmitter packaging and processing; A debug log is added for coordinator, and access to EmitterPartitionState is added for Emitter
doc
- Storm Kafka Integration (0.10 x +)