Preface

Brief introduction to key terms

  • Broker: a physical node of the Kafka service. In this version each broker registers its ID with ZooKeeper to achieve high availability; a standalone environment has only one broker
  • Topic: a message topic, which consumers subscribe to in order to receive messages
  • Partition: a message partition. A topic can have multiple partitions, and each message is routed to one partition according to a certain rule

The source code

Sending messages with the producer involves the following classes (a minimal usage sketch follows the list):

  • KafkaProducer: the main producer class, which coordinates the other components
  • RecordAccumulator: the message accumulator. When a message is sent it is first submitted to the accumulator, which stores it by topic and partition
  • Sender: takes messages from the RecordAccumulator, builds the requests, and submits them for sending
  • NetworkClient: mainly calls KafkaChannel to send and read data and processes the completed sends and reads; it is the core of the event loop
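
Before diving into the source, here is a minimal, hedged sketch of how these classes are driven from user code; the topic name and broker address are placeholders, and send only hands the record to the accumulator while the Sender thread delivers it asynchronously:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerQuickStart {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // send() only appends the record to the RecordAccumulator; the Sender thread
        // picks it up and performs the actual network I/O
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "hello-key", "hello-value"));
        }
    }
}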

RecordAccumulator Message accumulator

Why is RecordAccumulator called an accumulator? Because it stores batches of messages and contains the methods for processing them. It has few dependencies on other classes, so its source code can be analyzed on its own.

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

This is the main data structure: each TopicPartition maps to a Deque of ProducerBatch.
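
The getOrCreateDeque call used below roughly follows this pattern; this is only a simplified sketch of the lookup-then-create logic inside RecordAccumulator (it assumes the batches field above plus java.util.ArrayDeque/Deque imports):

// Simplified sketch (within RecordAccumulator): look up the deque for a topic-partition,
// creating it on first use; putIfAbsent guards against a race with another appending thread
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    return previous == null ? d : previous;
}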

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // Get or create a deque to store records
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            // If abortOnNewBatch is true, return here instead of creating a new batch
            if (abortOnNewBatch) {
                return new RecordAppendResult(null, false, false, true);
            }
            

In the first half of the append method, the deque is first obtained based on the topic and partition information (and created if it does not exist), and tryAppend is called to append the message to it.

Note the abortOnNewBatch parameter: if tryAppend fails and abortOnNewBatch is true, the current append operation is aborted. So how can tryAppend fail?

 private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        return null;
    }

The tryAppend method takes the last ProducerBatch from the deque and tries to append to it; returning null indicates failure. In other words, the append fails when this is the first message (the deque is empty) or when the last batch can no longer accept records.

byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                // Try again under the lock: another thread may have created a batch in the meantime
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    return appendResult;
                }

                // Create a batch record
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }

If abortOnNewBatch is false, the method goes on to create a new ProducerBatch, append the message to it, and add the batch to the deque.

The appendsInProgress counter indicates whether an append operation is currently in progress.

The accumulator's append operation simply adds messages to a ProducerBatch, and the batches are chained together in a Deque.

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            Deque<ProducerBatch> deque = entry.getValue();
            synchronized (deque) {
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    TopicPartition part = entry.getKey();
                    Node leader = cluster.leaderFor(part);
                    if (leader == null) {
                        // Join the unknown leader set
                        unknownLeaderTopics.add(part.topic());
                    } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // Sendable if the batch is full, it has waited long enough, the accumulator is out of
                        // memory (threads are blocked waiting), the accumulator is closed, or a flush is in progress
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            // Next check time
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }

The ready method iterates over the accumulated batches and adds the nodes that can receive data to readyNodes. The logic is:

  • Iterate over each topic partition and find the leader node for that partition, because only the leader handles send requests
  • Data can be sent when any of the following conditions holds:
    • The batch is full
    • The message has waited long enough (the linger time has expired)
    • The accumulator is out of memory
    • The accumulator has been closed
  • Return the nodes that can be sent to, the delay until the next check, and the topics whose leader is unknown

In short, ready determines whether the stored messages can be sent, and to which nodes.
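
These readiness conditions correspond to standard producer configuration options; the values below are only illustrative, but the config keys are the real ones:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfigSketch {
    // Illustrative values: these configs drive the "full", "expired", "exhausted"
    // and "backingOff" conditions checked in ready()
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // a full batch becomes sendable
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // max wait time for a non-full batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // exhausting the buffer forces sending
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);    // wait applied between retries (backingOff)
        return props;
    }
}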

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            // Collect the ready batches destined for this node
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
            batches.put(node.id(), ready);
        }
        return batches;
    }

Since partitions are distributed across different nodes, the drain method groups the messages to be sent by destination node.

The logic of drainBatchesForOneNode is: get the partitions hosted on the current node, look up their deques in the map, and return the ProducerBatches that are ready, subject to criteria such as not exceeding maxSize.
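
A simplified, hedged sketch of that logic (it omits details of the real drainBatchesForOneNode such as retry backoff, transactions and the rotating drain index, and assumes the surrounding class's fields and imports):

// Sketch (within RecordAccumulator): pull at most maxSize bytes of ready batches for one node
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    List<ProducerBatch> ready = new ArrayList<>();
    for (PartitionInfo part : cluster.partitionsForNode(node.id())) {
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        Deque<ProducerBatch> deque = getDeque(tp);
        if (deque == null)
            continue;
        synchronized (deque) {
            ProducerBatch first = deque.peekFirst();
            if (first == null)
                continue;
            // Stop once the request would exceed maxSize, but always drain at least one batch
            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty())
                break;
            ProducerBatch batch = deque.pollFirst();
            batch.close();                         // no further appends to this batch
            size += batch.records().sizeInBytes();
            ready.add(batch);
        }
    }
    return ready;
}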

    void abortBatches(final RuntimeException reason) {
        for (ProducerBatch batch : incomplete.copyAll()) {
            Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
            synchronized (dq) {
                batch.abortRecordAppends();
                dq.remove(batch);
            }
            batch.abort(reason);
            deallocate(batch);
        }
    }

Messages that have already been submitted to the accumulator can still be aborted: the batch is removed from the deque and the onCompletion callback is triggered with the abort reason.

Accumulator summary: the accumulator is the staging area for messages to be sent. Submitted messages are stored in a map keyed by topic-partition. ready determines whether messages can be sent; if they can, drain groups them by the node they will be sent to. If something goes wrong, the buffered messages can be aborted.

Partitioner

public interface Partitioner extends Configurable, Closeable {

  
    // Compute partitions
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

   
    public void close();

    // Start a new Batch
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

The Partitioner is the partition computation interface that decides which partition a message is sent to. You can implement this interface to customize the partitioning rules, or rely on the default implementations.
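
For example, a hypothetical custom partitioner (the class name and routing rule are made up for illustration) would look roughly like this, and would be registered through the partitioner.class config (ProducerConfig.PARTITIONER_CLASS_CONFIG):

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example: pin records whose key starts with "audit" to partition 0,
// spread everything else by key hash
public class AuditAwarePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key instanceof String && ((String) key).startsWith("audit"))
            return 0;
        if (keyBytes == null)
            return 0;
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() { }
}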

RoundRobinPartitioner: round-robin partitioning

 private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }


public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Get and increment the per-topic counter
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // Take the counter modulo the number of available partitions
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // There are no available partitions
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

The round-robin implementation is very simple: it takes a per-topic counter modulo the partition count, so with 3 partitions the counter values 0, 1, 2, 3 map to partitions 0, 1, 2, 0.

DefaultPartitioner: the default partitioner

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    
 
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }

If the serialized key (keyBytes) of the message is not null, the partition is computed by hashing the key; otherwise the StickyPartitionCache is used.

StickyPartitionCache tries to keep sending messages to the same partition until that partition's batch fills up and is sent, then switches to another partition. This reduces the partition-computation overhead when sending messages at a high rate, and messages stay balanced across partitions as the system keeps running.
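
A simplified sketch of the sticky idea (the real StickyPartitionCache also prefers available partitions and uses the cluster metadata, so treat this only as an illustration of the behaviour described above):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

public class StickyPartitionSketch {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    // Keep returning the same partition for a topic until nextPartition is asked to move on
    public int partition(String topic, int numPartitions) {
        return indexCache.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }

    // Called when the current batch is complete: switch to a different partition
    public int nextPartition(String topic, int numPartitions, int prevPartition) {
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (numPartitions > 1 && next == prevPartition);
        indexCache.put(topic, next);
        return next;
    }
}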

Summary: these are the system's built-in partitioning policies. If none is specified, DefaultPartitioner is used, which hashes the key (or sticks to a partition when there is no key).

Sender Message sending thread

The Sender processes the messages buffered in RecordAccumulator and sends them.

public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e); }}...Copy the code

Sender implements Runnable; its run method is a while loop that keeps executing runOnce.

void runOnce() {
        // The first part deals with the transaction manager; if an error occurred it calls
        // maybeAbortBatches(lastError)
        // ...
        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        client.poll(pollTimeout, currentTimeMs);
}

This is simplified pseudocode. The first half is transaction-specific and calls maybeAbortBatches if something went wrong; the real sending work happens in sendProducerData.

The sendProducerData method is as follows:

private long sendProducerData(long now) {
        // Fetch the cluster metadata from the cache (it is updated asynchronously)
        Cluster cluster = metadata.fetch();
        // Get the ready node
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // If there are partitions whose leader is unknown, force a metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

accumulator.ready is called to get the nodes that are ready to receive data; then a metadata update is requested for any partition whose leader cannot be found.

Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

The second step checks the connectivity of the ready nodes and removes the ones that are not yet usable.

Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

The third step uses drain to assign messages to nodes. If message ordering must be guaranteed, the drained partitions are muted so that only one batch per partition is in flight at a time.
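
As far as I can tell from how the Sender is constructed in this version, guaranteeMessageOrder is enabled when max.in.flight.requests.per.connection is 1; a hedged configuration sketch:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderingConfigSketch {
    // Sketch: allowing only one in-flight request per connection is what turns on the
    // mutePartition behaviour described above (assumption based on this version's Sender setup)
    static Properties orderedProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return props;
    }
}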

The fourth step is to handle expired batches.

Finally, sendProduceRequests(batches, now) is called:

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
            sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
    }

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();
            // ... collect the records for each partition ...
        }

        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        // The callback invoked when the produce response arrives
        RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

        String nodeId = Integer.toString(destination);
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);
        client.send(clientRequest, now);
    }

client.send calls NetworkClient's send method, which ultimately calls KafkaChannel's setSend to stage the data to be written.

NetworkClient

The last line of the Sender runOnce looks like this:

client.poll(pollTimeout, currentTimeMs);

This calls NetworkClient's poll method. Recall that Selector also has a poll method, which selects the I/O events, performs the reads and writes, and records the completed operations in sets; NetworkClient.poll is responsible for processing those results.

public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        if (!abortedSends.isEmpty()) {
            // If sends were aborted due to unsupported version exceptions or disconnections,
            // handle them immediately without waiting for selector.poll
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }

        // Calculate the minimum time required for metadata update
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        // Process the message that has been sent successfully
        handleCompletedSends(responses, updatedNow);
        // Process the message that has been successfully read
        handleCompletedReceives(responses, updatedNow);
        // Handle disconnected nodes
        handleDisconnections(responses, updatedNow);
        // Handle connections
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        // Call the callback method to send a message to topic
        completeResponses(responses);

        return responses;
    }

As the comments show, a metadata update is triggered first if needed, then selector.poll performs the read and write events, and the completed operations are processed afterwards. Since the producer client is mainly responsible for sending, the logic for handling read data is not very complicated.

@Override
        public long maybeUpdate(long now) {
            // Time until the next metadata update: 0 if an update was requested (needUpdate), otherwise based on the metadata expiration time
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            // If the previous metadata request has not received a response yet, wait up to defaultRequestTimeoutMs (30s by default)
            long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;

            long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);

            // The update time has not arrived yet, so return the remaining wait time
            if (metadataTimeout > 0) {
                return metadataTimeout;
            }

            // Select an idle node for fetching
            Node node = leastLoadedNode(now);
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                return reconnectBackoffMs;
            }

            // Send a request
            return maybeUpdate(now, node);
        }

This is where metadata (topics, partitions, nodes, and so on) is requested from the server. The leastLoadedNode method selects the least busy node to send the request to.

The method eventually sends the request, and the returned wait time is used as the selector.poll timeout.

private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // if no response is expected then when the send is completed, return it
        for (Send send : this.selector.completedSends()) {
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
            if (!request.expectResponse) {
                // Remove the request from inFlightRequests
                this.inFlightRequests.completeLastSent(send.destination());
                responses.add(request.completed(null, now));
            }
        }
    }

This processes completed sends that expect no response and removes them from inFlightRequests.

private void completeResponses(List<ClientResponse> responses) {
        for (ClientResponse response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e); }}}Copy the code

This invokes the onComplete callback. For produce requests that callback is handleProduceResponse, which was set in sendProduceRequest (see above).

onComplete mainly validates the batch and ultimately invokes the onCompletion callback we supplied, indicating that the message has been delivered to the corresponding node.
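
That user-supplied callback implements the standard Callback interface; a minimal sketch:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// onCompletion fires once the broker has acknowledged the batch, or with the exception on failure
public class LoggingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            System.err.println("send failed: " + exception.getMessage());
        else
            System.out.printf("sent to %s-%d at offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
    }
}

It is passed as the second argument to producer.send(record, callback).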

KafkaProducer

@Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

We submit messages with the send method, which first runs the interceptors' onSend method and then calls doSend.
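
The interceptors implement the ProducerInterceptor interface; a hedged sketch of one (the class name and the header it adds are made up), which would be registered through interceptor.classes (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// onSend runs before doSend; onAcknowledgement runs when the response (or error) comes back
public class HeaderStampingInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        record.headers().add("sent-by", "demo-producer".getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}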

// Wait for metadata to update
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
// serialize key-val
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// Compute partitions
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

With some logic simplified, the first half of doSend consists of the steps above. You can see that the serializers we configured and the partitioner (the default one if none was specified) are invoked here.

 private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        Cluster cluster = metadata.fetch();
        metadata.add(topic);

        // Get the number of topic partitions
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return immediately if the partition count is already known and the requested partition (if any) is within range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);
        int version = metadata.requestUpdate();
        // Wake up the sender so that it sends a metadata request to the server
        sender.wakeup();
        metadata.awaitUpdate(version, remainingWaitMs);


The metadata request itself is sent by the NetworkClient.poll method on the Sender thread, so the calling thread waits here for some time until the new metadata has been fetched.

 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true);
            // This branch is entered when no batch in the deque could accept the message (first message, or the previous batch is full or already sent)
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                // Let the partitioner (e.g. the StickyPartitionCache) choose a new partition
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                // producer callback will make sure to call both 'callback' and interceptor callback
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

                result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false);
            }
            
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;

accumulator.append is then called. The first call may fail without creating a batch (abortOnNewBatch is true); this is for compatibility with the sticky-partition logic, which re-selects the partition and then creates the batch on the second call.

KafkaProducer's part ends there; the remaining work is done by the Sender thread.

Conclusion

This article walked through the message sending process: when we call send, the message is placed in the accumulator; the Sender thread keeps running, taking messages from the accumulator and handing them to KafkaChannel, and triggering the Selector to perform the actual network I/O.