Kafka producer accumulator
The producer appends each record to a buffer, the RecordAccumulator. The buffer holds a ConcurrentMap<TopicPartition, Deque<ProducerBatch>> called batches: each partition has its own deque. When a record is appended, a partition is chosen (by the partitioner, based on the key if one is present) and the record is added to the last RecordBatch in that partition's deque (a new batch is created if there is none, or if the last one is full). When sending, the RecordBatch at the head of the deque is popped and sent, so the queue behaves first in, first out.
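Before diving into the source, here is a minimal sketch of that structure in plain Java, assuming a toy Batch type and integer partition ids (none of this is Kafka's actual code): appends always go to the tail batch of a partition's deque, and sending always takes from the head.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ToyAccumulator {
    static class Batch {
        final List<byte[]> records = new ArrayList<>();
        final int maxRecords;
        Batch(int maxRecords) { this.maxRecords = maxRecords; }
        boolean tryAppend(byte[] record) {
            if (records.size() >= maxRecords) return false; // batch is full
            return records.add(record);
        }
    }

    private final ConcurrentMap<Integer, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int maxRecordsPerBatch = 16;

    void append(int partition, byte[] record) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last == null || !last.tryAppend(record)) { // no batch yet, or the last one is full
                Batch fresh = new Batch(maxRecordsPerBatch);
                fresh.tryAppend(record);
                dq.addLast(fresh);                         // new batches go to the tail
            }
        }
    }

    Batch drainOne(int partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null) return null;
        synchronized (dq) {
            return dq.pollFirst();                         // sending pops from the head: FIFO
        }
    }
}
```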
Reading the source code
The process of appending a key and value to the specified partition:
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    ...
    try {
        // 1. Find the Deque for this partition in batches; if there is none, create one and register it in batches
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            ...
            // 2. Try to append the record to the last batch in this deque;
            //    on success a result is returned, on failure null
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
        ...
        // 3. A new batch is needed: size = max(batchSize, estimated size of this record)
        int size = Math.max(this.batchSize,
                AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        ...
        // 4. Allocate `size` bytes from the BufferPool to hold the record
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            ...
            // 5. Try again in case another thread created a batch while we were waiting for memory
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            // 6. Build a new RecordBatch on top of the allocated buffer
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            // 7. Append the record to the new batch
            FutureRecordMetadata future = Utils.notNull(
                    batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
            // 8. Add the new batch to the tail of the deque
            dq.addLast(batch);
            // 9. Track the batch in the set of not-yet-acknowledged batches
            incomplete.add(batch);
            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            // 10. If dq.size() > 1 or the batch is full, the deque already has a batch that can be sent
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        ...
    }
}
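From the application side, the sizes and waiting times that drive this append path come from producer configs. A minimal usage sketch, assuming a local broker at localhost:9092 and a made-up topic demo-topic:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // the batchSize used in append()
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // the lingerMs used later in ready()
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // total BufferPool that free.allocate() draws from
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);      // maxTimeToBlock when the pool has no free space

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends to the RecordAccumulator; the Sender thread does the real I/O
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```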
How is the data sent after it has been added to the buffer for the specified partition? The main run loop of the Sender thread:
public void run() {
    ...
    // 1. Main loop: runs until close() is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    ...
    // 2. Not force-closed && (the accumulator still has undrained batches || the client still has in-flight requests):
    //    finish the remaining work before really exiting
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // 3. If the close is forced, the accumulator abandons the batches that are not yet finished
    if (forceClose) {
        ...
        this.accumulator.abortIncompleteBatches();
    }
    try {
        // 4. Close the network client
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
    ...
}
What we need to focus on here is runOnce(); this is where the buffered messages actually get sent:
void runOnce() {
    // the logic related to transactional messages is omitted here
    ...
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}
- sendProducerData(): the real sending of RecordBatches. The RecordBatches destined for one node are merged into a single produce request.
- poll(): NetworkClient.poll() does the real socket reads and writes; it is responsible for the actual network requests.
accumulator.ready()
Before the client sends to the server, you need to determine the set of nodes in the cluster that are eligible for receiving. The conditions are as follows:
- The partition's Deque contains more than one RecordBatch, or the first RecordBatch is already full
- The first RecordBatch has waited long enough (the linger or retry-backoff interval has elapsed)
- Some thread is waiting for the BufferPool to free space (the pool has run out of memory)
- The Sender has been close()d, so whatever is left in the buffer has to be sent out
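As a quick worked example of these conditions (all values made up): with linger.ms = 10, a first batch that is not full but has already waited 12 ms counts as expired and is therefore sendable. A tiny self-contained re-statement of the same boolean logic that ready() uses below:

```java
// Toy re-statement of the readiness check with made-up sample values;
// the real logic lives in RecordAccumulator.ready() shown below.
public class ReadyConditionDemo {
    public static void main(String[] args) {
        long lingerMs = 10;          // linger.ms
        long retryBackoffMs = 100;   // retry.backoff.ms
        int attempts = 0;            // the first batch has not been retried
        long waitedTimeMs = 12;      // how long the first batch has been sitting in the deque
        int dequeSize = 1;           // only one batch in this partition's deque
        boolean firstBatchFull = false;
        boolean exhausted = false;   // nobody is waiting on the BufferPool
        boolean closed = false;      // the Sender is still running
        boolean flushInProgress = false;

        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean full = dequeSize > 1 || firstBatchFull;
        boolean expired = waitedTimeMs >= timeToWaitMs;

        boolean sendable = full || expired || exhausted || closed || flushInProgress;
        // prints: sendable = true, backingOff = false
        System.out.println("sendable = " + sendable + ", backingOff = " + backingOff);
    }
}
```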
Let's see how ready() selects the eligible nodes:
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // 1. Set of nodes that are ready to receive messages
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    // 2. Topics whose leader replica cannot be found in the current metadata
    Set<String> unknownLeaderTopics = new HashSet<>();
    // 3. Whether any thread is waiting for the BufferPool to free up space
    boolean exhausted = this.free.queued() > 0;
    // 4. Loop over batches and locate the node holding the leader replica of each partition in the buffer
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();
        // 4-1. Find the node where the partition's leader replica resides
        Node leader = cluster.leaderFor(part);
        // 4-2. Lock the current deque and read its batches
        synchronized (deque) {
            // 4-3. If the leader cannot be found in the current metadata, the messages cannot be sent
            if (leader == null && !deque.isEmpty()) {
                // 4-3-1. Add the topic to the "unknown leader" set; a non-empty set triggers a metadata update
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                // Peek at the first batch in the deque; it can be checked as long as it is not null
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // Corresponds to the four conditions listed above
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        // 4-3-4. The node holding the leader replica is ready to be sent to
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
After obtaining readyNodes, the Sender checks whether each node can be connected to (an existing connection is reused; otherwise it tries to create one). Nodes that still cannot be connected are filtered out.
At this point the nodes to send to are ready; next, the ProducerBatches to be sent have to be prepared.
accumulator.drain()
Now sending has to switch to the node level: the accumulator stores data as TopicPartition -> Deque<ProducerBatch>, while the network layer sends requests as NodeId -> List<ProducerBatch>. So a conversion from the logical layer (partitions) to the network layer (nodes) is needed; this step also gathers the RecordBatches of one node so that they can later be merged into a single request.
This is done in RecordAccumulator.drain():
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    // 1. The result of the conversion: nodeId -> List<ProducerBatch>
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    // 2. Iterate over the nodes that are currently ready
    for (Node node : nodes) {
        // 3. Collect the ProducerBatches of the partitions whose leader is on this node into ready
        //    (narrowing the scope to one node at a time)
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        // 4. nodeId -> List<ProducerBatch>
        batches.put(node.id(), ready);
    }
    return batches;
}
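To make the logical-to-network-layer conversion concrete, here is a small self-contained sketch with toy types (plain strings instead of TopicPartition and ProducerBatch, and a made-up leader assignment) that regroups "partition -> deque of batches" into "leader nodeId -> list of batches":

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of the drain() idea: flip "partition -> deque of batches" (logical layer)
// into "leader nodeId -> list of batches" (network layer), taking the head batch of each deque.
public class DrainSketch {
    public static void main(String[] args) {
        Map<String, Deque<String>> batchesByPartition = new HashMap<>();
        batchesByPartition.put("topicA-0", new ArrayDeque<>(List.of("batch1", "batch2")));
        batchesByPartition.put("topicA-1", new ArrayDeque<>(List.of("batch3")));
        batchesByPartition.put("topicB-0", new ArrayDeque<>(List.of("batch4")));

        // Which node holds the leader replica of each partition (made-up assignment)
        Map<String, Integer> leaderByPartition = Map.of("topicA-0", 1, "topicA-1", 2, "topicB-0", 1);

        Map<Integer, List<String>> batchesByNode = new HashMap<>();
        for (Map.Entry<String, Deque<String>> e : batchesByPartition.entrySet()) {
            String headBatch = e.getValue().pollFirst();  // FIFO: only the head batch is drained
            if (headBatch == null) continue;
            int nodeId = leaderByPartition.get(e.getKey());
            batchesByNode.computeIfAbsent(nodeId, id -> new ArrayList<>()).add(headBatch);
        }
        // e.g. {1=[batch1, batch4], 2=[batch3]} -> one produce request per node
        System.out.println(batchesByNode);
    }
}
```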
After draining, all that remains is to send the ProducerBatches.
In simple terms: the ProducerBatches of all partitions under the same node are finally put into one produce request and sent out. This touches the client -> server protocol:
- How is a request encapsulated?
- What does the request header contain? Where is the data placed?
This will be covered in the server-side protocol analysis.
Send core diagram
            +--------------------+
            |  sendProducerData  |
            +--------------------+
               |              |
               v              v
+---------------------+    +---------------------+
| accumulator.ready() |    | accumulator.drain() |
+---------------------+    +---------------------+
          |                    ^           |
          v                    |           v
    .-------------.            |     .--------------.
   (  send nodes  )--available-+    ( send batches  )
    `-------------'    nodes         `--------------'
                                            |
                                            v
                              +---------------------+
                              | sendProduceRequests |
                              +---------------------+