Preface

In this article, we look at Storm's OpaquePartitionedTridentSpoutExecutor.

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }
  • TridentTopology's newStream method wraps a spout of type IOpaquePartitionedTridentSpout in an OpaquePartitionedTridentSpoutExecutor; KafkaTridentSpoutOpaque implements the IOpaquePartitionedTridentSpout interface (see the usage sketch below)
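
For orientation, here is a minimal usage sketch (mine, not from the Storm sources) that wires KafkaTridentSpoutOpaque into a Trident topology via newStream; the broker address, topic name and txId are placeholder assumptions:

    // hypothetical wiring; broker, topic and txId values are placeholders
    KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
            .builder("localhost:9092", "test-topic")
            .build();

    TridentTopology topology = new TridentTopology();
    // newStream wraps the IOpaquePartitionedTridentSpout in an
    // OpaquePartitionedTridentSpoutExecutor internally, as shown above
    Stream stream = topology.newStream("kafka-tx", new KafkaTridentSpoutOpaque<>(spoutConf));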

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

    public StormTopology buildTopology(Map<String, Number> masterCoordResources) {
        TopologyBuilder builder = new TopologyBuilder();
        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

        Map<String, List<String>> batchesToCommitIds = new HashMap<>();
        Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<>();
        
        for(String id: _spouts.keySet()) {
            TransactionalSpoutComponent c = _spouts.get(id);
            if(c.spout instanceof IRichSpout) {
                
                //TODO: wrap this to set the stream name
                builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
            } else {
                String batchGroup = c.batchGroupId;
                if(!batchesToCommitIds.containsKey(batchGroup)) {
                    batchesToCommitIds.put(batchGroup, new ArrayList<String>());
                }
                batchesToCommitIds.get(batchGroup).add(c.commitStateId);

                if(!batchesToSpouts.containsKey(batchGroup)) {
                    batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
                }
                batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);

                BoltDeclarer scd = builder.setBolt(spoutCoordinator(id),
                                new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
                        .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);

                for(Map<String, Object> m: c.componentConfs) {
                    scd.addConfigurations(m);
                }
                
                Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
                specs.put(c.batchGroupId, new CoordSpec());
                BoltDeclarer bd = builder.setBolt(id,
                        new TridentBoltExecutor(
                          new TridentSpoutExecutor(
                            c.commitStateId,
                            c.streamName,
                            ((ITridentSpout) c.spout)),
                            batchIdsForSpouts,
                            specs),
                        c.parallelism);
                bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
                bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if(c.spout instanceof ICommitterTridentSpout) {
                    bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                for(Map<String, Object> m: c.componentConfs) {
                    bd.addConfigurations(m);
                }
            }
        }
        // ...
        return builder.createTopology();
    }
  • TridentTopologyBuilder.buildTopology wraps the IOpaquePartitionedTridentSpout (here the OpaquePartitionedTridentSpoutExecutor) in a TridentSpoutExecutor, which in turn is wrapped in a TridentBoltExecutor and registered as a bolt (the chain is sketched below)
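
Condensing the code above, the layering can be sketched as follows (a sketch of the chain, not the builder's exact code; commitStateId, streamName, batchIds and specs stand in for values buildTopology computes internally):

    // sketch: how an opaque spout ends up running inside a bolt
    static IRichBolt wrapOpaqueSpoutAsBolt(IOpaquePartitionedTridentSpout rawSpout,
                                           String commitStateId, String streamName,
                                           Map<GlobalStreamId, String> batchIds,
                                           Map<String, TridentBoltExecutor.CoordSpec> specs) {
        // layer 1: adapt the opaque spout to the ITridentSpout contract
        ITridentSpout executorSpout = new OpaquePartitionedTridentSpoutExecutor(rawSpout);
        // layer 2: execute the trident spout inside a TridentBoltExecutor bolt
        return new TridentBoltExecutor(
                new TridentSpoutExecutor(commitStateId, streamName, executorSpout),
                batchIds, specs);
    }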

OpaquePartitionedTridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> {
    protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);

    IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;

    // ...

    public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) {
        _spout = spout;
    }

    @Override
    public ITridentSpout.BatchCoordinator<Object> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new Emitter(txStateId, conf, context);
    }

    @Override
    public Fields getOutputFields() {
        return _spout.getOutputFields();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return _spout.getComponentConfiguration();
    }
}
  • OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout; its getCoordinator returns an ITridentSpout.BatchCoordinator, and its getEmitter returns an ICommitterTridentSpout.Emitter

ITridentSpout.BatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

    public class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
        IOpaquePartitionedTridentSpout.Coordinator _coordinator;

        public Coordinator(Map conf, TopologyContext context) {
            _coordinator = _spout.getCoordinator(conf, context);
        }
        
        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata);
            return _coordinator.getPartitionsForBatch();
        }


        @Override
        public void close() {
            LOG.debug("Closing");
            _coordinator.close();
            LOG.debug("Closed");
        }

        @Override
        public void success(long txid) {
            LOG.debug("Success [txid = {}]", txid);
        }

        @Override
        public boolean isReady(long txid) {
            boolean ready = _coordinator.isReady(txid);
            LOG.debug("[isReady = {}], [txid = {}]", ready, txid);
            return ready;
        }
    }
  • This Coordinator wraps the spout's _coordinator, of type IOpaquePartitionedTridentSpout.Coordinator; most of what it adds is debug logging

ICommitterTridentSpout.Emitter

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

   public class Emitter implements ICommitterTridentSpout.Emitter {        
        IOpaquePartitionedTridentSpout.Emitter<Object, ISpoutPartition, Object> _emitter;
        TransactionalState _state;
        TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<>();
        Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
        int _index;
        int _numTasks;

        public Emitter(String txStateId, Map conf, TopologyContext context) {
            _emitter = _spout.getEmitter(conf, context);
            _index = context.getThisTaskIndex();
            _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            _state = TransactionalState.newUserState(conf, txStateId);
            LOG.debug("Created {}", this);
        }

        Object _savedCoordinatorMeta = null;
        boolean _changedMeta = false;

        @Override
        public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
            LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);

            if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                _partitionStates.clear();
                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
                for (ISpoutPartition partition : taskPartitions) {
                    _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
                }

                // refresh all partitions for backwards compatibility with old spout
                _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                _savedCoordinatorMeta = coordinatorMeta;
                _changedMeta = true;
            }
            Map<String, Object> metas = new HashMap<>();
            _cachedMetas.put(tx.getTransactionId(), metas);

            Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
            Map<String, Object> prevCached;
            if(entry != null) {
                prevCached = entry.getValue();
            } else {
                prevCached = new HashMap<>();
            }
            
            for(Entry<String, EmitterPartitionState> e: _partitionStates.entrySet()) {
                String id = e.getKey();
                EmitterPartitionState s = e.getValue();
                s.rotatingState.removeState(tx.getTransactionId());
                Object lastMeta = prevCached.get(id);
                if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
                Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                metas.put(id, meta);
            }
            LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]",
                    tx, coordinatorMeta, collector, this);
        }

        @Override
        public void success(TransactionAttempt tx) {
            for(EmitterPartitionState state: _partitionStates.values()) {
                state.rotatingState.cleanupBefore(tx.getTransactionId());
            }
            LOG.debug("Success transaction {}. [{}]", tx, this);
        }

        @Override
        public void commit(TransactionAttempt attempt) {
            LOG.debug("Committing transaction {}. [{}]", attempt, this);
            // this code here handles a case where a previous commit failed, and the partitions
            // changed since the last commit. This clears out any state for the removed partitions
            // for this txid.
            // we make sure only a single task ever does this. we're also guaranteed that
            // it's impossible for there to be another writer to the directory for that partition
            // because only a single commit can be happening at once. this is because in order for 
            // another attempt of the batch to commit, the batch phase must have succeeded in between.
            // hence, all tasks for the prior commit must have finished committing (whether successfully or not)
            if(_changedMeta && _index==0) {
                Set<String> validIds = new HashSet<>();
                for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
                    validIds.add(p.getId());
                }
                for(String existingPartition: _state.list("")) {
                    if(!validIds.contains(existingPartition)) {
                        RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition);
                        s.removeState(attempt.getTransactionId());
                    }
                }
                _changedMeta = false;
            }
            
            Long txid = attempt.getTransactionId();
            Map<String, Object> metas = _cachedMetas.remove(txid);
            for(Entry<String, Object> entry: metas.entrySet()) {
                _partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
            }
            LOG.debug("Exiting commit method for transaction {}. [{}]", attempt, this);
        }

        @Override
        public void close() {
            LOG.debug("Closing");
            _emitter.close();
            LOG.debug("Closed");
        }

        @Override
        public String toString() {
            return "Emitter{" +
                    ", _state=" + _state +
                    ", _cachedMetas=" + _cachedMetas +
                    ", _partitionStates=" + _partitionStates +
                    ", _index=" + _index +
                    ", _numTasks=" + _numTasks +
                    ", _savedCoordinatorMeta=" + _savedCoordinatorMeta +
                    ", _changedMeta=" + _changedMeta +
                    '}';
        }
    }

    static class EmitterPartitionState {
        public RotatingTransactionalState rotatingState;
        public ISpoutPartition partition;

        public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
            rotatingState = s;
            partition = p;
        }
    }
  • The Emitter wraps the spout's IOpaquePartitionedTridentSpout.Emitter; per-partition state is tracked in _partitionStates as EmitterPartitionState entries
  • The emitBatch method first recomputes _partitionStates (when the coordinator metadata has changed), then determines prevCached, the metadata map of the previous transaction, and finally calls _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta) for each partition (the prevCached lookup is illustrated in the sketch after this list)
  • The success method calls state.rotatingState.cleanupBefore(tx.getTransactionId()), clearing state from before that txid; the commit method mainly writes the cached metadata for the txid back into _partitionStates via overrideState
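
The prevCached lookup relies on TreeMap.lowerEntry returning the entry with the greatest key strictly less than the given txid. A self-contained sketch of that mechanic (the sample txids and metadata values are made up):

    import java.util.Map;
    import java.util.TreeMap;

    public class CachedMetasDemo {
        public static void main(String[] args) {
            // mirrors _cachedMetas: txid -> metadata emitted for that transaction
            TreeMap<Long, String> cachedMetas = new TreeMap<>();
            cachedMetas.put(1L, "meta-of-tx1");
            cachedMetas.put(2L, "meta-of-tx2");

            // for txid 3, lowerEntry returns the entry with the greatest key
            // strictly less than 3, so tx2's metadata becomes lastMeta
            Map.Entry<Long, String> prev = cachedMetas.lowerEntry(3L);
            System.out.println(prev.getKey() + " -> " + prev.getValue()); // 2 -> meta-of-tx2
        }
    }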

KafkaTridentSpoutOpaque

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java

public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
    private static final long serialVersionUID = -8003272486566259640L;

    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);

    private final KafkaTridentSpoutManager<K, V> kafkaManager;

    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
        this(new KafkaTridentSpoutManager<>(conf));
    }
    
    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }

    @Override
    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
            Map conf, TopologyContext context) {
        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
    }

    @Override
    public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        final Fields outputFields = kafkaManager.getFields();
        LOG.debug("OutputFields = {}", outputFields);
        return outputFields;
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager + '} '; }}Copy the code
  • KafkaTridentSpoutOpaque's getCoordinator returns a KafkaTridentSpoutOpaqueCoordinator; getEmitter returns a KafkaTridentSpoutEmitter

KafkaTridentSpoutOpaqueCoordinator

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java

public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
        Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);

    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
    private final KafkaTridentSpoutManager<K,V> kafkaManager;

    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
        this.kafkaManager = kafkaManager;
        LOG.debug("Created {}", this.toString());
    }

    @Override
    public boolean isReady(long txid) {
        LOG.debug("isReady = true");
        return true;    // the "old" trident kafka spout always returns true, like this
    }

    @Override
    public List<Map<String, Object>> getPartitionsForBatch() {
        final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
        LOG.debug("TopicPartitions for batch {}", topicPartitions);
        List<Map<String, Object>> tps = new ArrayList<>();
        for(TopicPartition tp : topicPartitions) {
            tps.add(tpSerializer.toMap(tp));
        }
        return tps;
    }

    @Override
    public void close() {
        LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
  • isReady here always returns true; getPartitionsForBatch mainly converts the information from kafkaManager.getTopicPartitions() into a list of maps (see the sketch below)
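
A minimal sketch of what that serialization plausibly produces; the map key names here are assumptions for illustration, the real conversion lives in TopicPartitionSerializer:

    import java.util.*;
    import org.apache.kafka.common.TopicPartition;

    public class PartitionsForBatchSketch {
        // hypothetical stand-in for TopicPartitionSerializer.toMap; key names assumed
        static List<Map<String, Object>> partitionsForBatch(Collection<TopicPartition> tps) {
            List<Map<String, Object>> result = new ArrayList<>();
            for (TopicPartition tp : tps) {
                Map<String, Object> m = new HashMap<>();
                m.put("topic", tp.topic());
                m.put("partition", tp.partition());
                result.add(m);
            }
            return result;
        }
    }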

KafkaTridentSpoutEmitter

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java

public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
        List<Map<String, Object>>,
        KafkaTridentSpoutTopicPartition,
        Map<String, Object>>,
        Serializable {

    private static final long serialVersionUID = -7343927794834130435L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

    // Kafka
    private final KafkaConsumer<K, V> kafkaConsumer;

    // Bookkeeping
    private final KafkaTridentSpoutManager<K, V> kafkaManager;
    // set of topic-partitions for which first poll has already occurred, and the first polled txid
    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 

    // Declare some KafkaTridentSpoutManager references for convenience
    private final long pollTimeoutMs;
    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
    private final RecordTranslator<K, V> translator;
    private final Timer refreshSubscriptionTimer;
    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();

    private TopologyContext topologyContext;

    /**
     * Create a new Kafka spout emitter.
     * @param kafkaManager The Kafka consumer manager to use
     * @param topologyContext The topology context
     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
     */
    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
        this.kafkaManager = kafkaManager;
        this.topologyContext = topologyContext;
        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();

        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
        LOG.debug("Created {}", this.toString()); } /** * Creates instance of this class with default 500 millisecond refresh subscription timer */ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) { this(kafkaManager, topologyContext, new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS)); } / /... @Override public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) { LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
                tx, currBatchPartition, lastBatch, collector);

        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

        if(assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                            "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
                            "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
                    kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
        } else {
            try {
                // pause other topic-partitions to only poll from current topic-partition
                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);

                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());

                // poll
                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
                }

                final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
                LOG.debug("Polled [{}] records from Kafka.", records.count());

                if(!records.isEmpty()) {
                    emitTuples(collector, records);
                    // build new metadata
                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
                }
            } finally {
                kafkaConsumer.resume(pausedTopicPartitions);
                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
            }
            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
                    "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
        }

        return currentBatch == null ? null : currentBatch.toMap();
    }

    private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records) {
            final List<Object> tuple = translator.apply(record);
            collector.emit(tuple);
            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
        }
    }

    @Override
    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
                "No action taken by this method for topic partitions {}", partitionResponsibilities);
    }

    /**
     * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
     * for this task must be assigned to the Kafka consumer running on this task.
     *
     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
     * @return ordered list of topic partitions for this task
     */
    @Override
    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
        List<TopicPartition> allTopicPartitions = new ArrayList<>();
        for(Map<String, Object> map : allPartitionInfo) {
            allTopicPartitions.add(tpSerializer.fromMap(map));
        }
        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
        return allPartitions;
    }

    @Override
    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
        List<Map<String, Object>> allPartitionInfo) {
        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
        final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
        return taskTps;
    }

    @Override
    public void close() {
        kafkaConsumer.close();
        LOG.debug("Closed");
    }

    @Override
    public final String toString() {
        return super.toString() +
                "{kafkaManager=" + kafkaManager +
                '}';
    }
}
  • refreshSubscriptionTimer's interval is taken from kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), which defaults to 2000 ms
  • Each call to emitPartitionBatch checks refreshSubscriptionTimer.isExpiredResetOnTrue(); when the timer has expired, kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment() is called to refresh the assignment
  • The emitPartitionBatch method mainly locates the partition this batch belongs to, pauses message pulling from the other partitions, and then, based on firstPollOffsetStrategy and the lastBatchMeta information, calls kafkaConsumer's seek method to move to the right position (see the sketch after this list)
  • It then pulls data with kafkaConsumer.poll(pollTimeoutMs) and hands the records to emitTuples; emitTuples converts each record with the translator and emits it via collector.emit
  • The refreshPartitions method currently only logs at trace level; getOrderedPartitions deserializes the map structures in allPartitionInfo back into TopicPartitions and returns them as KafkaTridentSpoutTopicPartitions; getPartitionsForTask mainly converts the kafkaConsumer.assignment() information into KafkaTridentSpoutTopicPartitions and returns them
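
The pause/seek/poll/resume dance can be distilled into a small sketch against the plain Kafka consumer API (a simplification: the real seek(...) helper, elided above, also weighs firstPollOffsetStrategy and whether the batch is a replay):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SinglePartitionPoll {
        // poll only 'target', leaving the rest of the assignment paused meanwhile
        static <K, V> ConsumerRecords<K, V> pollOnePartition(KafkaConsumer<K, V> consumer,
                TopicPartition target, long startOffset, long pollTimeoutMs) {
            Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
            paused.remove(target);
            consumer.pause(paused);
            try {
                // startOffset stands in for the position derived from lastBatchMeta
                consumer.seek(target, startOffset);
                return consumer.poll(pollTimeoutMs);
            } finally {
                consumer.resume(paused);
            }
        }
    }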

summary

  • storm-kafka-client provides KafkaTridentSpoutOpaque as a trident kafka spout (the older version is OpaqueTridentKafkaSpout, in the storm-kafka library); it implements the IOpaquePartitionedTridentSpout interface
  • TridentTopology's newStream method wraps a spout of type IOpaquePartitionedTridentSpout in an OpaquePartitionedTridentSpoutExecutor; TridentTopologyBuilder.buildTopology then wraps that executor in a TridentSpoutExecutor, which in turn is wrapped in a TridentBoltExecutor and registered as a bolt
  • OpaquePartitionedTridentSpoutExecutor's getCoordinator returns an ITridentSpout.BatchCoordinator and its getEmitter returns an ICommitterTridentSpout.Emitter; they wrap and delegate to the KafkaTridentSpoutOpaqueCoordinator and KafkaTridentSpoutEmitter returned by the underlying KafkaTridentSpoutOpaque spout; the coordinator adds debug logging, and the emitter adds the EmitterPartitionState handling

doc

  • Storm Kafka Integration (0.10.x+)