Idempotent producer

  • Idempotence is a mathematical concept: an operation or function is idempotent if it can be performed more than once and the result is the same each time.
  • In imperative programming languages such as C, an idempotent subroutine must leave the system state the same after repeated runs as after a single run. This way, no matter how many times the subroutine is run, the part of the system associated with it remains the same.
  • In functional programming languages (such as Scala or Haskell), many pure functions are inherently idempotent because they do not perform any side effects.
  • Idempotence has many benefits, but the biggest is that we can safely retry any idempotent operation without corrupting the system state.
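
The difference between the two kinds of operation can be sketched in a few lines of Java. This is only an illustration: the class and method names (IdempotenceDemo, increment, markProcessed) are made up for this example and have nothing to do with Kafka.

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotenceDemo {
    private int counter = 0;
    private final Set<String> processedOrders = new HashSet<>();

    // Not idempotent: every call changes the state again.
    void increment() {
        counter++;
    }

    // Idempotent: repeating the call leaves the state as it was after the first call.
    void markProcessed(String orderId) {
        processedOrders.add(orderId);
    }

    int counter() {
        return counter;
    }

    int processedCount() {
        return processedOrders.size();
    }

    public static void main(String[] args) {
        IdempotenceDemo demo = new IdempotenceDemo();
        // Simulate one call followed by two retries.
        for (int attempt = 0; attempt < 3; attempt++) {
            demo.increment();
            demo.markProcessed("order-42");
        }
        System.out.println("counter = " + demo.counter());          // 3: retries changed the state
        System.out.println("processed = " + demo.processedCount()); // 1: retries were harmless
    }
}
```

Retrying markProcessed is safe, while retrying increment is not, which is exactly the property we want from a producer that retries sends.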

Background

  • Many systems cannot tolerate duplicate messages, for example business settlement platforms (logistics platforms, bank settlement platforms, and so on).
  • To solve the duplication and reordering of messages caused by retries, Kafka introduced idempotent messages. Idempotence guarantees that messages written by a producer to the same partition within one session are not duplicated.
  • Put simply, Kafka's idempotence moves the deduplication that would otherwise be required in downstream systems upstream into Kafka itself.

At least once semantics

Message delivery reliability guarantees are the promises Kafka makes about how the messages handled by producers and consumers are processed. There are three common guarantees:

  1. At most once: A message may be lost, but it is never sent again.
  2. At least once: A message is not lost, but may be sent repeatedly.
  3. Exactly once: A message is never lost or sent twice.

By default, Kafka provides at-least-once delivery: if a message fails to send, the producer retries. These retries are exactly what can lead to duplicate messages.

Kafka can also provide at-most-once delivery by having the producer disable retries. A message is then either written successfully or fails, but it is never sent again. Message loss is usually undesirable, but some scenarios tolerate occasional loss while duplication must be avoided at all costs. In those cases an at-most-once guarantee is appropriate.
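
As a rough sketch, disabling retries comes down to a producer configuration like the one below. The keys "bootstrap.servers", "retries" and "enable.idempotence" are standard producer settings; the class name AtMostOnceConfig and the address localhost:9092 are placeholders chosen for this example.

```java
import java.util.Properties;

final class AtMostOnceConfig {
    // Builds producer settings that never resend a failed message.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // No retries: a failed send is simply reported as failed.
        props.put("retries", "0");
        // Idempotence requires retries > 0, so it is switched off explicitly here.
        props.put("enable.idempotence", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("retries"));
    }
}
```

The resulting Properties object would be passed to the producer's constructor in the usual way.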

Neither at-least-once nor at-most-once is as attractive as exactly-once. Most users expect each message to be delivered exactly once, so that it is neither lost nor processed twice. In other words, even if the producer sends the same message repeatedly, the broker automatically deduplicates it, and the downstream consumer still sees only one message. That is where idempotence comes in.

Method of use

  • A producer is not idempotent by default in the Kafka versions discussed here (since Kafka 3.0, enable.idempotence defaults to true), but we can create idempotent producers. This feature was introduced in version 0.11.0.0. Before that, when a producer sent data to a partition, the same message could be sent multiple times, resulting in duplicates.
  • Since 0.11, making a producer idempotent is as simple as setting props.put("enable.idempotence", true) or props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true).
  • Once enable.idempotence is set to true, the producer is automatically upgraded to an idempotent producer, and no other code needs to change.
  • The producer's interface for idempotence is very simple: the underlying implementation is well encapsulated, so the application layer does not need to care about the details, which makes it user-friendly.
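
A minimal configuration sketch for the settings above might look as follows. It uses plain string keys so that it compiles without the Kafka client on the classpath; the class name IdempotentProducerConfig, the address localhost:9092 and the choice of StringSerializer are assumptions made for this example.

```java
import java.util.Properties;

final class IdempotentProducerConfig {
    // Builds producer settings with idempotence switched on.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The one switch that turns the producer into an idempotent producer.
        props.put("enable.idempotence", "true");
        // Idempotence requires acks=all; stating it explicitly avoids a conflicting setting.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092"); // placeholder address
        // With the Kafka client available, this would be passed to new KafkaProducer<>(props).
        System.out.println(props.getProperty("enable.idempotence"));
    }
}
```

Nothing else in the sending code has to change; send() is called exactly as before.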

The principle

  • Kafka automatically deduplicates messages for you. The underlying principle is simple: it is a classic space-for-time trade-off in which the broker stores a few extra fields.
  • When a producer sends a message whose field values the broker has already seen, the broker knows the message is a duplicate and silently drops it behind the scenes.

The specific implementation

Distinguish producer sessions
  • Each time it starts, the producer first requests a globally unique producer ID (PID) from the broker to identify the session. The PID is not visible to the client user.
  • After a restart, the PID identifying the producer changes and the broker cannot tell that it is the same producer, so idempotence is limited to a single session.
Sequence Number

For each PID, every <Topic,Partition> that the producer sends to has its own monotonically increasing sequence number starting from 0. The broker keeps this sequence number in its cache.

For each message received, the broker accepts it only if its sequence number is exactly one greater than the sequence number in the broker cache; otherwise it is discarded (a smaller or equal number means the message has already been committed).

However, exactly-once semantics for a given <Topic,Partition> can only be guaranteed with respect to a single producer.

Producer Retry and message detection
  • If the producer receives an explicit error response for a message, or receives no ACK before the timeout, it retries.
  • The sequence number does not change during retries, because it was fixed on the first send; a retry only resends the same data.
  • The broker compares the incoming sequence number (new_seq) with the last one it recorded (old_seq):
      new_seq = old_seq + 1: a normal message.
      new_seq <= old_seq: a duplicate message.
      new_seq > old_seq + 1: a message has been lost.
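
The three comparison rules can be sketched as a small in-memory check. This is not the broker's actual code: SequenceChecker, Result and the string key are hypothetical names, and the sketch assumes one sequence number per message and no wrap-around.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceChecker {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number per "pid:topic-partition" key.
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result check(long pid, String topicPartition, int newSeq) {
        String key = pid + ":" + topicPartition;
        // -1 means nothing has been accepted yet, so the first expected sequence is 0.
        int oldSeq = lastSeq.getOrDefault(key, -1);
        if (newSeq == oldSeq + 1) {
            lastSeq.put(key, newSeq); // normal message: remember it
            return Result.ACCEPTED;
        }
        // Smaller or equal: already seen. Larger than oldSeq + 1: something in between is missing.
        return newSeq <= oldSeq ? Result.DUPLICATE : Result.OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        SequenceChecker checker = new SequenceChecker();
        System.out.println(checker.check(1L, "orders-0", 0)); // ACCEPTED
        System.out.println(checker.check(1L, "orders-0", 1)); // ACCEPTED
        System.out.println(checker.check(1L, "orders-0", 1)); // DUPLICATE (a retry)
        System.out.println(checker.check(1L, "orders-0", 3)); // OUT_OF_ORDER (2 is missing)
    }
}
```

Because a retry reuses the sequence number of the first attempt, it always falls into the duplicate branch once the first attempt has been accepted.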

Scope of use

  • The PID changes when the producer restarts, and each partition has its own sequence numbers, so an idempotent producer cannot guarantee exactly-once across partitions or across sessions.

Single-partition features

  • First, idempotence is only guaranteed on a single partition. That is, an idempotent producer can guarantee that no duplicate messages appear on one partition of a topic, but it cannot provide idempotence across multiple partitions.

  • An idempotent producer guarantees idempotence only per partition, whereas a producer's messages are spread over multiple partitions of a topic, which is why idempotence cannot be guaranteed for the topic as a whole.

  • This raises the question of why idempotence is limited to a single partition.

Single session feature

  • Idempotence only holds within a single session, not across sessions. A session here can be understood as one run of the producer process. The guarantee is lost when you restart the producer process.
Why
  • After a restart, the PID that identifies the producer changes, and the broker cannot tell that it is the same producer. This is the other restriction: idempotence cannot be provided across sessions.

The solution

  • To avoid duplicate messages across multiple partitions and sessions, you need transactions, that is, a transactional producer.
  • This is the biggest difference between idempotent producers and transactional producers.

The source code

As mentioned before, it is worth reading the Kafka client source code. The client source quickly shows you what Kafka offers, while the server source helps you understand the principles. Often, simply knowing which tools are available solves a lot of problems.

We have already covered a lot on the producer side, including some of these classes:

Name                 Role
Producer             The interface that KafkaProducer implements
KafkaProducer        The object we use to send data
Partitioner          The partitioning interface; we have seen several default implementations (DefaultPartitioner, RoundRobinPartitioner, UniformStickyPartitioner), and you can also implement this interface to write your own
ProducerConfig       The client configuration class; instead of writing keys by hand when creating Properties, you can use its constants, e.g. ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is equivalent to "bootstrap.servers"
ProducerRecord       The message object that we send
RecordMetadata       The object returned after a message is sent, recording metadata about the sent message
Callback             The callback interface, mainly used for asynchronous sending; ErrorLoggingCallback is a default implementation
ProducerInterceptor  The producer interceptor

Now let's look at today's main characters: the internal classes.

Sender

First of all, why talk about this class?

Consider the doSend method in KafkaProducer, which is what the send method called by our client ends up invoking. The key point is that KafkaProducer's send method does not send messages directly; it appends them to a buffer.

If the current buffer is full or a new one has been created, the Sender (the message-sending thread) is woken up and sends the buffered messages to the broker, and send finally returns a Future. This also means that when doSend returns, the message may not yet have reached the broker, because it may not have been sent yet.

So when the buffer fills up, the Sender is woken up to send the messages. By now we know what the Sender is: it is a thread, declared as public class Sender implements Runnable.
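
The split between "append to a buffer" and "send later" can be illustrated with a small single-threaded sketch. It is deliberately simplified: BufferedSendSketch, Pending and runOnce are hypothetical names, there is no network, and the real Sender runs on its own thread rather than being called by hand.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class BufferedSendSketch {
    // One buffered message together with the future handed back to the caller.
    private static final class Pending {
        final String message;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Pending(String message) {
            this.message = message;
        }
    }

    private final Queue<Pending> buffer = new ArrayDeque<>();

    // Plays the role of send(): only appends to the buffer and returns a future.
    synchronized CompletableFuture<String> send(String message) {
        Pending pending = new Pending(message);
        buffer.add(pending);
        return pending.future;
    }

    // Plays the role of one pass of the sender loop: drains the buffer and completes the futures.
    synchronized int runOnce() {
        int sent = 0;
        Pending pending;
        while ((pending = buffer.poll()) != null) {
            pending.future.complete("acked:" + pending.message);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        BufferedSendSketch producer = new BufferedSendSketch();
        CompletableFuture<String> future = producer.send("hello");
        System.out.println(future.isDone()); // false: send() returned, but nothing was sent yet
        producer.runOnce();
        System.out.println(future.isDone()); // true: the sender pass completed the future
    }
}
```

This is why a returned Future, not the return of send() itself, is what tells the caller whether the message actually arrived.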

Producer Id

We said the PID is the key to idempotence, so let's see how it is obtained. This happens in the Sender's run method:

void run(long now) {
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // This is an idempotent producer, so make sure we have a producer ID (retrieve the ProducerId)
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}

The run method calls maybeWaitForProducerId, which obtains the ProducerId. The name speaks for itself, so let's look at this method too.

private void maybeWaitForProducerId() {
    while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
        Node node = null;
        try {
            node = awaitLeastLoadedNodeReady(requestTimeoutMs);
            if (node != null) {
                ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
                InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
                Errors error = initProducerIdResponse.error();
                if (error == Errors.NONE) {
                    // This ProducerId is obtained after a network request
                    ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                            initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                    transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                    return;
                } else if (error.exception() instanceof RetriableException) {
                    log.debug("Retriable error from InitProducerId response", error.message());
                } else {
                    transactionManager.transitionToFatalError(error.exception());
                    break;
                }
            } else {
                log.debug("Could not find an available broker to send InitProducerIdRequest to. Will back off and retry.");
            }
        } catch (UnsupportedVersionException e) {
            transactionManager.transitionToFatalError(e);
            break;
        } catch (IOException e) {
            log.debug("Broker {} disconnected while awaiting InitProducerId response", node, e);
        }
        log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}

This loop keeps running as long as transactionManager.hasProducerId() returns false.

We can also see that the ProducerId is taken from the InitProducerIdResponse, so it is not generated by the client; it is generated by the server. Next, let's look at sendAndAwaitInitProducerIdRequest, the method that sends this request.

private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
    String nodeId = node.idString();
    InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
    ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null);
    return NetworkClientUtils.sendAndReceive(client, request, time);
}

You can see that it builds a ClientRequest (more precisely, one carrying an InitProducerIdRequest) for the given node, sends it, and then reads the ProducerId from the returned response. If you are interested, you can also see how the server handles this request. Here is the server code, written in Scala.

def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
  val initProducerIdRequest = request.body[InitProducerIdRequest]
  val transactionalId = initProducerIdRequest.transactionalId

  if (transactionalId != null) {
    // Check permissions
    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      return
    }
  } else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
    sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
    return
  }

  def sendResponseCallback(result: InitProducerIdResult): Unit = {
    def createResponse(requestThrottleMs: Int): AbstractResponse = {
      val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
      trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
      responseBody
    }
    sendResponseMaybeThrottle(request, createResponse)
  }
  // Once the permission check has passed, obtain the corresponding PID, which is returned to the producer
  txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
}

Here we see that the PID is ultimately generated by the transaction coordinator (txnCoordinator).

When the server initializes a PID for a client, it actually generates it through the generateProducerId() method of ProducerIdManager.

The step after that involves ZooKeeper (ZK); if you are interested, you can keep following the code from there.

def generateProducerId(): Long = {
  this synchronized {
    // grab a new block of producerIds if this block has been exhausted
    if (nextProducerId > currentProducerIdBlock.blockEndId) {
      getNewProducerIdBlock()
      nextProducerId = currentProducerIdBlock.blockStartId + 1
    } else {
      nextProducerId += 1
    }

    nextProducerId - 1
  }
}
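
The idea of handing out IDs from pre-claimed blocks can be sketched in Java as follows. This is an illustration under stated assumptions, not the broker's implementation: BlockIdAllocator is a hypothetical class, the block size is arbitrary, and the shared AtomicLong merely stands in for whatever shared store the brokers coordinate through (ZK in the version discussed here).

```java
import java.util.concurrent.atomic.AtomicLong;

final class BlockIdAllocator {
    // Stand-in for the shared store: holds the start of the next unclaimed block.
    private final AtomicLong sharedNextBlockStart;
    private final long blockSize;

    private long blockEnd;
    private long next;

    BlockIdAllocator(AtomicLong sharedNextBlockStart, long blockSize) {
        this.sharedNextBlockStart = sharedNextBlockStart;
        this.blockSize = blockSize;
        claimBlock();
    }

    // Atomically reserves [start, start + blockSize - 1] for this allocator.
    private void claimBlock() {
        long blockStart = sharedNextBlockStart.getAndAdd(blockSize);
        blockEnd = blockStart + blockSize - 1;
        next = blockStart;
    }

    // Hands out the next ID, claiming a fresh block only when the current one is exhausted.
    synchronized long generate() {
        if (next > blockEnd) {
            claimBlock();
        }
        return next++;
    }

    public static void main(String[] args) {
        AtomicLong shared = new AtomicLong(0);
        BlockIdAllocator brokerA = new BlockIdAllocator(shared, 2);
        BlockIdAllocator brokerB = new BlockIdAllocator(shared, 2);
        System.out.println(brokerA.generate()); // 0
        System.out.println(brokerA.generate()); // 1
        System.out.println(brokerB.generate()); // 2
        System.out.println(brokerA.generate()); // 4: brokerA had to claim a new block
    }
}
```

Claiming whole blocks means the shared store is contacted only once per block, while each allocator can still hand out globally unique IDs on its own.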

ProducerBatch

Why does this class suddenly come up? As mentioned, KafkaProducer's send method only appends messages to the buffer and does not actually send them. The sending happens inside the Sender's run method, which is also where we obtain the producer ID.

The Sender's run method can be split into two parts: the first does some initialization and the second sends the data. Let's look at the implementation of the sending part, sendProducerData, with some code trimmed.

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();

        // Get a set of partitions ready to accept data
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // Remove the nodes that cannot receive messages from the collection
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // Create ProducerBatch to send data
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        sendProduceRequests(batches, now);

        return pollTimeout;
    }

The accumulator is where the messages are buffered. Here we can see that the data is finally packaged into ProducerBatch objects and sent. The sequence number is assigned in the accumulator's drain method. That code is rather long, so we only look at the relevant parts.

The code first obtains the ProducerIdAndEpoch object, which encapsulates the ProducerId, and then we focus on the sequence number. There is a check on batch.hasSequence(), which ensures that a batch's sequence number is not changed once it has been assigned.

The key line comes next: batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); The batch object must have a sequence-number attribute, so we can guess that it is assigned inside batch.setProducerState.

Note the second argument, transactionManager.sequenceNumber(batch.topicPartition), which looks up the sequence number for the partition. Let's take a look at that code too.

/**
 * Returns the next sequence number to be written to the given TopicPartition.
 */
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
    Integer currentSequenceNumber = nextSequence.get(topicPartition);
    if (currentSequenceNumber == null) {
        currentSequenceNumber = 0;
        nextSequence.put(topicPartition, currentSequenceNumber);
    }
    return currentSequenceNumber;
}

That completes the lookup. Once the sequence number has been fetched, an increment operation is called, which keeps the sequence number monotonically increasing.
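
The fetch-then-increment pattern can be sketched as follows. The names mirror the methods discussed above but the class is a hypothetical stand-in, keyed by a plain string instead of a TopicPartition; it assumes the counter advances by the number of records in the batch and ignores integer wrap-around.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceTracker {
    // Next sequence number to use, per topic-partition.
    private final Map<String, Integer> nextSequence = new HashMap<>();

    // Returns the base sequence for the next batch; starts at 0 for an unseen partition.
    synchronized int sequenceNumber(String topicPartition) {
        return nextSequence.computeIfAbsent(topicPartition, tp -> 0);
    }

    // Advances the counter by the number of records in the batch that was just drained.
    synchronized void incrementSequenceNumber(String topicPartition, int recordCount) {
        nextSequence.put(topicPartition, sequenceNumber(topicPartition) + recordCount);
    }

    public static void main(String[] args) {
        SequenceTracker tracker = new SequenceTracker();
        int firstBase = tracker.sequenceNumber("orders-0");  // 0
        tracker.incrementSequenceNumber("orders-0", 3);      // the first batch held 3 records
        int secondBase = tracker.sequenceNumber("orders-0"); // 3
        System.out.println(firstBase + " then " + secondBase);
    }
}
```

Each partition has its own counter, and a batch only records the base value, which is why the sequence number belongs to a batch rather than to a single message.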

Finally, let's look at setProducerState. After setProducerState runs, the batch carries a baseSequence property, which the server can use for its duplicate check.

public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
    if (isClosed()) {
        // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
        // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
        // be re queued. In this case, we should not attempt to set the state again, since changing the producerId and sequence
        // once a batch has been sent to the broker risks introducing duplicates.
        throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
    }
    this.producerId = producerId;
    this.producerEpoch = producerEpoch;
    this.baseSequence = baseSequence;
    this.isTransactional = isTransactional;
}
Sequence number

Once the PID exists, a sequence number is maintained at the PID + topic-partition level, and together they implement the producer's idempotence.

ProducerBatch provides the setProducerState() method, which attaches some metadata (PID, baseSequence, isTransactional) to a batch. This information is sent to the server with the ProduceRequest, and the server uses it to make the corresponding checks. Next, let's look at the server side. In the server-side Scala code we find handleProduceRequest, the method that handles the request; we only excerpt part of its implementation here.

The first part is mainly permission verification, and the second part iterates over the data and processes it.

Why idempotence does not support cross-session and multi-partition use

Across sessions

Cross-session use is not supported because the PID identifying the producer changes after a restart. As a result, the broker can no longer decide whether a message is a duplicate based on the <PID,TP,SEQNUM> condition.

Across partitions

Sequence numbers are maintained per partition. If a retry were sent to a different partition, that partition would have no record of the sequence number, so the broker could not detect the duplicate.
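
Both limits follow from the lookup key. The toy cache below is keyed by PID and topic-partition, so a duplicate is only recognised when both match. DedupScopeDemo and its method are hypothetical names used for illustration, not broker code.

```java
import java.util.HashMap;
import java.util.Map;

final class DedupScopeDemo {
    // Last sequence number seen, keyed by "pid|topic-partition".
    private final Map<String, Integer> lastSeq = new HashMap<>();

    // Returns true if the cache recognises the write as a duplicate.
    boolean isDuplicate(long pid, String topicPartition, int seq) {
        String key = pid + "|" + topicPartition;
        Integer last = lastSeq.get(key);
        if (last != null && seq <= last) {
            return true;
        }
        lastSeq.put(key, seq);
        return false;
    }

    public static void main(String[] args) {
        DedupScopeDemo broker = new DedupScopeDemo();
        System.out.println(broker.isDuplicate(100L, "orders-0", 0)); // false: first write
        System.out.println(broker.isDuplicate(100L, "orders-0", 0)); // true: same PID and partition
        System.out.println(broker.isDuplicate(101L, "orders-0", 0)); // false: new PID after a restart
        System.out.println(broker.isDuplicate(100L, "orders-1", 0)); // false: different partition
    }
}
```

In the last two calls the same logical message slips through, because nothing under the new key says it has been seen before.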

Thinking

  1. Does a retry guarantee that the message is sent to the same partition?
  2. Under what conditions does single-partition idempotence guarantee global idempotence?

Conclusion

  1. We introduced what idempotence is and how it is implemented.
  2. At the source-code level, we analyzed how the producer ID and sequence number are obtained and how they work.
  3. A sequence number is not bound to a single message but to a batch of messages.
  4. The drawback of idempotence is that it does not support cross-session or cross-partition use; the advantage is equally obvious: it is easy to use.