sequence

This article mainly tells about the spring for kafka’s consumer in the spring. Kafka. Consumer. Enable – auto – commit is false cases, AckMode option

AckMode

Spring – kafka – 1.2.3. RELEASE – sources. The jar! /org/springframework/kafka/listener/AbstractMessageListenerContainer.java$AckMode

    /**
     * The offset commit behavior enumeration.
     */
    public enum AckMode {

        /**
         * Commit after each record is processed by the listener.
         */
        RECORD,

        /**
         * Commit whatever has already been processed before the next poll.
         */
        BATCH,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
         */
        TIME,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded.
         */
        COUNT,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded or after {@link ContainerProperties#setAckTime(long)
         * ackTime} has elapsed.
         */
        COUNT_TIME,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}.
         */
        MANUAL,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}. The consumer is woken to
         * immediately process the commit.
         */
        MANUAL_IMMEDIATE,

    }Copy the code
  • RECORD processes a commit once
  • BATCH(The default)

    Each poll is submitted in batches, and the frequency depends on how often each poll is called
  • TIME

    Each ackTime interval to commit(How is it different from auto Commit Interval?)
  • COUNT counts up to ackCount to commit
  • Commit COUNT_TIME ackTime or ackCount if either condition is met first
  • The MANUAL Listener is responsible for the ACK, but behind it is also batch loading
  • MANUAL_IMMEDIATE Listner is responsible for ack and commit immediately after each call

KafkaMessageListenerContainer$ListenerConsumer

Spring – kafka – 1.2.3. RELEASE – sources. The jar! /org/springframework/kafka/listener/KafkaMessageListenerContainer.java

        @Override
        public void run() {
            if (this.theListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
            }
            this.count = 0;
            this.last = System.currentTimeMillis();
            if(isRunning() && this.definedPartitions ! = null) { initPartitionsIfNeeded(); // we start the invoker here as there will be no rebalance calls to // trigger it, but onlyif the container is not set to autocommit
                // otherwise we will process records on a separate thread
                if(! this.autoCommit) { startInvoker(); } } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive;while (isRunning()) {
                try {
                    if(! this.autoCommit) { processCommits(); } processSeeks();if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Polling (paused=" + this.paused + ")...");
                    }
                    ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    if(records ! = null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records");
                    }
                    if(records ! = null && records.count() > 0) {if(this.containerProperties.getIdleEventInterval() ! = null) { lastReceive = System.currentTimeMillis(); } / /if the container is set to auto-commit, then execute in the
                        // same thread
                        // otherwise send to the buffering queue
                        if (this.autoCommit) {
                            invokeListener(records);
                        }
                        else {
                            if (sendToListener(records)) {
                                if(this.assignedPartitions ! = null) { // avoid group management rebalance due to a slow // consumer this.consumer.pause(this.assignedPartitions); this.paused =true; this.unsent = records; }}}}else {
                        if(this.containerProperties.getIdleEventInterval() ! = null) { long now = System.currentTimeMillis();if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                                    && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                                publishIdleContainerEvent(now - lastReceive);
                                lastAlertAt = now;
                                if (this.theListener instanceof ConsumerSeekAware) {
                                    seekPartitions(getAssignedPartitions(), true);
                                }
                            }
                        }
                    }
                    this.unsent = checkPause(this.unsent);
                }
                catch (WakeupException e) {
                    this.unsent = checkPause(this.unsent);
                }
                catch (Exception e) {
                    if(this.containerProperties.getGenericErrorHandler() ! = null) { this.containerProperties.getGenericErrorHandler().handle(e, null); }else {
                        this.logger.error("Container exception", e); }}}if(this.listenerInvokerFuture ! = null) { stopInvoker(); commitManualAcks(); } try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } this.consumer.close();if (this.logger.isInfoEnabled()) {
                this.logger.info("Consumer stopped"); }}Copy the code

Here, the while loop determines whether or not Auto COMMIT each time, and if not processCommits

        private void processCommits() {
            handleAcks();
            this.count += this.acks.size();
            long now;
            AckMode ackMode = this.containerProperties.getAckMode();
            if(! this.isManualImmediateAck) {if(! this.isManualAck) { updatePendingOffsets(); } boolean countExceeded = this.count >= this.containerProperties.getAckCount();if (this.isManualAck || this.isBatchAck || this.isRecordAck
                        || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
                    if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
                        this.logger.debug("Committing in AckMode.COUNT because count " + this.count
                                + " exceeds configured limit of " + this.containerProperties.getAckCount());
                    }
                    commitIfNecessary();
                    this.count = 0;
                }
                else {
                    now = System.currentTimeMillis();
                    boolean elapsed = now - this.last > this.containerProperties.getAckTime();
                    if (ackMode.equals(AckMode.TIME) && elapsed) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Committing in AckMode.TIME " +
                                    "because time elapsed exceeds configured limit of " +
                                    this.containerProperties.getAckTime());
                        }
                        commitIfNecessary();
                        this.last = now;
                    }
                    else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                        if (this.logger.isDebugEnabled()) {
                            if (elapsed) {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because time elapsed exceeds configured limit of " +
                                        this.containerProperties.getAckTime());
                            }
                            else {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because count " + this.count + " exceeds configured limit of"+ this.containerProperties.getAckCount()); } } commitIfNecessary(); this.last = now; this.count = 0; }}}}Copy the code

handleAcks

        private void handleAcks() {
            ConsumerRecord<K, V> record = this.acks.poll();
            while(record ! = null) {if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + record);
                }
                processAck(record);
                record = this.acks.poll();
            }
        }

        private void processAck(ConsumerRecord<K, V> record) {
            if (ListenerConsumer.this.isManualImmediateAck) {
                try {
                    ackImmediate(record);
                }
                catch (WakeupException e) {
                    // ignore - not polling
                }
            }
            else{ addOffset(record); }}Copy the code

As can be seen here, if it is not the isManualImmediateAck, each time it is added to the map of offSets

commitIfNecessary

        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
                for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
                    commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
                            new OffsetAndMetadata(offset.getValue() + 1));
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + commits);
            }
            if(! commits.isEmpty()) {if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing: " + commits);
                }
                try {
                    if (this.containerProperties.isSyncCommits()) {
                        this.consumer.commitSync(commits);
                    }
                    else {
                        this.consumer.commitAsync(commits, this.commitCallback);
                    }
                }
                catch (WakeupException e) {
                    // ignore - not polling
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Woken up during commit"); }}}}Copy the code

The commits are assembled from the map of offsets, commitSync or commitAsync, and clear offsets

manual commit

    @KafkaListener(topics = "k010") public void listen(ConsumerRecord<? ,? > cr,Acknowledgment ack) throws Exception { LOGGER.info(cr.toString()); ack.acknowledge(); }Copy the code

This Acknowledgment should be passed in the method parameter, and then the AckMode should be configured before manual ACK

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);Copy the code

doc

  • spring-kafka-committing-offsets