
This article focuses on the core flow by which the Producer sends messages.

Producer Sending process

Overall flow chart

Producer send method

The Producer sends data through the send() method:

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        // Use the ProducerInterceptors to intercept or modify the message before it is sent
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

The actual sending is implemented by the Producer's doSend() method.
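The interception step in send() can be illustrated with a minimal, self-contained sketch. It does not use the Kafka classes: InterceptorChainSketch and its String records are hypothetical stand-ins, chosen only to show that each interceptor may modify the record and that a failing interceptor does not make the call throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the producer's interceptor chain; records are plain Strings here.
class InterceptorChainSketch {
    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();

    InterceptorChainSketch add(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    // Applies every interceptor in order. A failing interceptor is skipped and the
    // last successfully produced record is passed on, so onSend itself never throws.
    String onSend(String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                System.err.println("interceptor failed, keeping previous record: " + e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        InterceptorChainSketch chain = new InterceptorChainSketch()
                .add(String::toUpperCase)
                .add(r -> { throw new IllegalStateException("broken interceptor"); })
                .add(r -> r + "|traced");
        // The returned value plays the role of interceptedRecord handed to doSend()
        System.out.println(chain.onSend("order-1"));
    }
}
```

In the real client the same role is played by implementations of the ProducerInterceptor interface registered through the producer configuration.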

Producer’s doSend method

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            /**
             * Step 1:
             * Block until the metadata has been fetched.
             * maxBlockTimeMs is the maximum time to wait.
             */
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            /**
             * Step 2:
             * Serialize the key and value of the message.
             */
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            /**
             * Step 3:
             * Use the partitioner to select the partition the message should be sent to.
             * Now that we have the metadata, we can calculate the target partition.
             */
            int partition = partition(record, serializedKey, serializedValue, cluster);
            /**
             * Calculate the total size of the message record.
             * Records.LOG_OVERHEAD = SIZE_LENGTH (value 4) + OFFSET_LENGTH (value 8)
             */
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            /**
             * Step 4:
             * Verify that the message size does not exceed the maximum.
             * When KafkaProducer is initialized, a parameter specifies the maximum size of a
             * message the Producer may send. The default is 1 MB, which we usually change.
             */
            ensureValidRecordSize(serializedSize);
            /**
             * Step 5:
             * Wrap the topic and the partition in a TopicPartition object.
             */
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            /**
             * Step 6:
             * Bind the callback to the message, because messages are sent asynchronously.
             */
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            /**
             * Step 7:
             * Put the message into the accumulator (32 MB of memory by default).
             */
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, interceptCallback, remainingWaitMs);
            // Step 8: if the batch is full or a new batch was created
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                /**
                 * Wake up the sender thread. It is the thread that actually sends the data.
                 */
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

The doSend() flow breaks down into eight steps:

  1. Make sure the metadata of the topic to which the data is sent is available (it is available if the partition leader exists and, when authorization is enabled, the client has the required permission). If the metadata is not available, fetch it first;
  2. Serialize the key and value of the record;
  3. Determine the partition to which the record is sent (either specified explicitly or calculated by the partitioning algorithm);
  4. Verify that the message size does not exceed the maximum (1 MB by default);
  5. Wrap the topic and the chosen partition in a TopicPartition object;
  6. Bind a callback function to each message;
  7. Append the record to the Accumulator, where the data is cached first (32 MB by default);
  8. If the corresponding RecordBatch has reached batch.size (or the remaining space in the batch is not enough for the next record), wake up the sender thread to send the data.

The sections below examine these eight steps in turn.

Detailed explanation of sending process

Blocking synchronously to obtain metadata

To send a message to a topic, the producer must have the metadata for that topic. It must know which partitions the topic has, select one of them with the Partitioner component, and know which broker hosts the leader of that partition, so that it can establish a connection to that broker and send the message.

If the client has not yet cached the metadata for the topic, it must send a network request to a broker to fetch it, and the calling thread blocks until the metadata arrives or the maximum wait time (maxBlockTimeMs) expires. Subsequent sends use the cached metadata directly. A later article covers this in more detail.
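The blocking wait and the remaining time budget computed in doSend() can be sketched as follows. This is a simplified illustration, not the client's Metadata class: MetadataWaitSketch, its version counter and its use of IllegalStateException for the timeout are assumptions made for the example.

```java
// Hypothetical sketch: a caller blocks until the metadata version advances or a timeout expires.
class MetadataWaitSketch {
    private int version = 0;

    synchronized int version() {
        return version;
    }

    // Called by the background thread once fresh metadata has arrived
    synchronized void update() {
        version++;
        notifyAll();
    }

    // Blocks until version > lastVersion; returns the milliseconds spent waiting.
    // Throws IllegalStateException if maxWaitMs elapses first.
    synchronized long awaitUpdate(int lastVersion, long maxWaitMs) {
        long begin = System.currentTimeMillis();
        long remaining = maxWaitMs;
        while (version <= lastVersion) {
            if (remaining <= 0)
                throw new IllegalStateException("Metadata not updated within " + maxWaitMs + " ms");
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            remaining = maxWaitMs - (System.currentTimeMillis() - begin);
        }
        return System.currentTimeMillis() - begin;
    }

    // Same arithmetic as remainingWaitMs in doSend(): what is left of the budget for the append step
    static long remainingWaitMs(long maxBlockMs, long waitedMs) {
        return Math.max(0, maxBlockMs - waitedMs);
    }

    public static void main(String[] args) {
        MetadataWaitSketch metadata = new MetadataWaitSketch();
        int last = metadata.version();
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
            }
            metadata.update();
        }).start();
        long maxBlockMs = 1000;
        long waited = metadata.awaitUpdate(last, maxBlockMs);
        System.out.println("remaining budget: " + remainingWaitMs(maxBlockMs, waited) + " ms");
    }
}
```

The point of the remaining-time calculation is that the metadata wait and the later buffer allocation share one time budget rather than each getting the full maximum.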

Serialize the key and value of the message

The key and value of a message can be of various types, such as String, Double, Boolean, or custom objects, but before a message is sent to the broker its key and value must be serialized, that is, converted to a byte array (byte[]). Internally Kafka provides serialization and deserialization as follows:

If the built-in serializers do not meet our requirements, we can write custom serializers and deserializers.
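As a rough illustration of a custom serializer, the sketch below converts a small object to byte[] and back. The User class and the static serialize/deserialize methods are hypothetical; in a real producer this logic would live in a class implementing Kafka's Serializer interface, whose serialize(topic, data) method has the same shape.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical example: hand-written binary encoding of a custom object
class UserSerdeSketch {
    static final class User {
        final int id;
        final String name;

        User(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Layout: 4-byte id | 4-byte name length | UTF-8 name bytes
    static byte[] serialize(String topic, User user) {
        if (user == null)
            return null;
        byte[] name = user.name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length);
        buffer.putInt(user.id);
        buffer.putInt(name.length);
        buffer.put(name);
        return buffer.array();
    }

    static User deserialize(String topic, byte[] data) {
        if (data == null)
            return null;
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int id = buffer.getInt();
        byte[] name = new byte[buffer.getInt()];
        buffer.get(name);
        return new User(id, new String(name, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("users", new User(7, "alice"));
        User back = deserialize("users", bytes);
        System.out.println(bytes.length + " bytes -> " + back.id + ", " + back.name);
    }
}
```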

Obtain the partition value

The calculation of partition value can be divided into three cases:

  1. If a partition is specified, the specified value is used directly as the partition value;
  2. If no partition is specified but the record has a key, the hash of the key modulo the number of partitions of the topic gives the partition value;
  3. If there is neither a partition value nor a key, the first call generates a random integer (incremented on each subsequent call), and this integer modulo the number of available partitions of the topic gives the partition value. This is the round-robin algorithm.

The concrete implementation is as follows:

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        // If a partition number was assigned to the message, use it directly;
        // otherwise let the partitioner compute one
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

The Producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner; users can also supply a custom partitioning strategy. The partition() method of this class is implemented as follows:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Get the partition information of the topic from the cluster metadata
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // Strategy one: the message has no key, so use round-robin.
            // The counter is an atomic class that is incremented on every call
            int nextValue = nextValue(topic);
            // Get the available partitions
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            /**
             * Strategy two: the message has a key.
             * hash the keyBytes to choose a partition:
             * hash the key, then take it modulo the total number of partitions.
             * The same key always maps to the same partition, so if we want
             * messages to land in the same partition, we must specify a key.
             * murmur2 is an efficient, low-collision hash algorithm.
             */
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
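The three cases can be condensed into a small self-contained sketch. It is deliberately simplified: Arrays.hashCode is swapped in for murmur2, the counter starts at 0 instead of a random value, and the distinction between available and all partitions is ignored. PartitionChoiceSketch is a hypothetical name, not a Kafka class.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified version of the partition selection rules
class PartitionChoiceSketch {
    // Kafka seeds this counter randomly; 0 is used here to keep the example deterministic
    private final AtomicInteger counter = new AtomicInteger(0);

    // Clears the sign bit so the modulo result is never negative
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null)
            return explicitPartition;                                         // case 1: caller decided
        if (keyBytes != null)
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;    // case 2: hash of the key
        return toPositive(counter.getAndIncrement()) % numPartitions;        // case 3: round-robin
    }

    public static void main(String[] args) {
        PartitionChoiceSketch chooser = new PartitionChoiceSketch();
        byte[] key = "user-42".getBytes();
        System.out.println("explicit:    " + chooser.choose(2, key, 3));
        System.out.println("keyed:       " + chooser.choose(null, key, 3));
        System.out.println("round-robin: " + chooser.choose(null, null, 3)
                + ", " + chooser.choose(null, null, 3));
    }
}
```

Masking the sign bit rather than calling Math.abs avoids the corner case where the hash is Integer.MIN_VALUE, whose absolute value is still negative.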

Verify whether the message size exceeds the maximum value

    private void ensureValidRecordSize(int size) {
        // If a single message is larger than the maximum request size (1 MB by default), throw an exception
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                    " configuration.");
        // If a single message is larger than the total buffer memory (32 MB by default), an error is reported as well
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                    ProducerConfig.BUFFER_MEMORY_CONFIG +
                    " configuration.");
    }

Both limits are configurable: maxRequestSize is set by max.request.size and totalMemorySize by buffer.memory.
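To make the size calculation concrete, here is a hedged worked example. The field widths are taken from the record layout described later in this article (offset 8, size 4, crc 4, magic 1, attributes 1, timestamp 8, key size 4, value size 4); RecordSizeSketch and its use of IllegalArgumentException are illustrative assumptions, not the Kafka classes.

```java
// Hypothetical sketch of the size arithmetic and the two limit checks
class RecordSizeSketch {
    static final int LOG_OVERHEAD = 8 + 4;                    // offset + size
    static final int RECORD_OVERHEAD = 4 + 1 + 1 + 8 + 4 + 4; // crc, magic, attributes, timestamp, key size, value size

    static int serializedSize(byte[] key, byte[] value) {
        int keyLength = key == null ? 0 : key.length;
        int valueLength = value == null ? 0 : value.length;
        return LOG_OVERHEAD + RECORD_OVERHEAD + keyLength + valueLength;
    }

    static void ensureValidRecordSize(int size, int maxRequestSize, long totalMemorySize) {
        if (size > maxRequestSize)
            throw new IllegalArgumentException("record of " + size + " bytes exceeds max.request.size");
        if (size > totalMemorySize)
            throw new IllegalArgumentException("record of " + size + " bytes exceeds buffer.memory");
    }

    public static void main(String[] args) {
        int size = serializedSize(new byte[3], new byte[10]);
        System.out.println("serialized size = " + size + " bytes"); // 12 + 22 + 3 + 10
        // Default limits: 1 MB per request, 32 MB of buffer memory
        ensureValidRecordSize(size, 1024 * 1024, 32L * 1024 * 1024);
    }
}
```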

Encapsulate the TopicPartition object

A TopicPartition object is created from the topic name and the partition number chosen in the previous step.

Set the callback function

The user's callback and the interceptors' callback are combined into a single callback that is bound to the message.

Write data to Accumulator

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending threads:
        // count the threads currently appending data to the RecordAccumulator
        appendsInProgress.incrementAndGet();
        try {
            // check if we have an in-progress batch
            /**
             * Step 1:
             * Get the queue of this partition. If a queue already exists, use it;
             * otherwise create a new one. We always end up with a queue to store batches,
             * but the first time this code runs the queue is empty.
             * On later calls for the same partition the existing queue is returned.
             * The queues are kept in CopyOnWriteMap, a thread-safe data structure
             * that Kafka implements itself.
             */
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            // Suppose several threads (thread one, thread two, ...) arrive here:
            // only one of them at a time can enter the block
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                /**
                 * Step 2:
                 * Try to add the data to a batch in the queue.
                 * The first attempt fails: we only have the queue, but data must be
                 * stored in a batch object, and a batch needs memory that has not
                 * been allocated yet.
                 */
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                // appendResult is null on the first attempt
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch, try to allocate a new batch
            /**
             * Step 3:
             * Calculate the size of the batch: the larger of the configured batch size
             * and the size of the message. A single message may be larger than the
             * configured batch size (16 KB by default).
             * This is a hint for tuning: if the messages the producer sends are larger
             * than 16 KB, every message becomes its own batch and messages are sent
             * one by one, which defeats the purpose of batching. Set the batch size
             * according to the message sizes of your own workload.
             */
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            /**
             * Step 4:
             * Allocate memory according to the batch size.
             * Threads one, two and three may all get this far, so each of them
             * allocates memory, say 16 KB each.
             */
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Assume thread one enters first.
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                /**
                 * Step 5:
                 * Try to write the data to a batch again.
                 * For thread one this fails (appendResult == null): memory has been
                 * allocated, but the batch has not been created yet.
                 * When thread two later executes this code, it succeeds.
                 */
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    // Thread two gets here having already written to thread one's batch,
                    // so the memory it allocated is no longer needed and is released.
                    free.deallocate(buffer);
                    return appendResult;
                }
                /**
                 * Step 6:
                 * Wrap the allocated memory in a batch.
                 * Thread one creates the batch as soon as it gets here.
                 */
                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
                // Try to write the data to this batch; this time the write succeeds
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                // Put the batch at the tail of the queue
                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            // Decrement the count of threads that are appending data
            appendsInProgress.decrementAndGet();
        }
    }

First, let's look at the getOrCreateDeque(tp) method:

    private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
        // Look up the queue for this partition directly in batches
        Deque<RecordBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new ArrayDeque<>();
        Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }

Several threads may call putIfAbsent() concurrently. The call is thread-safe: it inserts the new queue only if no queue exists yet for that partition, and because the method is synchronized only one thread updates the map at a time. batches is a CopyOnWriteMap, a thread-safe concurrent data structure that Kafka implements itself, as follows:

    /**
     * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
     *
     * 1) This data structure is thread-safe under high concurrency.
     * 2) It is designed around read-write separation: every insert copies the data
     *    into newly allocated memory. The small disadvantage is that inserting data
     *    consumes more memory.
     * 3) Such a data structure suits scenarios with many reads and few writes,
     *    and reads are very fast.
     *
     * batches is stored in this data structure, and batches is exactly a
     * read-heavy, write-light scenario.
     *
     * Reads: batches is read every time a message is produced. If 100,000 messages
     * are produced per second, that means 100,000 reads per second, so this is
     * definitely a high-concurrency scenario.
     *
     * Writes: with 100 partitions, only 100 inserts are ever needed, because the
     * queue of each partition is inserted only once. So this is a low-frequency operation.
     */
    public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
        /**
         * The core is this map, which is declared volatile: with multiple threads,
         * if the value of this map changes, the change is visible to the other threads.
         * It is read by get and replaced by put.
         */
        private volatile Map<K, V> map;

        // ... some code omitted ...

        /**
         * No lock is taken, so reads are fast (even under high concurrency)
         * and still thread-safe, thanks to the read-write separation.
         */
        @Override
        public V get(Object k) {
            return map.get(k);
        }

        /**
         * 1) The whole method is synchronized. Even with the lock, this code still
         *    performs well because it consists of pure in-memory operations.
         * 2) The design separates reads from writes, so read and write operations
         *    do not affect each other and reading is thread-safe.
         * 3) Finally the new value is assigned to map, which is volatile, so a
         *    reader sees the change the next time it gets data.
         */
        @Override
        public synchronized V put(K k, V v) {
            // Copy into newly allocated memory
            Map<K, V> copy = new HashMap<K, V>(this.map);
            V prev = copy.put(k, v);
            this.map = Collections.unmodifiableMap(copy);
            return prev;
        }
    }
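The excerpt above omits putIfAbsent(), which getOrCreateDeque() relies on. The following self-contained sketch shows one way the copy-on-write idea and the get-or-create pattern fit together. CowMapSketch is a hypothetical reduction written for this article, with String keys standing in for TopicPartition; it is not Kafka's class.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of a copy-on-write map: lock-free reads, synchronized copying writes
class CowMapSketch<K, V> {
    private volatile Map<K, V> map = Collections.emptyMap();

    // Reads never lock; they see whichever immutable snapshot was published last
    V get(K key) {
        return map.get(key);
    }

    // Returns the existing value if present, otherwise publishes a new snapshot and returns null
    synchronized V putIfAbsent(K key, V value) {
        V existing = map.get(key);
        if (existing != null)
            return existing;
        Map<K, V> copy = new HashMap<>(map);
        copy.put(key, value);
        map = Collections.unmodifiableMap(copy);
        return null;
    }

    // Same shape as getOrCreateDeque(): fast-path read, then insert only if still absent
    static Deque<String> getOrCreate(CowMapSketch<String, Deque<String>> batches, String partition) {
        Deque<String> queue = batches.get(partition);
        if (queue != null)
            return queue;
        queue = new ArrayDeque<>();
        Deque<String> previous = batches.putIfAbsent(partition, queue);
        return previous == null ? queue : previous;
    }

    public static void main(String[] args) {
        CowMapSketch<String, Deque<String>> batches = new CowMapSketch<>();
        Deque<String> first = getOrCreate(batches, "topic-0");
        Deque<String> second = getOrCreate(batches, "topic-0");
        System.out.println("same queue reused: " + (first == second));
    }
}
```

If two threads race to create the queue for the same partition, the loser of the race gets the winner's queue back from putIfAbsent() and discards its own, so all threads end up locking the same object.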

(2) The whole process uses segmented locking to improve concurrency: the lock is taken per partition queue and is released while memory is being allocated.
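The locking pattern of append() can be reduced to the sketch below: lock the partition's queue, try to append, release the lock while allocating, then lock again and re-check before adding a new batch. SegmentedAppendSketch and Batch are hypothetical, sizes are plain integers, and ConcurrentHashMap is swapped in for CopyOnWriteMap to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of per-queue locking with allocation outside the lock
class SegmentedAppendSketch {
    static final class Batch {
        final int capacity;
        int used;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        boolean tryAppend(int recordSize) {
            if (used + recordSize > capacity)
                return false;
            used += recordSize;
            return true;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize;

    SegmentedAppendSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns true if a new batch had to be created for this record
    boolean append(String partition, int recordSize) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {                       // first critical section: cheap attempt
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize))
                return false;
        }
        // Allocation happens without holding the lock, so other threads are not blocked by it
        Batch fresh = new Batch(Math.max(batchSize, recordSize));
        synchronized (dq) {                       // second critical section: re-check, then publish
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize))
                return false;                     // another thread created a batch meanwhile; drop ours
            fresh.tryAppend(recordSize);
            dq.addLast(fresh);
            return true;
        }
    }

    int batchCount(String partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null)
            return 0;
        synchronized (dq) {
            return dq.size();
        }
    }

    public static void main(String[] args) {
        SegmentedAppendSketch accumulator = new SegmentedAppendSketch(100);
        accumulator.append("topic-0", 60);
        accumulator.append("topic-0", 30);
        accumulator.append("topic-0", 30);
        System.out.println("batches for topic-0: " + accumulator.batchCount("topic-0"));
    }
}
```

Because each queue has its own monitor, threads writing to different partitions never contend, and the potentially slow allocation never runs inside a critical section.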

(3) Buffer memory is managed by a memory pool, the BufferPool.
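A much simplified sketch of the pooling idea follows. It only shows the reuse of fixed-size buffers and the accounting of the remaining memory; unlike the real pool it throws immediately when memory is exhausted instead of blocking up to a maximum time. BufferPoolSketch and its method names are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: buffers of the standard batch size are recycled instead of garbage collected
class BufferPoolSketch {
    private final int poolableSize;                          // size of the buffers worth pooling (the batch size)
    private long availableMemory;                            // memory not yet handed out and not sitting in the free list
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    BufferPoolSketch(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    synchronized ByteBuffer allocate(int size) {
        // Fast path: reuse a pooled buffer of the standard size
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        long reclaimable = availableMemory + (long) free.size() * poolableSize;
        if (size > reclaimable)
            throw new IllegalStateException("buffer memory exhausted");
        // Give up pooled buffers until enough unpooled memory is available
        while (availableMemory < size) {
            free.pollLast();
            availableMemory += poolableSize;
        }
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);                            // recycled: the next allocate() reuses it
        } else {
            availableMemory += buffer.capacity();            // odd sizes are simply left to the GC
        }
    }

    synchronized long availableMemory() {
        return availableMemory;
    }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(64, 16);
        ByteBuffer first = pool.allocate(16);
        pool.deallocate(first);
        ByteBuffer second = pool.allocate(16);
        System.out.println("buffer reused: " + (first == second));
    }
}
```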

How a message is written to the underlying ByteBuffer according to the binary protocol specification

  • offset | size | crc | magic | attributes | timestamp | key size | key | value size | value
  • The write follows the binary protocol specification strictly. The specification fixes the order and width of every field: first the bytes of the offset, then the bytes of the size, then the bytes of the CRC, then the magic byte, and so on. The message is written into the ByteBuffer exactly in that order.
  • This shows how the lowest level of the IO stream writes into the ByteBuffer.
  • ByteBufferOutputStream wraps the ByteBuffer and exposes it as an output stream. In compressed mode the ByteBufferOutputStream is wrapped in turn by a compression stream (gzip, LZ4 or snappy), so the data is first written into the buffer of the compression stream.
  • The compression stream collects the message in its buffer, compresses it with the chosen algorithm, and writes the result to the underlying ByteBufferOutputStream.
  • In non-compressed mode, the most common case, a DataOutputStream wraps the ByteBufferOutputStream and writes values such as long, byte and String into the underlying ByteBuffer.
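The field order listed above can be made concrete with a small sketch that writes one uncompressed record directly into a ByteBuffer. The field widths (8, 4, 4, 1, 1, 8, 4, 4 bytes), the magic value 1 and the use of java.util.zip.CRC32 over everything after the crc field are assumptions based on my understanding of this message format version; RecordLayoutSketch is a hypothetical class and non-null keys and values are assumed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch: offset | size | crc | magic | attributes | timestamp | key size | key | value size | value
class RecordLayoutSketch {
    static ByteBuffer write(long offset, long timestamp, byte[] key, byte[] value) {
        // Everything after the size field counts toward the record size
        int recordSize = 4 + 1 + 1 + 8 + 4 + key.length + 4 + value.length;
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + recordSize);
        buffer.putLong(offset);             // offset (8 bytes)
        buffer.putInt(recordSize);          // size (4 bytes)
        int crcPosition = buffer.position();
        buffer.putInt(0);                   // crc placeholder (4 bytes), filled in below
        buffer.put((byte) 1);               // magic (1 byte)
        buffer.put((byte) 0);               // attributes (1 byte), 0 = no compression
        buffer.putLong(timestamp);          // timestamp (8 bytes)
        buffer.putInt(key.length);          // key size (4 bytes)
        buffer.put(key);
        buffer.putInt(value.length);        // value size (4 bytes)
        buffer.put(value);
        // The checksum covers the bytes from the magic byte to the end of the value
        CRC32 crc = new CRC32();
        crc.update(buffer.array(), crcPosition + 4, buffer.position() - (crcPosition + 4));
        buffer.putInt(crcPosition, (int) crc.getValue());
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer record = write(0L, System.currentTimeMillis(),
                "k".getBytes(StandardCharsets.UTF_8), "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("total bytes written: " + record.limit());
    }
}
```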

Because the memory is recycled, the number of GCs is reduced. The corresponding flow chart is:

Wake up the Sender thread

The last step wakes up the sender thread, which is responsible for actually sending the data. We will analyze it in detail later.

Summary

This article walked through the main flow by which the Producer sends data, including segmented locking, the memory pool design, and the thread-safe data structure CopyOnWriteMap, all of which are worth studying. The next installment covers metadata update operations.
