Preface
A brief introduction to the key terms
- Broker: a physical node of the Kafka service. Each broker registers its ID with ZooKeeper so the cluster can provide high availability; a standalone environment has only one broker
- Topic: a message topic, which consumers subscribe to in order to receive messages
- Partition: a message partition. A topic can have multiple partitions, and each message is routed to one of them according to the partitioning strategy
The source code
Sending messages with the producer involves the following classes (a minimal usage sketch follows the list):
- KafkaProducer: the main producer class, which coordinates the other components
- RecordAccumulator: the message accumulator. A message being sent is first handed to the accumulator, which stores it by topic and partition
- Sender: takes messages from the RecordAccumulator, builds the requests, and submits them for sending
- NetworkClient: drives KafkaChannel to send and read data and handles completed sends and receives; it is the core of the event loop
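As a point of reference before reading the source, here is a minimal producer usage sketch (the bootstrap address and topic name are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only hands the record to the RecordAccumulator;
            // the Sender thread delivers it asynchronously in the background
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```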
RecordAccumulator: the message accumulator
Why is RecordAccumulator called an accumulator? Because it buffers batches of messages and provides the methods that operate on them. It does not depend on the other classes above, so its source code can be analyzed on its own.
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
This is the core data structure: for each TopicPartition, a Deque that stores ProducerBatch instances.
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // Get or create the deque that stores the records of this partition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
        // If no existing batch could be appended to and abortOnNewBatch is true, return without creating one
        if (abortOnNewBatch) {
            return new RecordAppendResult(null, false, false, true);
        }
In the first half of append, the Deque for the topic-partition is fetched (or created if it does not exist), and tryAppend is called to append the record to it.
Note the abortOnNewBatch parameter: if tryAppend fails and this flag is true, the current append is aborted immediately. So how can tryAppend fail?
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
tryAppend takes the last ProducerBatch from the Deque; a null return value means the append failed. That happens when the deque has no batch yet (for example, when the very first message is sent) or when the last batch has no room left and is closed for further appends.
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try again
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                return appendResult;
            }
            // Create a new batch
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));
            dq.addLast(batch);
            incomplete.add(batch);
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
When abortOnNewBatch is false, the method goes on to allocate a buffer, create a new ProducerBatch, append the message to it, and add the batch to the deque.
appendsInProgress counts how many append operations are currently in progress.
So the accumulator's append operation simply adds messages to a ProducerBatch, and the batches of each partition are chained together in a Deque.
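To make that structure concrete, here is a heavily simplified, hypothetical sketch of the "one deque of batches per partition" idea; the class names and the count-based batch size are made up, and the real implementation is byte-based and far more involved:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model of the accumulator: one deque of batches per topic-partition.
class ToyAccumulator {
    static final int BATCH_CAPACITY = 3; // made-up limit; the real batch is limited by bytes

    static class ToyBatch {
        final List<byte[]> records = new ArrayList<>();

        boolean tryAppend(byte[] record) {
            if (records.size() >= BATCH_CAPACITY)
                return false; // batch is "full": the caller must create a new one
            records.add(record);
            return true;
        }
    }

    private final ConcurrentMap<String, Deque<ToyBatch>> batches = new ConcurrentHashMap<>();

    void append(String topicPartition, byte[] record) {
        Deque<ToyBatch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (dq) {
            ToyBatch last = dq.peekLast();
            // Mirrors tryAppend failing for the first message or for a full batch
            if (last == null || !last.tryAppend(record)) {
                ToyBatch batch = new ToyBatch();
                batch.tryAppend(record);
                dq.addLast(batch);
            }
        }
    }
}
```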
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        synchronized (deque) {
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // Add to the unknown-leader set
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // exhausted: the accumulator is out of memory and threads are blocked waiting;
                    // closed: the accumulator has been closed
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Delay until the next readiness check
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
The ready method scans the buffered batches and adds the nodes that can receive data to readyNodes:
- Iterate over each topic-partition and look up the leader node of the partition, because only the leader handles produce requests
- A partition's data can be sent when any of the following conditions holds:
  - The batch is full
  - The message has waited long enough (lingerMs, or the retry backoff when retrying)
  - The accumulator has run out of memory
  - The accumulator has been closed, or a flush is in progress
- Return the nodes that can be sent to, the delay until the next readiness check, and the topics whose leader is unknown
In short, the ready method determines whether the stored messages can be sent, and to which nodes.
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        // Collect the batches destined for this node
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}
Since partitions are spread across different nodes, the drain method groups the batches that go to the same node.
The logic of drainBatchesForOneNode is: take the partitions hosted on the given node, look up their deques in the batches map, and collect the ProducerBatches that are ready to be sent, subject to constraints such as the total request size not exceeding maxSize.
void abortBatches(final RuntimeException reason) {
    for (ProducerBatch batch : incomplete.copyAll()) {
        Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
        synchronized (dq) {
            batch.abortRecordAppends();
            dq.remove(batch);
        }
        batch.abort(reason);
        deallocate(batch);
    }
}
Messages that have already been submitted to the accumulator can also be aborted: the batch is removed from its deque and the onCompletion callback is triggered with the failure reason.
Summary of the accumulator: the accumulator is the staging area for outgoing messages. Submitted messages are stored in a map keyed by topic-partition. ready determines whether messages can be sent; if they can, drain groups them by the node they will be sent to; and if something goes wrong, the buffered messages can be aborted.
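The behavior described above maps directly onto a few producer settings. A hedged configuration sketch (the values are only examples, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class AccumulatorTuning {
    static Properties tuning() {
        Properties props = new Properties();
        // Target size of one ProducerBatch in bytes; a batch counts as "full" around this size
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // How long ready() lets a non-full batch wait before it is considered sendable
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // Total memory of the BufferPool ("free" above); allocate() blocks when it is exhausted
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
        return props;
    }
}
```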
Partitioner
public interface Partitioner extends Configurable, Closeable {
    // Compute the partition for a record
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();

    // Notification that a new batch is about to be created
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}
The Partitioner is the partition-selection interface: it decides which partition a message is sent to. You can implement it to customize the partitioning rules, or rely on the built-in implementations.
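As a hedged example of implementing this interface, a made-up partitioner that routes records with a null key to partition 0 and hashes keyed records might look like this:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical example: null-key records go to partition 0, keyed records are hashed.
public class NullKeyToZeroPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0;
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}
}
```

It would be registered through the partitioner.class property, e.g. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, NullKeyToZeroPartitioner.class.getName()).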
RoundRobinPartitioner: round-robin partitioning
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
    return counter.getAndIncrement();
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // Increment the per-topic counter
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        // Take the counter modulo the number of available partitions
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // No partition is available, fall back to all partitions
        return Utils.toPositive(nextValue) % numPartitions;
    }
}
The round-robin implementation is very simple: take the counter modulo the number of partitions. For example, with 3 partitions the counter values 0, 1, 2, 3 map to partitions 0, 1, 2, 0.
DefaultPartitioner: the default partitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
If keyBytes (the serialized key) is not null, the partition is chosen by hashing the key. Otherwise, the StickyPartitionCache is used.
The StickyPartitionCache keeps sending messages to the same partition until that partition's batch fills up and is sent, then switches to another partition. This reduces per-message partitioning overhead at high send rates, and as the system keeps running the messages remain balanced across partitions.
Summary: these are the built-in partitioning strategies. If none is specified manually, DefaultPartitioner is used, which hashes the key to pick the partition.
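Note that the partitioner is only consulted when the record does not already carry a partition; the ProducerRecord constructor also lets you pin one explicitly. Building on the earlier demo producer:

```java
// Explicit partition: the partitioner is bypassed and the record goes to partition 2
ProducerRecord<String, String> pinned =
        new ProducerRecord<>("demo-topic", 2, "key", "value");
producer.send(pinned);
```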
Sender: the message-sending thread
The Sender takes the messages buffered in the RecordAccumulator and sends them out.
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    // Main loop: runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // ...
}
Sender is a Runnable; inside its run method is a while loop that keeps calling runOnce.
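The pattern is a plain looping worker that runs until it is asked to close. A toy model of that pattern, for illustration only (the real Sender also handles forced close and in-flight requests):

```java
// Toy model of the Sender's run/close pattern: a Runnable that loops until it is closed.
class LoopingSender implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            runOnce();
        }
    }

    void runOnce() {
        // one iteration: drain the accumulator, send requests, poll the network (omitted)
    }

    void initiateClose() {
        running = false; // the loop exits after the current iteration finishes
    }
}
```

KafkaProducer starts the real Sender on a dedicated daemon I/O thread when the producer is constructed.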
void runOnce() {
    // The first part deals with the transaction manager, e.g.
    // if (...) { maybeAbortBatches(lastError); }
    // ...
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}
This is abridged pseudo-code: the first half is transaction-related and calls maybeAbortBatches if something has gone wrong. The real work of sending messages is done by sendProducerData.
The sendProducerData method is as follows:
private long sendProducerData(long now) {
    // Read the cached cluster metadata (it is updated asynchronously)
    Cluster cluster = metadata.fetch();
    // Get the nodes that are ready to receive data
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // If there are partitions whose leader is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }
First, accumulator.ready is called to get the nodes with sendable data. Then a metadata update is requested for any partitions whose leader cannot be found.
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}
The second step checks network connectivity to the ready nodes and removes the ones that are not actually usable yet.
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
    // Mute all the partitions drained
    for (List<ProducerBatch> batchList : batches.values()) {
        for (ProducerBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}
The third step uses the drain method to group the batches by destination node. If message ordering is being guaranteed, the drained partitions are muted so that only one batch per partition can be in flight at a time.
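For context, guaranteeMessageOrder corresponds to allowing at most one in-flight request per connection; a hedged configuration sketch:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class OrderingConfig {
    static Properties strictOrdering() {
        Properties props = new Properties();
        // With at most one in-flight request per connection, a retried batch cannot
        // overtake a newer batch of the same partition, so per-partition order holds.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return props;
    }
}
```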
The fourth step handles expired batches. Finally, sendProduceRequests(batches, now) is called:
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();
        // ... collect the records of each partition (omitted)
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    // The callback invoked when the response arrives
    RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    client.send(clientRequest, now);
}
In the end NetworkClient's send method is called, which ultimately calls KafkaChannel's setSend to stage the request for writing.
NetworkClient
The last line of Sender's runOnce looks like this:
client.poll(pollTimeout, currentTimeMs);
This calls NetworkClient's poll method. Recall that the Selector also has a poll method, which does the actual I/O: it selects ready channels, performs the read and write events, and records the completed sends and receives in sets. NetworkClient.poll is responsible for processing those results.
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();
    if (!abortedSends.isEmpty()) {
        // If sends were aborted because of unsupported version exceptions or disconnections,
        // handle them immediately without waiting for selector.poll
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }
    // Compute how soon the metadata needs to be updated
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    // Handle completed sends
    handleCompletedSends(responses, updatedNow);
    // Handle completed receives
    handleCompletedReceives(responses, updatedNow);
    // Handle disconnected nodes
    handleDisconnections(responses, updatedNow);
    // Handle newly established connections
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    // Invoke the callbacks of the completed requests
    completeResponses(responses);
    return responses;
}
As the comments show, the metadata update is considered first, then selector.poll performs the reads and writes, and the completed events are processed afterwards. Since the producer client is mainly concerned with sending, the logic for handling received data is not very complicated.
@Override
public long maybeUpdate(long now) {
    // Time until the next metadata update (0 if an update was requested, otherwise based on the metadata expiry)
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    // If a metadata request is already in flight, wait up to defaultRequestTimeoutMs (30s by default)
    long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    // The update time has not arrived yet, so return the remaining wait
    if (metadataTimeout > 0) {
        return metadataTimeout;
    }
    // Pick the least-loaded (idle) node to fetch metadata from
    Node node = leastLoadedNode(now);
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }
    // Send the metadata request
    return maybeUpdate(now, node);
}
The first concern is requesting metadata from the server: topics, partitions, nodes, and so on. The leastLoadedNode method picks an idle node to send the request to.
The method eventually sends the request and returns the time to wait, which is passed as the timeout to selector.poll.
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // If no response is expected, the request is considered complete as soon as the send finishes
    for (Send send : this.selector.completedSends()) {
        InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse) {
            // Remove the request from inFlightRequests
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(request.completed(null, now));
        }
    }
}
This processes completed sends; requests that expect no response are removed from inFlightRequests and completed immediately.
private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}
This calls the onComplete callback. For produce responses, that callback is handleProduceResponse, which was registered in sendProduceRequest (see above).
onComplete mainly validates the batch's response and ultimately invokes the onCompletion callback that we wrote ourselves, indicating that the message has been delivered to the corresponding node.
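For reference, the onCompletion mentioned here is the Callback passed to send. A typical usage sketch, building on the earlier demo producer:

```java
producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
    // onCompletion: invoked from completeResponses once the broker has responded
    if (exception != null) {
        exception.printStackTrace(); // the send failed
    } else {
        System.out.printf("sent to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
```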
KafkaProducer
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
We submit a message with the send method, which first runs the interceptors' onSend and then calls doSend.
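The onSend here comes from the configured ProducerInterceptor chain. A hedged example interceptor (the class name and behavior are made up), registered via the interceptor.classes property:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that prefixes every value before it is serialized.
public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called from KafkaProducer.send() before doSend()
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                "prefix-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record or the send fails
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```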
// Wait for the topic's metadata to be available
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
// Serialize the key and value
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// Compute the partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
After simplifying some of the logic, the first half of doSend breaks down into the steps above. You can see that the serializer we configured and the partitioner (the default one if none is specified) are invoked here.
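The serializer called here is whatever was configured as key.serializer / value.serializer. As a hedged example, a made-up custom serializer could look like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical serializer that upper-cases strings before encoding them as UTF-8 bytes.
public class UpperCaseSerializer implements Serializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null)
            return null;
        return data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {}
}
```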
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    Cluster cluster = metadata.fetch();
    metadata.add(topic);
    // Get the number of partitions of the topic
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return immediately if the partition count is known and either no partition was
    // specified or the specified partition is within the known range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);
    int version = metadata.requestUpdate();
    // Wake up the sender so it sends the metadata request to the remote server
    sender.wakeup();
    metadata.awaitUpdate(version, remainingWaitMs);
The metadata request itself is sent by NetworkClient.poll on the Sender thread; this method wakes that thread up and then waits (up to max.block.ms) for the new metadata to arrive.
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);
// Entered when the deque has no usable batch: either the previous batch has been filled/sent or this is the first message
if (result.abortForNewBatch) {
    int prevPartition = partition;
    // Let the StickyPartitionCache switch to a new sticky partition
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false);
}
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;
accumulator.append is then called with abortOnNewBatch set to true, so the first attempt fails when no batch exists yet. This is how the sticky-partition logic is accommodated: on abort, a new partition is chosen via onNewBatch and the append is retried, this time allowing a batch to be created.
KafkaProducer's part ends there; the rest of the work is left to the Sender thread.
Conclusion
This article walked through the message-sending process: after we call send, the message is placed in the accumulator; the Sender thread keeps running, taking messages from the accumulator and handing them to KafkaChannel, and the Selector performs the actual network I/O.