
Kafka producer accumulator

The producer adds each record to a buffer before it is sent. That buffer is the RecordAccumulator, and its core field is a ConcurrentMap<TopicPartition, Deque<ProducerBatch>> named batches.

> batches: each partition maps to its own Deque. When appending, a partition is chosen for the record (picked by the partitioner if the record has no key) and the record is written into the newest RecordBatch at the tail of that partition's deque (a new RecordBatch is created if there is none or the last one is full). When sending, the RecordBatch at the head of the deque is popped and sent, so the queue behaves first in, first out.
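To make the shape of this buffer concrete, here is a minimal toy model in plain Java. The real keys are TopicPartition objects and the real values are ProducerBatch instances; this sketch uses a String key and an empty RecordBatch stand-in, and only illustrates the append-at-tail / send-from-head behaviour, not the real accumulator:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AccumulatorToyModel {
    // Stand-in for a batch of records (the real class is ProducerBatch)
    static class RecordBatch { /* appended records accumulate here */ }

    // The buffer: one FIFO deque of batches per partition
    // (keys are "topic-partition" strings here; Kafka uses TopicPartition objects)
    private final ConcurrentMap<String, Deque<RecordBatch>> batches = new ConcurrentHashMap<>();

    // Append side: always write into the newest batch at the tail of the deque
    void append(String topicPartition) {
        Deque<RecordBatch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (dq) {
            if (dq.peekLast() == null)
                dq.addLast(new RecordBatch());   // no batch yet (or the last one is full) -> create one
            // ... write the record into dq.peekLast() ...
        }
    }

    // Send side: drain from the head, so batches leave in first-in, first-out order
    RecordBatch nextToSend(String topicPartition) {
        Deque<RecordBatch> dq = batches.get(topicPartition);
        if (dq == null)
            return null;
        synchronized (dq) {
            return dq.pollFirst();
        }
    }

    public static void main(String[] args) {
        AccumulatorToyModel acc = new AccumulatorToyModel();
        acc.append("demo-topic-0");
        System.out.println(acc.nextToSend("demo-topic-0") != null); // true: the batch we just created
    }
}
```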

Reading the source code

The process of appending a key and value to the specified partition:

```java
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    ...
    try {
        // 1. Find the Deque for this partition in batches; if there is none, create one and register it
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            ...
            // 2. Try appending the record to the last batch in this deque;
            //    on success a result is returned, otherwise tryAppend returns null
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
        ...
        // 3. No usable batch: compute the size of a new one, the larger of batchSize
        //    and the estimated size of this single record
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        // 4. Allocate `size` bytes from the BufferPool to hold the record
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            ...
            // 5. Try again: another thread may have created a batch while we waited for memory
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            // 6. Build a new ProducerBatch on top of the allocated buffer
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            // 7. Append the record to the new batch
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
            // 8. Add the new batch to the tail of the deque
            dq.addLast(batch);
            // 9. Track the batch in the set of not-yet-acknowledged (incomplete) batches
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            // 10. If dq.size() > 1 or the batch is full, the deque already has a batch that can be sent
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        ...
    }
}
```
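Everything flowing into append() is driven by producer configuration. As a quick reference, here is a minimal producer that exercises this path; the broker address and topic name are placeholders and the config values are only examples:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AccumulatorDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");            // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);          // the batchSize used in step 3 of append()
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);                  // how long a non-full batch may wait before it is sendable
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L); // the BufferPool that free.allocate() draws from
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000L);          // bounds how long append()/allocate() may block

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // Each send() ends up in RecordAccumulator.append() for the record's partition
                producer.send(new ProducerRecord<>("demo-topic", "key-" + i, "value-" + i));
            }
        } // close() waits for the accumulator to drain what is still buffered
    }
}
```

In short: batch.size bounds how large one RecordBatch may grow, buffer.memory is the total BufferPool behind free.allocate(), and max.block.ms caps how long append() may wait for memory.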

Once records have been added to the buffer for their partitions, how does the data actually get sent? The main run loop of the Sender thread:

```java
public void run() {
    ...
    // 1. Main loop: keep running until close() is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    ...
    // 2. Not force-closed && (the accumulator still holds undrained batches || the client
    //    still has in-flight requests) -> keep going until the remaining work is finished
    //    (a graceful exit finishes what it started)
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // 3. Force close: the accumulator abandons the batches that are not yet finished
    if (forceClose) {
        ...
        this.accumulator.abortIncompleteBatches();
    }
    try {
        // 4. Close the network client
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
    ...
}
```

What we need to focus on is runOnce(): this is where the buffered messages are actually processed:

```java
void runOnce() {
    // Transaction-related logic is omitted here
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}
```
  1. `sendProducerData` does the real `RecordBatch` sending: it merges one node's `RecordBatch`es into a single produce request and hands it off (sketched below)
  2. `poll` does the real socket reads and writes
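A condensed sketch of the first step, pieced together from the Kafka 2.x Sender (error handling, batch expiry, ordering guarantees and timeout bookkeeping are omitted, so treat it as a map of the flow rather than the real method):

```java
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // 1. Which nodes currently have sendable batches?
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // 2. Partitions whose leader is unknown force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // 3. Drop nodes we cannot establish a connection to yet
    Iterator<Node> iter = result.readyNodes.iterator();
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now))
            iter.remove();
    }

    // 4. Regroup batches: TopicPartition -> Deque<ProducerBatch> becomes nodeId -> List<ProducerBatch>
    Map<Integer, List<ProducerBatch>> batches =
            this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

    // 5. One produce request per node
    sendProduceRequests(batches, now);

    // How long client.poll() may block before the next ready check is due
    return result.nextReadyCheckDelayMs;
}
```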

poll() delegates to NetworkClient.poll(), which is responsible for the actual network requests.
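For completeness, a heavily simplified sketch of what one NetworkClient.poll() pass does (based on the Kafka 2.x client; connection setup, API-version checks and in-flight bookkeeping are omitted):

```java
public List<ClientResponse> poll(long timeout, long now) {
    // 1. Piggyback a metadata request if an update is due
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    // 2. The real socket I/O: finish connections, write queued sends, read responses
    this.selector.poll(Math.min(timeout, metadataTimeout));
    long updatedNow = this.time.milliseconds();

    // 3. Turn finished sends/receives, disconnects and timeouts into ClientResponse objects
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleTimedOutRequests(responses, updatedNow);

    // 4. Fire the callback registered with each request (for the producer, the produce-response handler)
    completeResponses(responses);
    return responses;
}
```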

accumulator.ready()

Before the client sends anything to the server, it has to work out which cluster nodes are eligible to receive data. A partition's batches are considered ready to send when any of the following holds (restated as a single boolean expression after the list):

  1. The `Deque` holds more than one `RecordBatch`, or its first `RecordBatch` is already full
  2. The first `RecordBatch` has waited long enough, i.e. the send interval (`linger.ms`, or the retry backoff) has elapsed
  3. Other threads are waiting for the `BufferPool` to free space (the pool has run out)
  4. The `Sender` is closing (`close()`) or a flush is in progress, so whatever is left must be sent out
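Put together, the check is simply a disjunction of those conditions. A method-sized restatement (parameter names are mine, chosen to mirror the locals in ready() below):

```java
static boolean sendable(int dequeSize, boolean firstBatchFull,
                        long waitedTimeMs, long timeToWaitMs,
                        boolean poolExhausted, boolean closed, boolean flushInProgress) {
    boolean full = dequeSize > 1 || firstBatchFull;      // condition 1
    boolean expired = waitedTimeMs >= timeToWaitMs;      // condition 2 (linger.ms or retry backoff)
    return full || expired || poolExhausted              // condition 3
            || closed || flushInProgress;                // condition 4
}
```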

Let's see how ready() collects the eligible nodes:

```java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // 1. The set of nodes that can receive messages
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    // 2. Topics whose partition leader cannot be found in the current metadata
    Set<String> unknownLeaderTopics = new HashSet<>();

    // 3. Is any thread waiting for the BufferPool to free space?
    boolean exhausted = this.free.queued() > 0;
    // 4. Walk over batches and, for each buffered partition, find the node holding its leader replica
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();

        // 4-1. The node where the leader replica of this partition lives
        Node leader = cluster.leaderFor(part);
        // 4-2. Lock the current queue while reading it
        synchronized (deque) {
            // 4-3. If the leader is missing from the current metadata, the messages cannot be sent
            if (leader == null && !deque.isEmpty()) {
                // 4-3-1. Record the topic in the "leader unknown" set; a non-empty set triggers a metadata update
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                // Only the first batch in the deque matters; if it exists it may be sendable
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // These correspond to the four conditions above
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        // 4-3-4. Add the node holding the leader replica to the ready set
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
```

With readyNodes in hand, the Sender then checks whether each node is actually connectable: an existing connection is reused; if there is none, one is initiated; nodes that still cannot be connected to are filtered out.

At this point the target nodes are determined, and the `ProducerBatch`es to be sent are ready as well.

accumulator.drain()

Now the view has to switch from partitions to nodes. The accumulator stores data as TopicPartition -> Deque<ProducerBatch> (the logical layer), but the network layer sends requests per node, so it needs nodeId -> List<ProducerBatch>. drain() performs this transition from the logical layer to the network layer; later, all of a node's batches are merged into a single produce request.

This is the job of RecordAccumulator.drain():

```java
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    // 1. The result: nodeId -> list of ProducerBatch
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    // 2. Iterate over the nodes that are currently ready
    for (Node node : nodes) {
        // 3. Collect the ProducerBatches of the partitions hosted on this node
        //    (narrows the scope to one node at a time)
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        // 4. nodeId -> ProducerBatches
        batches.put(node.id(), ready);
    }
    return batches;
}
```
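The real work happens in drainBatchesForOneNode(): it round-robins over the partitions whose leader sits on that node, pops the head batch of each deque, and stops once the accumulated bytes would exceed maxSize. A simplified sketch, based on the Kafka 2.x sources with transaction and retry-backoff details stripped out:

```java
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    // Partitions whose leader replica lives on this node
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
    List<ProducerBatch> ready = new ArrayList<>();
    // drainIndex rotates the starting partition so no partition is starved
    int start = drainIndex = drainIndex % parts.size();
    do {
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        this.drainIndex = (this.drainIndex + 1) % parts.size();

        Deque<ProducerBatch> deque = getDeque(tp);
        if (deque == null)
            continue;
        synchronized (deque) {
            ProducerBatch first = deque.peekFirst();
            if (first == null)
                continue;
            // Stop once this node's request would exceed maxSize (but always take at least one batch)
            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty())
                break;
            ProducerBatch batch = deque.pollFirst();  // FIFO: take the head batch
            batch.close();                            // no more records may be appended to it
            size += batch.records().sizeInBytes();
            ready.add(batch);
        }
    } while (start != drainIndex);
    return ready;
}
```

maxSize here is roughly the producer's max.request.size, so one drained list is at most one request's worth of data.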









After this filtering and regrouping, all that remains is to send the ProducerBatches.

In simple terms: the ProducerBatches for all the partitions hosted on the same node end up packed into one produce request and sent out. This brings in the client -> server protocol:

  1. How is a request encapsulated?
  2. What goes into the request header, and where does the data go?

This will be covered in the server-side protocol analysis.
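Until then, here is a rough outline of how one node's batches become a request, simplified from the Kafka 2.x Sender.sendProduceRequest (response handling and transactional details omitted; treat it as orientation, not the exact source):

```java
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;
    // Group this node's batches by partition: this is the payload of the produce request
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    for (ProducerBatch batch : batches) {
        produceRecordsByPartition.put(batch.topicPartition, batch.records());
        recordsByPartition.put(batch.topicPartition, batch);
    }

    // Build one ProduceRequest carrying all partitions destined for this broker
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(
            minUsedMagic, acks, timeout, produceRecordsByPartition, transactionalId);
    // Completion callback: completes each batch's future once the response arrives
    RequestCompletionHandler callback = response ->
            handleProduceResponse(response, recordsByPartition, time.milliseconds());

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(
            nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback);
    // Hand the request to the NetworkClient; the actual socket write happens in client.poll()
    client.send(clientRequest, now);
}
```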

Core diagram of the send path

```
            +--------------------+
            |  sendProducerData  |
            +--------------------+
                      |
        +-------------+--------------------+
        |                                  |
        v                                  v
+----------------------+       +----------------------+
| accumulator.ready()  |       | accumulator.drain()  |-------+
+----------------------+       +----------------------+       |
           |                               ^                  |
           v                               |                  v
     .------------.                        |           .---------------.
    (  send nodes  )----available nodes----+          (  send batches   )
     `------------'                                     `---------------'
                                                               |
                           +----------------------+            |
                           | sendProduceRequests  |<-----------+
                           +----------------------+
```