When KafkaProducer.send() is used to send a message, the business thread writes it into RecordAccumulator for buffering. When the messages cached in RecordAccumulator reach a certain threshold, the Sender (IO) thread assembles them into batched requests and sends them to the Kafka cluster.

RecordAccumulator is written by the business thread and read by the Sender thread, a classic producer-consumer pattern, so RecordAccumulator must be thread-safe. At its core, RecordAccumulator maintains a ConcurrentMap<TopicPartition, Deque<ProducerBatch>>: the key is a TopicPartition identifying the target partition, and the value is an ArrayDeque<ProducerBatch> that buffers the messages bound for that partition. Note that ArrayDeque itself is not a thread-safe collection, as we will see later.

Each ProducerBatch maintains a MemoryRecordsBuilder object, and the MemoryRecordsBuilder is where the messages are actually stored. The relationship between RecordAccumulator, ProducerBatch and MemoryRecordsBuilder is shown in the following figure.
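Before diving into the message format, it may help to see the nesting of these three layers in code. The following is a minimal structural sketch; the classes with the Sketch suffix are made up for illustration and are not the real Kafka classes:

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.TopicPartition;

// Simplified sketch: RecordAccumulator -> per-partition Deque<ProducerBatch> -> MemoryRecordsBuilder -> ByteBuffer
class MemoryRecordsBuilderSketch {
    // the buffer that actually holds the serialized (possibly compressed) record bytes
    final ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
}

class ProducerBatchSketch {
    // each batch owns one builder
    final MemoryRecordsBuilderSketch recordsBuilder = new MemoryRecordsBuilderSketch();
}

class RecordAccumulatorSketch {
    // written by business threads, drained by the Sender thread
    final ConcurrentMap<TopicPartition, Deque<ProducerBatchSketch>> batches = new ConcurrentHashMap<>();

    Deque<ProducerBatchSketch> getOrCreateDeque(TopicPartition tp) {
        return batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
    }
}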

The message format

Now that we are digging into the KafkaProducer internals, we need to understand the format of messages inside Kafka, rather than simply knowing that a message is a key-value pair. Kafka currently has three versions of the message format:

  • V0: before Kafka 0.10
  • V1: Kafka 0.10 to 0.11
  • V2: Kafka 0.11.0 and later

The V0 version

When the V0 message format is used, messages are simply accumulated in RecordAccumulator without any aggregation. Each message carries its own independent metadata, as shown in the following figure:

The only caveat is the Attributes section, where the lower three bits identify the compression algorithm currently in use and the higher five bits are unused.

V1

The V1 format is basically similar to V0, except that a timestamp field is added. The specific structure is as follows:

The lower three bits of the Attributes section are still used to identify the compression algorithm currently in use, and the fourth bit is used to identify the timestamp type.

The timestamp was introduced in V1 mainly to address the following problems:

  1. More accurate log retention policies. In V0, a Kafka broker decides whether to delete a segment file on disk based on its last modification time. The major drawback of this scheme is that if replica migration or replica expansion occurs, the segment files in the newly added replicas are all newly created, so the old messages they contain will not be deleted on time.
  2. More accurate log rolling strategy. Segment files are rolled periodically and by size. In V0, rolling by segment creation time suffers from the same problem as above, which can result in a single oversized segment file, or a tiny segment file containing almost no messages.

Compression in version V1

For common compression algorithms, the larger the content being compressed, the better the compression ratio tends to be. However, a single message is usually not very long, so to resolve this tension we have to put multiple messages together and compress them as a whole. This is exactly what Kafka does: in V1, Kafka uses wrapper messages to improve compression. A wrapper message is itself a message, but its value is a collection of ordinary messages, which are also called inner messages. As shown below:
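To get a feel for why batching helps, here is a small standalone experiment (plain JDK code, not Kafka code) that compresses a set of short messages one by one and then as a single block with GZIP; the batched form usually comes out much smaller because the compressor can exploit redundancy across messages:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class BatchCompressionDemo {
    static int gzipSize(byte[]... chunks) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            for (byte[] chunk : chunks)
                gzip.write(chunk);
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        byte[][] messages = new byte[100][];
        for (int i = 0; i < messages.length; i++)
            messages[i] = ("user-42 clicked item-" + i).getBytes(StandardCharsets.UTF_8);

        int individually = 0;
        for (byte[] m : messages)
            individually += gzipSize(m);      // each message compressed on its own
        int batched = gzipSize(messages);     // all messages compressed as one block

        System.out.println("compressed individually: " + individually + " bytes");
        System.out.println("compressed as one batch: " + batched + " bytes");
    }
}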

To further reduce the overhead carried by each message, Kafka records the full offset only in the wrapper message; the offset in an inner message is just an offset relative to the wrapper message's offset, as shown below:

When the wrapper message is sent to a Kafka broker, the broker does not need to decompress it and stores it directly. When a consumer pulls the message, the broker passes it on intact; the real decompression is done by the consumer. This saves the broker the cost of decompressing and recompressing.

Let’s talk about timestamps in V1

The timestamp type of a V1 message is identified by bit 4 of the Attributes field and can be CreateTime or LogAppendTime:

  • CreateTime: the timestamp field records the time when the message was produced by the producer.
  • LogAppendTime: the timestamp field records the time when the broker wrote this message to the segment file.

When the producer generates a message, the timestamp in the message is the CreateTime, and the timestamp in the wrapper message is the maximum of all inner message timestamps.

When a message arrives at the broker, the broker adjusts the wrapper message's timestamp according to its own log.message.timestamp.type configuration (or the topic-level message.timestamp.type configuration); the default value is CreateTime. If the broker uses CreateTime, we can also set the message.timestamp.difference.max.ms topic parameter (log.message.timestamp.difference.max.ms at the broker level): when the difference between a message's timestamp and the broker's local time is greater than the configured value, the broker rejects the write.
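For reference, these settings can also be applied per topic through the Java AdminClient. The sketch below uses the older alterConfigs() API for brevity; the topic name, bootstrap address, and the 10-minute threshold are placeholders:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicTimestampConfigDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic"); // placeholder
            Config config = new Config(Arrays.asList(
                    new ConfigEntry("message.timestamp.type", "CreateTime"),
                    // reject records whose CreateTime deviates from broker time by more than 10 minutes
                    new ConfigEntry("message.timestamp.difference.max.ms", "600000")));
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}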

If the broker or topic uses LogAppendTime, the broker writes its local time directly into the message's timestamp field and sets the timestamp type bit in Attributes to 1. If the message is compressed, only the timestamp and timestamp type of the wrapper message are modified, not those of the inner messages, to avoid decompressing and recompressing. In other words, the broker only cares about the wrapper message's timestamp and ignores the timestamps of the inner messages.

When a message is pulled by a consumer, the consumer processes it based on the timestamp type. If the wrapper message's type is CreateTime, the consumer uses each inner message's own timestamp as its CreateTime. If the wrapper message's type is LogAppendTime, the consumer uses the wrapper message's timestamp as the LogAppendTime of all inner messages, and the timestamp values of the inner messages are ignored.

Finally, the timestamp in a message is an important basis for the timestamp index, which we will discuss later when we introduce the broker.

V2 version

Kafka 0.11 and later use the V2 message format, which is also compatible with V0 and V1 messages. However, some of the new features in Kafka cannot be used with the older formats.

The V2 message format borrows some ideas from Protocol Buffers, introducing Varints (variable-length integers) and ZigZag encoding. Varints is a method of serializing integers using one or more bytes: the smaller the value, the fewer bytes it takes, which again reduces message size. ZigZag encoding addresses the problem that Varints encodes negative numbers inefficiently: it maps signed integers onto unsigned integers so that negative numbers with small absolute values also get short Varints encodings, as shown in the figure below:
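The two encodings are easy to sketch in a few lines. The snippet below is an illustrative standalone version (Kafka ships its own utility implementation), showing the standard ZigZag mapping and the 7-bits-per-byte Varints layout:

import java.io.ByteArrayOutputStream;

public class VarintDemo {
    // ZigZag: map a signed int to an unsigned one so that small negative values also stay small
    static int zigZagEncode(int n) {
        return (n << 1) ^ (n >> 31);
    }

    // Varint: 7 data bits per byte, high bit set while more bytes follow
    static byte[] writeVarint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int v = zigZagEncode(value);
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(writeVarint(1).length);   // 1 byte
        System.out.println(writeVarint(-1).length);  // 1 byte, thanks to ZigZag
        System.out.println(writeVarint(300).length); // 2 bytes
    }
}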

Now that we know the rationale behind the V2 format, let's look at the format of a V2 message (also known as a Record):

The important thing to note is that all length fields are varints (or varlongs), i.e. variable-length fields, and timestamp and offset are stored as delta values. In addition, all bits of the attributes field in the Record are deprecated, and a headers extension is added.

In addition to the basic Record format, V2 also defines a RecordBatch structure. Comparing with the V1 format, Record is the inner structure and RecordBatch is the outer structure, as shown in the figure below:

RecordBatch contains quite a few fields; let's look at them one by one:

  1. BaseOffset: the start offset of the current RecordBatch. Adding a Record's offset delta to baseOffset yields the Record's real offset. In the RecordBatch sent by the producer, these offsets are values assigned by the producer, not the real offsets in the partition.
  2. BatchLength: the total length of the RecordBatch.
  3. PartitionLeaderEpoch: marks the epoch of the leader replica of the target partition. We will come back to how this value is used when we dig into the detailed implementation later.
  4. Magic: the magic value, which is 2 for the V2 format.
  5. CRC: the checksum covers all data from the attributes field to the end of the RecordBatch. partitionLeaderEpoch is excluded from the CRC because the broker assigns a partitionLeaderEpoch every time it receives a RecordBatch; if it were included, the CRC would have to be recalculated each time. We will come back to this implementation later.
  6. Attributes: expanded from 8 bits in V1 to 16 bits. Bits 0–2 indicate the compression type, bit 3 the timestamp type, and bit 4 whether the batch is transactional. Transactions are a new feature in Kafka: once transactions are enabled, transactional consumers can only see Records of committed transactions. Bit 5 indicates whether this is a control batch; a control batch always contains a single control Record, which is used to mark, for example, whether a transaction has been committed or aborted. Control Records are processed only inside the broker; they are not delivered to consumers or producers and are transparent to clients.
  7. LastOffsetDelta: the relative offset of the last Record in the RecordBatch, used by the broker to verify the correctness of the Records assembled in the RecordBatch.
  8. FirstTimestamp: the timestamp of the first Record in the RecordBatch.
  9. MaxTimestamp: the maximum timestamp in the RecordBatch, typically the timestamp of the last Record, used by the broker to verify the correctness of the Records assembled in the RecordBatch.
  10. Producer ID: the producer ID, used to support Exactly Once semantics; see KIP-98 - Exactly Once Delivery and Transactional Messaging.
  11. Producer epoch: used to support idempotence (Exactly Once semantics).
  12. Base sequence: the base sequence number, used to support idempotence (Exactly Once semantics) by checking whether a Record is a duplicate.
  13. Records Count: the number of Records in the RecordBatch.

From the analysis of the V2 message format, we can see that Kafka messages not only support new features such as transactions and idempotence, but are also well optimized for space usage, which is a big improvement overall.

MemoryRecordsBuilder

Now that we’ve seen the evolution of the Kafka Message format, let’s go back to KafkaProducer’s code.

Each MemoryRecordsBuilder relies on a ByteBuffer underneath to store messages. (How KafkaProducer manages ByteBuffers will be described in detail later.) MemoryRecordsBuilder wraps the ByteBuffer in a ByteBufferOutputStream, which implements OutputStream, so we can write data to it as a stream. ByteBufferOutputStream also provides the ability to automatically expand the underlying ByteBuffer.

There is also a compressionType field, which specifies the compression algorithm that the MemoryRecordsBuilder currently uses to compress the data in the ByteBuffer. Kafka currently supports the following compression algorithms: GZIP, SNAPPY, LZ4, and ZSTD. Note that ZSTD is only supported by the V2 message format.
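On the producer side the algorithm is selected with the compression.type setting; a minimal configuration sketch (the bootstrap address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class CompressionConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // one of: none, gzip, snappy, lz4, zstd (zstd requires the V2 message format, i.e. brokers >= 2.1)
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        return props;
    }
}

The MemoryRecordsBuilder constructor below shows how this compressionType is used to wrap the underlying output stream: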

public MemoryRecordsBuilder(ByteBuffer buffer, ...) { // Omit other arguments
    // Wrap the ByteBuffer associated with this MemoryRecordsBuilder in a ByteBufferOutputStream
    this(new ByteBufferOutputStream(buffer), ...);
}

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, ...) { // Omit other arguments
    // omit initialization of other fields
    this.bufferStream = bufferStream;
    // Wrap a compression stream around the bufferStream, then wrap a DataOutputStream around that
    this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}

So we get appendStream as shown in the following figure:

Now that we know the underlying storage of MemoryRecordsBuilder, let's look at its core write methods, append() and appendWithOffset(). (Recall the mapping to the V2 format: a ProducerBatch corresponds to a RecordBatch, and each message corresponds to a Record.)

public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
    return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}

private long nextSequentialOffset() {
    // baseOffset is the baseOffset of the RecordBatch; lastOffset records the offset of the most recently written Record.
    // lastOffset - baseOffset is later used to compute the offset delta. lastOffset is incremented for each new Record.
    return lastOffset == null ? baseOffset : lastOffset + 1;
}

private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
    if (isControlRecord != this.isControlBatch) // Check that the isControl flag is consistent
        throw new IllegalArgumentException("...");

    if (lastOffset != null && offset <= this.lastOffset) // Ensure that offset is monotonically increasing
        throw new IllegalArgumentException("...");

    if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) // Check the timestamp
        throw new IllegalArgumentException("...");

    // Check: only V2 messages can have headers
    if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
        throw new IllegalArgumentException("...");

    if (this.firstTimestamp == null) // Update firstTimestamp
        this.firstTimestamp = timestamp;

    if (magic > RecordBatch.MAGIC_VALUE_V1) { // Write a V2 Record
        appendDefaultRecord(offset, timestamp, key, value, headers);
        return null;
    } else { // Write a V0/V1 Record
        return appendLegacyRecord(offset, timestamp, key, value, magic);
    }
}

The appendDefaultRecord() method calculates the offsetDelta and timestampDelta in the Record, writes the Record, and updates the RecordBatch metadata as follows:

private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                    Header[] headers) throws IOException {
    ensureOpenForRecordAppend(); // Check the appendStream state
    // Calculate offsetDelta
    int offsetDelta = (int) (offset - this.baseOffset);
    // Calculate timestampDelta
    long timestampDelta = timestamp - this.firstTimestamp;
    // DefaultRecord is a utility class; its writeTo() method writes the Record into the appendStream in V2 format
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    // Update the meta information of the RecordBatch, for example numRecords
    recordWritten(offset, timestamp, sizeInBytes);
}

Another method of interest in MemoryRecordsBuilder is hasRoomFor(), which estimates whether the MemoryRecordsBuilder currently has room for the Record to be written.

public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
    // isFull() checks whether the appendStream has been closed and whether the number of bytes already written
    // exceeds writeLimit; the writeLimit field records the upper limit of bytes this MemoryRecordsBuilder may write
    if (isFull())
        return false;

    // Each RecordBatch must be able to hold at least one Record; if no Record has been written yet, allow the write
    if (numRecords == 0)
        return true;

    final int recordSize;
    if (magic < RecordBatch.MAGIC_VALUE_V2) { // V0, V1
        recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
    } else {
        // Estimate the size of the Record to be written
        int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
        long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
        recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
    }
    // The bytes already written plus the bytes of this Record must not exceed writeLimit
    return this.writeLimit >= estimatedBytesWritten() + recordSize;
}

ProducerBatch

Let's move up one level to the ProducerBatch implementation, whose core method is tryAppend(). The core steps are as follows:

  1. Call the MemoryRecordsBuilder hasRoomFor() method to check whether the current ProducerBatch still has enough space to store the Record to be written.
  2. Call the MemoryRecordsBuilder.append() method to append the Record to the underlying ByteBuffer.
  3. Create a FutureRecordMetadata object, which implements the Future interface and corresponds to the send of this Record.
  4. Wrap the FutureRecordMetadata object and the Callback associated with the Record into a Thunk object and record it in the thunks collection (a List<Thunk>).

Here is the implementation of the ProducerBatch.tryAppend() method:

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // Check whether the MemoryRecordsBuilder still has room for this write
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null; // Return null if there is no space left to write
    } else {
        // Call the append() method to write the Record
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        // Update maxRecordSize and lastAppendTime
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        // Create the FutureRecordMetadata object
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture,
                                      this.recordCount, timestamp, checksum,
                                      key == null ? -1 : key.length,
                                      value == null ? -1 : value.length, Time.SYSTEM);
        // Record the Callback and FutureRecordMetadata into thunks
        thunks.add(new Thunk(callback, future));
        this.recordCount++; // Update the recordCount field
        return future; // Return the FutureRecordMetadata
    }
}

In addition to MemoryRecordsBuilder, a lot of other key information is recorded in ProducerBatch:

The ProduceRequestResult class maintains a CountDownLatch object (count = 1), which gives it Future-like behavior. When the broker responds to the request formed from the ProducerBatch (a normal response, a timeout, or an error response), or when the KafkaProducer is closed, the ProduceRequestResult.done() method is called. That method calls countDown() on the CountDownLatch, waking up the threads blocked on its await() method; those threads can then check the error field of the ProduceRequestResult to determine whether the request succeeded or failed.
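A stripped-down sketch of this latch-based, Future-like pattern is shown below; the field and method names follow the description above, but this is not the real ProduceRequestResult class:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the latch pattern used by ProduceRequestResult
class ProduceRequestResultSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Long baseOffset;         // assigned by the broker on success
    private volatile RuntimeException error;  // non-null if the request failed

    // called when the broker responds (normally or with an error) or the producer is closed
    void set(long baseOffset, RuntimeException error) {
        this.baseOffset = baseOffset;
        this.error = error;
    }

    void done() {
        latch.countDown();                    // wake up everyone blocked in await()
    }

    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    boolean succeeded() {
        return error == null;
    }
}

The real class additionally records the logAppendTime and the target TopicPartition, which value() later uses when building the RecordMetadata.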

The ProduceRequestResult associated with a ProducerBatch also has a baseOffset field, which records the offset that the broker assigned to the first Record in the ProducerBatch. The real offset of each Record can then be computed from its position within the ProducerBatch (real offset = ProduceRequestResult.baseOffset + relativeOffset).

Next, look at FutureRecordMetadata, which implements the JDK Future interface and represents the state of a single Record. Besides holding the associated ProduceRequestResult object, FutureRecordMetadata maintains a relativeOffset field, which records the offset of the corresponding Record within the ProducerBatch.

FutureRecordMetadata has two notable methods. One is get(), which relies on the CountDownLatch in ProduceRequestResult to block and wait, and then calls the value() method to build the RecordMetadata object to return:

public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Calculate the timeout
    long now = time.milliseconds();
    long timeoutMillis = unit.toMillis(timeout);
    long deadline = Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
    // Rely on the CountDownLatch in ProduceRequestResult to block and wait
    boolean occurred = this.result.await(timeout, unit);
    if (!occurred)
        throw new TimeoutException("Timeout after waiting for " + timeoutMillis + " ms.");
    if (nextRecordMetadata != null) // nextRecordMetadata can be ignored for now; it is related to batch splitting, introduced later
        return nextRecordMetadata.get(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);
    // Return the RecordMetadata object (valueOrError() throws the recorded error if the request failed)
    return valueOrError();
}

The other is the value() method, which wraps the partition information, baseOffset, relativeOffset, and timestamp (LogAppendTime or CreateTime) into a RecordMetadata object and returns it:

RecordMetadata value() {
    if (nextRecordMetadata != null) // Ignore nextRecordMetadata for now
        return nextRecordMetadata.value();
    // Wrap the partition information, baseOffset, relativeOffset, and timestamp (LogAppendTime or CreateTime) into a RecordMetadata object and return it
    return new RecordMetadata(result.topicPartition(), this.result.baseOffset(),
                              this.relativeOffset, timestamp(), this.checksum,
                              this.serializedKeySize, this.serializedValueSize);
}

Finally, look at the thunks collection in ProducerBatch. Each Thunk object corresponds to one Record and stores the Callback associated with that Record together with its FutureRecordMetadata object.
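From the user's perspective, the Future and Callback discussed above are exactly what KafkaProducer.send() hands back. A minimal usage sketch (topic name and bootstrap address are placeholders) ties the pieces together:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendCallbackDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
            // The Callback ends up in the ProducerBatch thunks list; the returned Future is a FutureRecordMetadata
            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.println("acked: offset=" + metadata.offset() + ", timestamp=" + metadata.timestamp());
            });
            RecordMetadata metadata = future.get(); // blocks on the CountDownLatch inside ProduceRequestResult
            System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
        }
    }
}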

Now that we know how ProducerBatch writes data, let's focus on its done() method. The done() method is called when KafkaProducer receives a normal response for the ProducerBatch, when the request times out, or when the producer is shut down. In done(), ProducerBatch first updates its finalState and then calls the completeFutureAndFireCallbacks() method to trigger the Callback of each Record. The implementation is as follows:

public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
    // Determine the final state of this ProducerBatch send attempt from the exception argument
    final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
    // Update finalState with a CAS operation; only the first update succeeds and triggers completeFutureAndFireCallbacks()
    if (this.finalState.compareAndSet(null, tryFinalState)) {
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
    // done() may be called more than once; switching from SUCCEEDED to a failure state throws an exception
    if (this.finalState.get() != FinalState.SUCCEEDED) {
        // Omit the log output code
    } else {
        throw new IllegalStateException("...");
    }
    return false;
}

In the completeFutureAndFireCallbacks() method, the thunks collection is traversed to trigger the Callback of each Record, the baseOffset, logAppendTime, and error fields in ProduceRequestResult are updated, and done() is called to release the blocked threads, as follows:

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
    // Update the baseOffset, logAppendTime, and error fields in ProduceRequestResult
    produceFuture.set(baseOffset, logAppendTime, exception);
    // Iterate over the thunks collection, triggering the Callback of each Record
    for (Thunk thunk : thunks) { // Omit the try/catch block
        if (exception == null) {
            RecordMetadata metadata = thunk.future.value();
            if (thunk.callback != null)
                thunk.callback.onCompletion(metadata, null);
        } else {
            if (thunk.callback != null)
                thunk.callback.onCompletion(null, exception);
        }
    }
    // Call the underlying CountDownLatch.countDown() method to release the threads blocking on it
    produceFuture.done();
}

BufferPool

As mentioned earlier, MemoryRecordsBuilder uses a ByteBuffer underneath to store the written Record data, but creating ByteBuffer objects is itself a resource-consuming operation, so KafkaProducer uses a BufferPool to manage ByteBuffers uniformly. A BufferPool is essentially a pool of ByteBuffers: we take one when we need it and return it to the BufferPool when we are done with it.

BufferPool is a relatively simple resource pool implementation. It only pools ByteBuffers of one specific size (poolableSize); ByteBuffers of other sizes are not pooled. (A more sophisticated pooling design can be found in the Netty source code, which will be introduced in detail later.)

In general, we tune the ProducerBatch size via the batch.size configuration (roughly the expected number of Records per batch times the estimated size of a single Record) so that each ProducerBatch can buffer multiple Records. However, when a single Record is larger than an entire ProducerBatch, the producer cannot reuse a pooled ByteBuffer from the BufferPool for it; a dedicated ByteBuffer is allocated instead, and after use that buffer is not returned to the pool but simply left for GC to reclaim.
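In terms of producer configuration, totalMemory corresponds to buffer.memory, poolableSize corresponds to batch.size, and the maxTimeToBlockMs passed to allocate() comes from max.block.ms. A minimal sketch with the default values (shown for illustration, not as tuning advice):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BufferPoolConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L);    // BufferPool totalMemory (default 32 MB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);               // BufferPool poolableSize / ProducerBatch size (default 16 KB)
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000L);               // how long allocate() may block before giving up
        return props;
    }
}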

Let’s look at the BufferPool core fields:

The core logic of allocating a ByteBuffer from the BufferPool is in the allocate() method. The logic is not complicated, so let's go straight to the code and comments:

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory) // First check that the requested size does not exceed the total memory managed by the BufferPool
        throw new IllegalArgumentException("...");

    ByteBuffer buffer = null;
    this.lock.lock(); // lock

    // Check the state of the BufferPool; if it has already been closed, throw an exception (omitted)

    try {
        // If the target size is poolableSize and the free list is not empty, reuse a ByteBuffer from the free list directly
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // Calculate the total ByteBuffer space in the free list
        int freeListSize = freeSize() * this.poolableSize;
        // If the BufferPool currently has enough free space for the target size, free it up via freeUp() and allocate directly
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
        } else {
            int accumulated = 0;
            // The current BufferPool cannot provide the target space, so the current thread needs to block
            Condition moreMemory = this.lock.newCondition();
            try {
                // Calculate the maximum time the current thread may block
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                while (accumulated < size) { // Loop until enough space has been accumulated
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        // Block and wait; await() returning false means the wait timed out
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    }
                    // Check the state of the BufferPool; if it has been closed, throw an exception (omitted)
                    if (waitingTimeElapsed) {
                        // The requested space could not be obtained within the specified time, so throw an exception
                        throw new BufferExhaustedException("...");
                    }

                    remainingTimeToBlockNs -= timeNs;
                    // The target size is poolableSize and an idle ByteBuffer has appeared in the free list
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        // Take whatever space is available now and keep waiting for the rest
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        accumulated += got;
                    }
                }
                accumulated = 0;
            } finally {
                // If the while loop did not finish normally, accumulated is not 0 and the space is returned here
                this.nonPooledAvailableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        // If the BufferPool still has free space, wake up the next waiting thread so it can try to allocate
        try {
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            lock.unlock(); // unlock
        }
    }

    if (buffer == null)
        // Allocation succeeded but no ByteBuffer from the free list could be reused
        // (either the target size is not poolableSize, or the free list was empty)
        return safeAllocateByteBuffer(size);
    else
        return buffer; // Reuse a poolableSize ByteBuffer directly
}

// A quick look at the freeUp() method, which keeps releasing free ByteBuffers from the free list to replenish nonPooledAvailableMemory
private void freeUp(int size) {
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

A ByteBuffer allocated from a BufferPool is released by calling deallocate():

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock(); // lock
    try {
        // If the ByteBuffer being released is exactly poolableSize, put it straight back into the free list
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            // Otherwise the ByteBuffer is left to the JVM GC to reclaim; just increase nonPooledAvailableMemory
            this.nonPooledAvailableMemory += size;
        }
        // Wake up the first blocked thread in the waiters list
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock(); // release the lock
    }
}

RecordAccumulator

Having analyzed the write-related parts of MemoryRecordsBuilder, ProducerBatch, and BufferPool, let's now look at the RecordAccumulator implementation.

When analyzing a class, it helps to look at its data structures first and then at its behavior (methods). The key fields of RecordAccumulator are as follows:

When we analyzed the KafkaProducer.doSend() method earlier, we saw that sending a message calls the RecordAccumulator.append() method directly, and that is also where the ProducerBatch.tryAppend() method is invoked to append the message to the underlying MemoryRecordsBuilder. The core steps of RecordAccumulator.append() are:

  • Locate the ArrayDeque<ProducerBatch> corresponding to the target partition in the batches collection. If the lookup fails, create a new ArrayDeque and add it to the batches collection.
  • Lock the ArrayDeque obtained in step 1. A synchronized block is used for locking.
  • Execute the tryAppend() method to try to write the Record to the last ProducerBatch in the ArrayDeque, as shown below:
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    // Get the last ProducerBatch object in the ArrayDeque<ProducerBatch> collection
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        // Try to write the message into the ProducerBatch object pointed to by last
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        if (future == null)
            // The write failed; close the ProducerBatch pointed to by last and return null to indicate failure
            last.closeForRecordAppends();
        else
            // The write succeeded; return a RecordAppendResult object
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
  • When the synchronized block is complete, the lock is automatically released.
  • If the append in Step 3 succeeds, RecordAppendResult is returned.
  • If appending the Record in Step 3 fails, it is probably because the ProducerBatch currently in use is full. The abortOnNewBatch parameter is then checked: if it is true, a RecordAppendResult with its abortForNewBatch field set to true is returned immediately, which causes the RecordAccumulator.append() method to be triggered again.
  • If abortOnNewBatch is not true, a new ByteBuffer is allocated from the BufferPool and wrapped into a new ProducerBatch object.
  • Lock the ArrayDeque again, try to append the Record to the newly created ProducerBatch, and append the new ProducerBatch to the tail of the corresponding Deque.
  • Add the newly created ProducerBatch to the incomplete collection. When the synchronized block exits, the lock is released automatically.
  • The batchIsFull and newBatchCreated fields of the returned RecordAppendResult are used as conditions for waking up the Sender thread. The KafkaProducer.doSend() method wakes up the Sender thread with the following snippet:
if (result.batchIsFull || result.newBatchCreated) {
    // When this write fills a ProducerBatch or a new ProducerBatch is created, the Sender thread is woken up to send
    this.sender.wakeup();
}

The implementation of RecordAccumulator.append() is as follows:

public RecordAppendResult append(TopicPartition tp, long timestamp,
        byte[] key, byte[] value, Header[] headers, Callback callback,
        long maxTimeToBlock, boolean abortOnNewBatch, long nowMs) throws InterruptedException {
    // Count the number of Records currently being written into the RecordAccumulator
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Find the ArrayDeque<ProducerBatch> corresponding to the target partition in the batches collection.
        // If the lookup fails, a new ArrayDeque is created and added to the batches collection.
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // lock the ArrayDeque
            // If the append succeeds, a RecordAppendResult is returned
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) // A non-null result means the append succeeded
                return appendResult;
        }

        // Appending the Record failed, probably because the ProducerBatch in use is already full.
        // Decide whether to return a RecordAppendResult immediately; if abortForNewBatch is true,
        // the append() method will be triggered again
        if (abortOnNewBatch) {
            return new RecordAppendResult(null, false, false, true);
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        // Allocate a ByteBuffer from the BufferPool
        buffer = free.allocate(size, maxTimeToBlock);

        nowMs = time.milliseconds();
        synchronized (dq) { // lock the ArrayDeque again
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try the tryAppend() method again to append the Record
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                return appendResult;
            }
            // Wrap the ByteBuffer into a MemoryRecordsBuilder
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // Create the ProducerBatch object
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            // Append the Record to the ProducerBatch via tryAppend()
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, nowMs));
            // Add the ProducerBatch to the ArrayDeque
            dq.addLast(batch);
            // Add the ProducerBatch to the IncompleteBatches collection
            incomplete.add(batch);

            buffer = null; // clear the buffer reference so it is not released in the finally block
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null) // A non-null buffer means an exception occurred during the write, so release the ByteBuffer
            free.deallocate(buffer);
        // The current Record has been written; decrement appendsInProgress
        appendsInProgress.decrementAndGet();
    }
}

ArrayDeque is not a thread-safe collection, but why is it locked twice? The reason is that requesting a new ByteBuffer from the BufferPool may block. Assume all of the append steps above were done inside a single synchronized block. Thread A sends a large Record and needs to request new space from the BufferPool; if the BufferPool has insufficient space, thread A blocks and waits on the BufferPool while still holding the lock on the ArrayDeque. Thread B sends a small Record, and the remaining space in the last ProducerBatch of the ArrayDeque would be just enough to hold it, but because thread A has not released the Deque's lock, thread B also has to wait, causing unnecessary blocking and reducing throughput. The essence of this design is optimization by shortening the lock hold time.

Besides the two lock operations on the ArrayDeque, we also see a retry of tryAppend() after the second lock is acquired. This is mainly to prevent internal fragmentation when multiple threads concurrently request space from the BufferPool. Consider this scenario: thread A finds that the last ProducerBatch has insufficient space, requests space, creates a new ProducerBatch object, and adds it to the tail of the ArrayDeque; thread B executes concurrently with thread A and also adds a newly created ProducerBatch to the tail of the ArrayDeque. As we saw in the tryAppend() logic above, subsequent writes only go to the ProducerBatch at the tail of the ArrayDeque, so ProducerBatch3 would never be written to again, resulting in internal fragmentation:

RecordAccumulator.ready() is called before the Sender thread sends Records to the Kafka brokers. The method uses the cluster metadata to obtain the set of nodes that are ready to receive the Records to be sent. The specific filter conditions are as follows:

  1. Whether the ArrayDeque in the batches collection contains more than one ProducerBatch, or whether its first ProducerBatch is full.
  2. Whether the batch has waited long enough. There are two aspects to this: if the batch is being retried, it must have waited longer than the retryBackoffMs backoff time; if it is not a retry, it must have waited longer than the time specified by the linger.ms configuration (linger.ms defaults to 0).
  3. Whether another thread is waiting for the BufferPool to free space (i.e. the BufferPool is exhausted).
  4. Whether a thread has called flush() and is waiting for the flush operation to complete.
  5. Whether the producer is being closed.

Here is the code of the ready() method. It iterates over each partition in the batches collection and first finds the node where the leader replica of the target partition resides; KafkaProducer cannot know where to send a batch without this node information. It then examines each ArrayDeque, and if the conditions above are met, the corresponding node is recorded into the readyNodes collection. Finally, ready() returns a ReadyCheckResult object, which records the set of nodes that satisfy the sending conditions, the topics whose leader replica could not be found during the traversal, and the time interval until ready() should be called again.

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // Records which nodes data can be sent to
    Set<Node> readyNodes = new HashSet<>();
    // Records when the ready() method needs to be called next
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    // Topics whose leader replica cannot be found in the Cluster metadata
    Set<String> unknownLeaderTopics = new HashSet<>();
    // Whether any thread is blocked waiting for the BufferPool to free space
    boolean exhausted = this.free.queued() > 0;

    // Iterate over the batches collection and determine the node where the leader replica of each partition resides
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        synchronized (deque) {
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                // Locate the node of the leader replica of the target partition
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // The leader replica cannot be found, so the partition is considered abnormal and no messages can be sent
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                    boolean full = deque.size() > 1 || batch.isFull();
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // Check the five conditions above to find the nodes involved in this send
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Record when the ready() method needs to be called next
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

After the RecordAccumulator.ready() method returns the readyNodes collection, it is further filtered by NetworkClient (more on that later when we discuss the Sender thread) to obtain the set of nodes to which messages can ultimately be sent.

After that, the Sender thread calls the RecordAccumulator.drain() method to obtain the ProducerBatches to send based on the node set above, returning a Map<Integer, List<ProducerBatch>> in which the key is the id of the target node and the value is the collection of ProducerBatches to be sent to it. In the upper-level business logic that calls KafkaProducer, data is produced in terms of TopicPartitions; the caller only cares about which TopicPartition a message goes to, not which node that TopicPartition lives on. At the network I/O level, however, the producer sends data to nodes: it only establishes connections to nodes and sends bytes, without caring which TopicPartition the data belongs to. The core job of drain() is therefore to convert the TopicPartition -> ProducerBatch mapping into a Node -> ProducerBatch mapping. Here is the core code of the drain() method:

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    // The key is the id of the target node and the value is the collection of ProducerBatches to send to that node
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        // Get the ProducerBatch collection for the target node
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    // Get the set of partitions hosted on the current node
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
    // Records the ProducerBatch collection to send to the target node
    List<ProducerBatch> ready = new ArrayList<>();
    // drainIndex is an index into batches. It records where the last drain stopped, and the next drain starts from that position.
    // If we always started from the queue at index 0, we might keep sending messages only to the first few partitions,
    // causing the other partitions to starve.
    int start = drainIndex = drainIndex % parts.size();
    do {
        // Obtain the metadata of the partition
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        this.drainIndex = (this.drainIndex + 1) % parts.size();
        // Get the ArrayDeque corresponding to the target partition (null check omitted)
        Deque<ProducerBatch> deque = getDeque(tp);
        synchronized (deque) {
            // Get the first ProducerBatch object in the ArrayDeque
            ProducerBatch first = deque.peekFirst();
            if (first == null)
                continue;
            // For retries, check whether the backoff time has elapsed (omitted)
            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                // The amount of data to send in this round is full
                break;
            } else {
                if (shouldStopDrainBatchesForPartition(first, tp))
                    break;
                // Take the first ProducerBatch out of the ArrayDeque
                ProducerBatch batch = deque.pollFirst();
                // Transaction-related processing (omitted)
                // Close the underlying output stream and make the ProducerBatch read-only
                batch.close();
                size += batch.records().sizeInBytes();
                // Record the ProducerBatch into the ready collection
                ready.add(batch);
                // Set the drainedMs timestamp of the ProducerBatch
                batch.drained(now);
            }
        }
    } while (start != drainIndex);
    return ready;
}

Conclusion

This class first introduced the evolution of the message format in Kafka and analyzed the format changes of V0, V1, and V2 in detail.

We then dug into RecordAccumulator, the data relay between the business threads and the Sender thread in KafkaProducer, covering the underlying components MemoryRecordsBuilder, ProducerBatch, and BufferPool as well as the core methods of RecordAccumulator itself.

In the next class, we’ll start talking about the Sender thread in KafkaProducer.

Articles and videos from this class will also be posted on:

  • Wechat official account: Yang Sizheng
  • The original address: xxxlxy2008. Making. IO/kafka/E3%4%…
  • B station search: Yang Sizheng