Background

I ran into some production problems and realized I was not very familiar with the configuration details of Kafka. In this article I summarize a few core goals:

  • Walk through the main processes in the Kafka client
  • What the kafka-client configurations are and how they affect sending
  • Kafka-client metrics

The main process

As a Producer, the core is to understand two components: the Sender and the RecordAccumulator.

  • Sender: the primary service of the Kafka send process. It receives data and places it into the RecordAccumulator, retrieves data from the RecordAccumulator and sends it to the Kafka server, and updates some metadata services.
  • RecordAccumulator: the whole Kafka send process is asynchronous; the main purpose is to batch data to increase throughput. The RecordAccumulator is responsible for managing the data cache.
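The division of labor above can be sketched with a toy model. This is plain Java and not Kafka's actual classes (the real RecordAccumulator keys on TopicPartition and stores ProducerBatch objects); it only illustrates the append-then-drain shape of the pipeline, with a hypothetical tiny batch limit:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka's API): records are appended per partition into a
// deque of batches; a "sender" later drains whole batches for transmission.
public class AccumulatorSketch {
    static final int BATCH_SIZE = 3; // hypothetical tiny batch limit, in records

    final Map<String, Deque<List<String>>> batches = new HashMap<>();

    // Mimics the accumulator side: reuse the last open batch if it has
    // room, otherwise start a new one at the tail of the deque.
    void append(String partition, String record) {
        Deque<List<String>> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        List<String> last = dq.peekLast();
        if (last == null || last.size() >= BATCH_SIZE) {
            last = new ArrayList<>();
            dq.addLast(last);
        }
        last.add(record);
    }

    // Mimics the Sender side: take the oldest batch of a partition.
    List<String> drainOne(String partition) {
        Deque<List<String>> dq = batches.get(partition);
        return dq == null ? null : dq.pollFirst();
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch();
        for (int i = 0; i < 5; i++) acc.append("topic-0", "r" + i);
        System.out.println(acc.drainOne("topic-0").size()); // 3: the first, full batch
        System.out.println(acc.drainOne("topic-0").size()); // 2: the still-open tail batch
    }
}
```

The point of the two-sided structure is that the caller's thread only ever appends, while the Sender's loop drains, which is what makes the whole send path asynchronous.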

The core of the Sender's single loop iteration is the process shown in the figure above, and we can break down each step from the top down. The process lives in Sender#sendProducerData.

How to determine and obtain the Kafka nodes that can be sent to

In the RecordAccumulator, data is cached as a Map<TopicPartition, Deque<ProducerBatch>>:

TopicPartition obviously refers to a topic-partition pair.

A ProducerBatch is a batch of records that is sent in the same request. ProducerBatch itself is not thread-safe; in practice it is locked at the granularity of the Deque. Within a ProducerBatch, the actual records are maintained in a MemoryRecordsBuilder. A ProducerBatch also contains a lot of other data, such as callbacks for requests; we may return to this later, but for now let's get back to the main process.

final long createdMs;
final TopicPartition topicPartition;
final ProduceRequestResult produceFuture;

private final List<Thunk> thunks = new ArrayList<>();
private final MemoryRecordsBuilder recordsBuilder;
private final AtomicInteger attempts = new AtomicInteger(0);
private final boolean isSplitBatch;
private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

int recordCount;
int maxRecordSize;
private long lastAttemptMs;
private long lastAppendTime;
private long drainedMs;
private boolean retry;
private boolean reopened;

The ready part of the RecordAccumulator's core code determines which data is ready to send:

First, the target server (node) must meet certain conditions:

  • The leader of the partition to be sent to is known. If the leader is unknown, metadata must be fetched from Kafka; that fetch would block the overall process, so it is effectively performed asynchronously.
  • The partition currently awaiting delivery is not muted, that is, not set to a blocked state.
  • The current partition is not in the backoff state, which would indicate that it is currently retrying.

Second, the current batch of the partition needs to meet at least one of the following conditions:

  • The time the current batch has waited since the last send exceeds the allowed delay (lingerMs for the first attempt, retryBackoffMs for retries)
  • The deque is full of batches: for example, the queue holds more than one batch, or its only batch is large enough to send
  • The Producer is currently in the closing state
  • Total memory is full: we already know the Producer's data needs to be cached for a while, and the Producer uses a BufferPool to control memory. If threads are queued up waiting for space in the BufferPool, the cached data should be sent out.
  • A thread is flushing: this is a little harder to understand; I will add more when I am more certain.
  • Transaction completion (the transaction model supported by later Kafka versions, not covered here)
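The readiness rule above can be condensed into a single predicate. This is a simplified sketch with flattened parameter names, not Kafka's actual code; the real check lives in RecordAccumulator#ready and derives the booleans from the accumulator's state:

```java
// Simplified sketch of the batch-readiness rule: a partition's oldest
// batch is "ready" when any of the listed conditions holds.
public class ReadyCheck {
    static boolean batchReady(long nowMs,
                              long lastAttemptMs,
                              long lingerMs,
                              long retryBackoffMs,
                              boolean inRetry,         // batch has been tried before
                              boolean batchFull,       // deque has >1 batch, or first batch is full
                              boolean closed,          // producer is closing
                              boolean memoryExhausted, // threads queued on the BufferPool
                              boolean flushInProgress) {
        // First attempt waits lingerMs; a retry waits retryBackoffMs instead.
        long timeToWait = inRetry ? retryBackoffMs : lingerMs;
        boolean expired = nowMs - lastAttemptMs >= timeToWait;
        return expired || batchFull || closed || memoryExhausted || flushInProgress;
    }

    public static void main(String[] args) {
        // Linger not yet elapsed and nothing else forces a send -> not ready.
        System.out.println(batchReady(100, 95, 10, 300, false, false, false, false, false)); // false
        // A full batch is ready regardless of linger.
        System.out.println(batchReady(100, 95, 10, 300, false, true, false, false, false)); // true
    }
}
```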

How do I obtain Batch data to be sent

The main logic is summarized as follows:

From the RecordAccumulator's ConcurrentMap<TopicPartition, Deque<ProducerBatch>>, try to obtain for each node a list of batches no larger than maxRequestSize, close those batches, and put them on the to-send list.

But there is some logic to be aware of in the implementation.

We all know the basic concepts of Kafka brokers and topic-partitions: different partitions may be assigned to the same broker. In Kafka's implementation, each drain takes at most one batch per partition of the current node.

To avoid starving the partitions at the end of the list by always starting at index 0, the client keeps a drainIndex that advances on each drain; each drain starts from that index:

int start = drainIndex = drainIndex % parts.size();

I don't understand why drainIndex is global. If it were me, I would keep one per nodeId; I'm not sure what consideration applies here. With a global drainIndex, couldn't a single node that has far more partitions than all the other nodes still cause starvation?

Another interesting detail concerns an extreme case: when a batch holds only one message but that message is larger than the request size limit, the message is still sent as a single batch. To achieve this, Kafka's client checks whether pending size + next batch size exceeds the request size only when the pending list is non-empty.

The code above is in RecordAccumulator#drainBatchesForOneNode.
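The two drain details discussed above can be sketched together. This is a simplified stand-in, not Kafka's code: partitions are reduced to an array of batch sizes, but it shows both the rotating start index and why an oversized first batch still goes out alone:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the drain loop (simplified, not Kafka's code): the start index
// rotates across drains so trailing partitions are not starved, and an
// oversized first batch is still sent alone because the size check only
// applies once the "ready" list is non-empty.
public class DrainSketch {
    static int drainIndex = 0;

    // batchSizes[i] is the byte size of the oldest batch of partition i.
    static List<Integer> drain(int[] batchSizes, int maxRequestSize) {
        List<Integer> ready = new ArrayList<>();
        int size = 0;
        int start = drainIndex = drainIndex % batchSizes.length;
        do {
            int next = batchSizes[drainIndex];
            // Only reject for size when something is already queued: this is
            // what lets a single batch larger than maxRequestSize go out alone.
            if (!ready.isEmpty() && size + next > maxRequestSize) break;
            ready.add(next);
            size += next;
            drainIndex = (drainIndex + 1) % batchSizes.length;
        } while (start != drainIndex);
        return ready;
    }

    public static void main(String[] args) {
        // Three partitions with 400-byte batches, 1000-byte request limit:
        // only two fit, and drainIndex now points at the skipped partition,
        // so the next drain starts there instead of at 0.
        System.out.println(drain(new int[]{400, 400, 400}, 1000)); // [400, 400]
        drainIndex = 0;
        // A single 2000-byte batch exceeds the limit but is still drained alone.
        System.out.println(drain(new int[]{2000}, 1000)); // [2000]
    }
}
```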

How do I detect expired data

The RecordAccumulator buffers data in the ConcurrentMap<TopicPartition, Deque<ProducerBatch>>; in addition, the Sender keeps the batches waiting for responses in Map<TopicPartition, List<ProducerBatch>> inFlightBatches.

The expiry check compares deliveryTimeoutMs against the difference between the current time and the creation time.

For expired data, the failure callback is invoked.
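The comparison itself is simple; a minimal sketch of the rule described above (method name is mine, not Kafka's, though 120000 ms is the actual default for delivery.timeout.ms):

```java
// Sketch of the expiry rule: a batch is expired once deliveryTimeoutMs
// has elapsed since its creation time.
public class ExpirySketch {
    static boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long createdMs, long nowMs) {
        return deliveryTimeoutMs <= nowMs - createdMs;
    }

    public static void main(String[] args) {
        long deliveryTimeoutMs = 120_000; // Kafka's default delivery.timeout.ms
        System.out.println(hasReachedDeliveryTimeout(deliveryTimeoutMs, 0, 119_999)); // false
        System.out.println(hasReachedDeliveryTimeout(deliveryTimeoutMs, 0, 120_000)); // true
    }
}
```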

Data sending and packaging

The data that passes the filtering above is packaged into a ProduceRequest and handed to the network client for sending.

Configurations and restrictions

With the above conditions sorted out, let’s take a look at the configuration that controls some of the above processes:

  • batch.size: the size of each ProducerBatch in the per-partition deque
  • buffer.memory: the size of the RecordAccumulator's buffer
  • max.request.size: the size limit of what is sent to each node during a drain. If a single message is larger than this value, the check is skipped, but packaging is affected.
  • linger.ms: the time data resides in the buffer between ProducerBatch creation and drain, in the non-retry scenario
  • retry.backoff.ms: this configuration exists to avoid some extreme scenarios. A retry is usually caused by a server problem; if the client did not hold the data a bit longer between attempts, it could exhaust all retries in a very short time.
  • delivery.timeout.ms: the total time allowed for the send process, from the data being added to the Kafka client until completion, including the time spent in the buffer and the time in the in-flight list
  • request.timeout.ms: the time between the client sending the request and receiving the response
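Collected together, these are the keys as they would be passed to a KafkaProducer. The values below are Kafka's documented defaults; only a java.util.Properties object is built, so this compiles without the kafka-clients dependency:

```java
import java.util.Properties;

// The producer configurations discussed above, with Kafka's default values.
public class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384");           // 16 KB per ProducerBatch
        props.setProperty("buffer.memory", "33554432");     // 32 MB RecordAccumulator buffer
        props.setProperty("max.request.size", "1048576");   // 1 MB per request to a node
        props.setProperty("linger.ms", "0");                // how long a batch may wait before drain
        props.setProperty("retry.backoff.ms", "100");       // wait between retry attempts
        props.setProperty("delivery.timeout.ms", "120000"); // total budget: buffer + in-flight
        props.setProperty("request.timeout.ms", "30000");   // request sent -> response received
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("buffer.memory")); // 33554432
    }
}
```

Note the documented constraint that delivery.timeout.ms should be at least linger.ms + request.timeout.ms, since it covers both the buffering and in-flight phases.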

For example, a problem I encountered in production: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.

The code shows the producer is stuck in the memory-allocation stage, meaning the buffer is too small. Comparing the Producer's configuration, batch.size was set so large that batch.size multiplied by the number of downstream topic-partitions far exceeded buffer.memory; that is, the buffer could hold data for only a fraction of the partitions at a time, which blocked the whole production process.
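A back-of-envelope check makes the failure mode concrete. The partition count and the oversized batch.size below are hypothetical, but 32 MB is the real default buffer.memory: if every downstream partition keeps one open batch, the accumulator needs roughly batch.size × partitionCount bytes, and allocation blocks when that cannot fit:

```java
// Rough sizing check for the incident above: one open batch per partition
// must fit within buffer.memory, or allocations start blocking.
public class BufferSizing {
    static boolean bufferCanHoldOpenBatches(long batchSize, int partitionCount, long bufferMemory) {
        return batchSize * partitionCount <= bufferMemory;
    }

    public static void main(String[] args) {
        long bufferMemory = 33_554_432L; // 32 MB, the default buffer.memory
        // Default batch.size (16 KB) with 128 partitions: ~2 MB, fits easily.
        System.out.println(bufferCanHoldOpenBatches(16_384, 128, bufferMemory)); // true
        // A 1 MB batch.size with 128 partitions: 128 MB needed -> allocation blocks.
        System.out.println(bufferCanHoldOpenBatches(1_048_576, 128, bufferMemory)); // false
    }
}
```

The fix is accordingly either to shrink batch.size, raise buffer.memory, or reduce the number of partitions a single producer fans out to.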

Other

Some interesting points also came up along the way, which can be studied later:

  1. The client provides metric points to measure client-side metrics, which can be exposed by configuring metric.reporters with an implementation of the MetricsReporter interface
  2. ProducerBatch has some other interesting aspects. For example, it carries state information: how do we ensure that once data is handed to the client, the data flow is not corrupted during the whole send process? There are thread-safety issues to solve. Another is the conversion between in-memory data and the wire format, the implementation of compression, and the relationship between the various timestamps.