
This article mainly tells about the spring for kafka’s consumer in the spring. Kafka. Consumer. Enable – auto – commit is false cases, AckMode option


Spring – kafka – 1.2.3. RELEASE – sources. The jar! /org/springframework/kafka/listener/$AckMode

     * The offset commit behavior enumeration.
    public enum AckMode {

         * Commit after each record is processed by the listener.

         * Commit whatever has already been processed before the next poll.

         * Commit pending updates after
         * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.

         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded.

         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded or after {@link ContainerProperties#setAckTime(long)
         * ackTime} has elapsed.

         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}.

         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}. The consumer is woken to
         * immediately process the commit.

    }Copy the code
  • RECORD processes a commit once
  • BATCH(The default)

    Each poll is submitted in batches, and the frequency depends on how often each poll is called
  • TIME

    Each ackTime interval to commit(How is it different from auto Commit Interval?)
  • COUNT counts up to ackCount to commit
  • Commit COUNT_TIME ackTime or ackCount if either condition is met first
  • The MANUAL Listener is responsible for the ACK, but behind it is also batch loading
  • MANUAL_IMMEDIATE Listner is responsible for ack and commit immediately after each call


Spring – kafka – 1.2.3. RELEASE – sources. The jar! /org/springframework/kafka/listener/

        public void run() {
            if (this.theListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
            this.count = 0;
            this.last = System.currentTimeMillis();
            if(isRunning() && this.definedPartitions ! = null) { initPartitionsIfNeeded(); // we start the invoker here as there will be no rebalance calls to // trigger it, but onlyif the container is not set to autocommit
                // otherwise we will process records on a separate thread
                if(! this.autoCommit) { startInvoker(); } } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive;while (isRunning()) {
                try {
                    if(! this.autoCommit) { processCommits(); } processSeeks();if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Polling (paused=" + this.paused + ")...");
                    ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    if(records ! = null && this.logger.isDebugEnabled()) { this.logger.debug("Received: " + records.count() + " records");
                    if(records ! = null && records.count() > 0) {if(this.containerProperties.getIdleEventInterval() ! = null) { lastReceive = System.currentTimeMillis(); } / /if the container is set to auto-commit, then execute in the
                        // same thread
                        // otherwise send to the buffering queue
                        if (this.autoCommit) {
                        else {
                            if (sendToListener(records)) {
                                if(this.assignedPartitions ! = null) { // avoid group management rebalance due to a slow // consumer this.consumer.pause(this.assignedPartitions); this.paused =true; this.unsent = records; }}}}else {
                        if(this.containerProperties.getIdleEventInterval() ! = null) { long now = System.currentTimeMillis();if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                                    && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                                publishIdleContainerEvent(now - lastReceive);
                                lastAlertAt = now;
                                if (this.theListener instanceof ConsumerSeekAware) {
                                    seekPartitions(getAssignedPartitions(), true);
                    this.unsent = checkPause(this.unsent);
                catch (WakeupException e) {
                    this.unsent = checkPause(this.unsent);
                catch (Exception e) {
                    if(this.containerProperties.getGenericErrorHandler() ! = null) { this.containerProperties.getGenericErrorHandler().handle(e, null); }else {
                        this.logger.error("Container exception", e); }}}if(this.listenerInvokerFuture ! = null) { stopInvoker(); commitManualAcks(); } try { this.consumer.unsubscribe(); } catch (WakeupException e) { // No-op. Continue process } this.consumer.close();if (this.logger.isInfoEnabled()) {
      "Consumer stopped"); }}Copy the code

Here, the while loop determines whether or not Auto COMMIT each time, and if not processCommits

        private void processCommits() {
            this.count += this.acks.size();
            long now;
            AckMode ackMode = this.containerProperties.getAckMode();
            if(! this.isManualImmediateAck) {if(! this.isManualAck) { updatePendingOffsets(); } boolean countExceeded = this.count >= this.containerProperties.getAckCount();if (this.isManualAck || this.isBatchAck || this.isRecordAck
                        || (ackMode.equals(AckMode.COUNT) && countExceeded)) {
                    if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
                        this.logger.debug("Committing in AckMode.COUNT because count " + this.count
                                + " exceeds configured limit of " + this.containerProperties.getAckCount());
                    this.count = 0;
                else {
                    now = System.currentTimeMillis();
                    boolean elapsed = now - this.last > this.containerProperties.getAckTime();
                    if (ackMode.equals(AckMode.TIME) && elapsed) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Committing in AckMode.TIME " +
                                    "because time elapsed exceeds configured limit of " +
                        this.last = now;
                    else if (ackMode.equals(AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                        if (this.logger.isDebugEnabled()) {
                            if (elapsed) {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because time elapsed exceeds configured limit of " +
                            else {
                                this.logger.debug("Committing in AckMode.COUNT_TIME " +
                                        "because count " + this.count + " exceeds configured limit of"+ this.containerProperties.getAckCount()); } } commitIfNecessary(); this.last = now; this.count = 0; }}}}Copy the code


        private void handleAcks() {
            ConsumerRecord<K, V> record = this.acks.poll();
            while(record ! = null) {if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + record);
                record = this.acks.poll();

        private void processAck(ConsumerRecord<K, V> record) {
            if (ListenerConsumer.this.isManualImmediateAck) {
                try {
                catch (WakeupException e) {
                    // ignore - not polling
            else{ addOffset(record); }}Copy the code

As can be seen here, if it is not the isManualImmediateAck, each time it is added to the map of offSets


        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
                for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
                    commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
                            new OffsetAndMetadata(offset.getValue() + 1));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + commits);
            if(! commits.isEmpty()) {if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing: " + commits);
                try {
                    if (this.containerProperties.isSyncCommits()) {
                    else {
                        this.consumer.commitAsync(commits, this.commitCallback);
                catch (WakeupException e) {
                    // ignore - not polling
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Woken up during commit"); }}}}Copy the code

The commits are assembled from the map of offsets, commitSync or commitAsync, and clear offsets

manual commit

    @KafkaListener(topics = "k010") public void listen(ConsumerRecord<? ,? > cr,Acknowledgment ack) throws Exception {; ack.acknowledge(); }Copy the code

This Acknowledgment should be passed in the method parameter, and then the AckMode should be configured before manual ACK

instance.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);Copy the code


