Understanding how a Kafka cluster is put together gives a clearer picture of Kafka as a whole, instead of a partial glimpse. This article analyzes one of Kafka's core modules: the producer.

Overview of Kafka producer processes

1. The Kafka producer wraps each message in a ProducerRecord and sends it to a topic in the Kafka cluster.

2. The message is first serialized by the serializer so that it can be transmitted over the network.

3. The message then passes through the partitioner, which decides which partition of the topic it goes to. If the record already specifies a partition, the partitioner is not needed.

4. At this point the message leaves the producer and is sent to the specified topic and partition in the Kafka cluster.

5. If the write succeeds, the Kafka cluster responds to the producer with a RecordMetadata object. If the write fails, the producer retries up to the configured number of retries.

Kafka producer detailed process

Step 1: An incoming message is first encapsulated as a ProducerRecord object.

Step 2: The object is serialized. Kafka messages travel from the client to the server over the network, so they must be serialized. Kafka provides default serializers and also supports custom ones (a design worth borrowing for extensibility).

Step 3: After serialization, the message is partitioned, which requires the cluster metadata. This step is critical because it determines which partition of the topic the message is sent to on the Kafka server.

Step 4: The partitioned message is not sent directly to the server; it is placed in a buffer inside the producer. In this buffer, multiple messages are grouped into a batch, with a default batch size of 16 KB.

Step 5: The Sender thread starts and retrieves batches from the buffer.

Step 6: The Sender thread sends the batches to the server one by one. Before Kafka 0.8, the producer sent records to the server one at a time, and the frequent network requests resulted in poor performance. Later versions switched to batch sending, which improved performance substantially. This design is worth learning from. The next section analyzes the producer's details in depth.
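
The six steps above can be sketched with plain JDK types. This is only an illustration of the flow, not Kafka's implementation: the class name ProducerPipelineSketch, the fixed partition count, and the use of Arrays.hashCode as the hash are all assumptions made for the example, and the sketch is single-threaded.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: mirrors the steps with JDK types, not Kafka's classes.
class ProducerPipelineSketch {
    static final int NUM_PARTITIONS = 3; // assumed partition count

    // Step 4: the buffer maps each partition to a queue of pending serialized messages.
    final Map<Integer, Deque<byte[]>> buffer = new HashMap<>();

    // Steps 1-4: take a key/value pair, serialize it, pick a partition, buffer the value.
    int send(String key, String value) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);     // step 2: serialize
        byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
        // Step 3: stand-in hash, reduced to a valid partition index.
        int partition = Math.floorMod(Arrays.hashCode(keyBytes), NUM_PARTITIONS);
        buffer.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(valueBytes);
        return partition;
    }

    // Steps 5-6: what a sender would do, taking everything queued for one partition as a batch.
    List<byte[]> drain(int partition) {
        List<byte[]> batch = new ArrayList<>();
        Deque<byte[]> queue = buffer.get(partition);
        while (queue != null && !queue.isEmpty()) {
            batch.add(queue.pollFirst());
        }
        return batch;
    }

    public static void main(String[] args) {
        ProducerPipelineSketch producer = new ProducerPipelineSketch();
        int partition = producer.send("customer-1", "first");
        producer.send("customer-1", "second");
        System.out.println("partition " + partition
                + " batch size " + producer.drain(partition).size());
    }
}
```

The point of the sketch is that send() only appends to a per-partition queue, and the network work happens later when a whole queue is drained at once.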

Analysis of difficulties in Kafka producers

ProducerRecord

Before the producer sends a message to the cluster, the message is wrapped in a ProducerRecord object. With the Java client, the application creates the ProducerRecord and passes it to send().

The serializer

When creating a producer, you must specify a serializer for keys and for values. Besides the serializers Kafka provides by default, serialization frameworks such as Avro, Thrift, or Protobuf are recommended. Writing your own serializer is not recommended: when the message format has to change, keeping old and new message code compatible becomes a problem of varying severity.

Apache Avro

Apache Avro is a data serialization system that supports rich data structures and provides a compact, fast, binary data format. Before using Avro, you need to define schemas, which are usually written using JSON.

(1) Create a class to represent the customer as the value of the message

class Customer {
    private int customerID;
    private String customerName;

    public Customer(int customerID, String customerName) {
        this.customerID = customerID;
        this.customerName = customerName;
    }

    public int getCustomerID() {
        return customerID;
    }

    public String getCustomerName() {
        return customerName;
    }
}

(2) Define the schema

{
  "namespace": "customerManagement.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"}
  ]
}

(3) Generate Avro objects and send them to Kafka. To read an Avro message, the reader first needs the whole schema it was written with. To make the schema available, a component called Schema Registry is used to store it.

// schemaUrl and CustomerGenerator are defined elsewhere; Customer is the Avro class
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// schema.registry.url points to where the schemas are stored
props.put("schema.registry.url", schemaUrl);

String topic = "CustomerContacts";
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);

// Continuously generate messages and send them
while (true) {
    Customer customer = CustomerGenerator.getNext();
    ProducerRecord<String, Customer> record =
            new ProducerRecord<>(topic, customer.getId(), customer);
    // Send customer as the value of the message; KafkaAvroSerializer takes care of the rest
    producer.send(record);
}

The partitioner

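When the key is null and the default partitioner is used, the message is sent to one of the available partitions of the topic. Older clients balance messages across partitions with a round-robin algorithm; since Kafka 2.4 the default partitioner instead keeps using one partition until the current batch is complete and then switches to another.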

When the key is not null and the default partitioner is used, Kafka hashes the key and uses the hash value to map the message to a specific partition. What matters is that the same key is always mapped to the same partition.

As long as the number of partitions for a topic is constant, the mapping between keys and partitions is guaranteed to be consistent. However, if you add a new partition to a topic, there is no guarantee that new messages with the same key will be written to the original partition, although existing data will remain in the original partition. Therefore, it is best to define the number of partitions required in advance when creating a topic to avoid inconsistent mapping when adding new partitions later.
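
To make the effect of changing the partition count concrete, here is a minimal sketch of hash-based partitioning. Kafka's default partitioner hashes the serialized key with murmur2; Arrays.hashCode is used below only as a dependency-free stand-in, so the partition numbers it prints are not the ones Kafka would choose. The class name KeyPartitionSketch and the sample keys are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {
    // Stand-in hash (an assumption of this sketch); Kafka itself uses murmur2 here.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result in [0, numPartitions) even for negative hashes
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        String[] keys = {"alice", "bob", "carol", "dave"};
        for (String key : keys) {
            // Same key, same partition count: always the same partition.
            // Same key, different partition count: the partition may change.
            System.out.println(key + ": 4 partitions -> " + partitionFor(key, 4)
                    + ", 5 partitions -> " + partitionFor(key, 5));
        }
    }
}
```

Because the partition is the hash reduced modulo the partition count, any change to that count can move a key to a different partition, which is why the count is best fixed when the topic is created.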

The buffer

After a message is partitioned, it is first put into a buffer. Let's look at the details. The default size of the buffer is 32 MB. The buffer contains an important key-value data structure called batches: the key is a partition of the message's topic, and the value is a queue that holds the batches to be sent to that partition.

The producer stores batch information in an object named batches. If it were up to you, what data structure would you use to store it? Kafka defines its own: CopyOnWriteMap. Readers familiar with Java will know that java.util.concurrent (JUC) provides CopyOnWriteArrayList, but no CopyOnWriteMap. Here is why Kafka designed such a data structure.

1. The information is stored as key-value pairs. The key is a partition, and the value is the batches waiting to be sent to that partition (there may be several, so a queue is used).

2. The Kafka producer works under high concurrency, with a large number of messages flooding into this data structure, so it must be thread safe. That rules out structures such as HashMap.

3. The data structure must suit a read-heavy, write-light workload. Reads are frequent because every message looks up a value by its key: if 10 million messages are sent, batches is read 10 million times. Writes are rare: if, say, the messages go to 50 partitions, only 50 key-value pairs are ever written into batches. (The 10 million messages are of course written somewhere, but they go into the batches held in the queues, not into the map itself, so the map only sees 50 writes.)

From points 2 and 3, Kafka needs a thread-safe Map that supports many reads and few writes. java.util.concurrent has no copy-on-write Map; the closest match is CopyOnWriteArrayList, which is not a Map. So Kafka modeled CopyOnWriteMap on CopyOnWriteArrayList. Separating reads from writes solves both thread safety and the read-heavy access pattern, and this efficient data structure helps keep the producer fast. (Readers unfamiliar with CopyOnWriteArrayList can look it up.)

I suggest reading the Kafka producer source code that inserts data into batches. To keep inserts fast, the producer accepts messages from multiple threads, and to stay thread safe it uses techniques such as segmented locking. The source code is well worth reading.
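
The copy-on-write idea can be sketched in a few lines. This is a simplified illustration written for this article, not Kafka's CopyOnWriteMap source: the class name CopyOnWriteMapSketch and its three methods are assumptions, and only the operations needed to show the read and write paths are included.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified copy-on-write map: reads take no lock, writes copy the whole map.
class CopyOnWriteMapSketch<K, V> {
    // volatile so that readers always see the most recently published snapshot
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    // Read path (frequent): a plain lookup in the current immutable snapshot.
    V get(K key) {
        return snapshot.get(key);
    }

    // Write path (rare): copy, modify the copy, publish it.
    // synchronized so that two writers cannot overwrite each other's copy.
    synchronized V putIfAbsent(K key, V value) {
        V existing = snapshot.get(key);
        if (existing != null) {
            return existing;
        }
        Map<K, V> copy = new HashMap<>(snapshot);
        copy.put(key, value);
        snapshot = Collections.unmodifiableMap(copy);
        return null;
    }

    int size() {
        return snapshot.size();
    }

    public static void main(String[] args) {
        CopyOnWriteMapSketch<String, Deque<String>> batches = new CopyOnWriteMapSketch<>();
        batches.putIfAbsent("test_topic-0", new ArrayDeque<>()); // one write per partition
        batches.get("test_topic-0").addLast("batch-1");          // many reads per partition
        System.out.println("partitions tracked: " + batches.size());
    }
}
```

Copying the map on every write would be expensive if writes were common, but with one write per partition and one read per message the trade favors lock-free reads. The queues stored as values still need their own synchronization, which this sketch leaves out.
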
The default batch size is 16 KB, and the whole buffer is 32 MB. The producer has to allocate memory for each batch it creates. Normally, once a batch has been sent, its 16 KB of memory would simply wait to be reclaimed by the GC. To avoid this, the buffer includes a memory pool (similar to the database connection pools we commonly use). When a 16 KB block is no longer needed, its data is cleared and the block is returned to the pool, and the next batch can take it directly from there. This greatly reduces GC frequency and keeps the producer stable and efficient (Java GC is a headache, so this design is well worth learning from).
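
A minimal sketch of such a memory pool follows. It is not Kafka's buffer pool: the class name BufferPoolSketch and its methods are assumptions for illustration, and the sketch leaves out the 32 MB overall limit and the blocking behavior when memory runs out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class BufferPoolSketch {
    private final int batchSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    BufferPoolSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Reuse a pooled buffer when one is available; allocate only when the pool is empty.
    synchronized ByteBuffer allocate() {
        ByteBuffer pooled = free.pollFirst();
        return pooled != null ? pooled : ByteBuffer.allocate(batchSize);
    }

    // Called after a batch has been sent: reset the buffer and return it to the pool.
    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        free.addLast(buffer);
    }

    synchronized int pooledCount() {
        return free.size();
    }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(16 * 1024);
        ByteBuffer first = pool.allocate();   // pool is empty, so this allocates
        pool.release(first);                  // batch sent, buffer goes back to the pool
        ByteBuffer second = pool.allocate();  // taken from the pool, no new allocation
        System.out.println("reused: " + (first == second));
    }
}
```

Because every batch has the same size, any returned buffer fits the next batch, so steady-state sending creates almost no garbage.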

Sender

After messages are put into the buffer, a separate thread, the Sender, concurrently sends the batches to the corresponding brokers.

Producer code

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

//1. Set the parameters
Properties properties = new Properties();
properties.put("bootstrap.servers", "120.27.233.226:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "-1");
properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("linger.ms", 10);
properties.put("buffer.memory", 33554432);
properties.put("max.block.ms", 3000);
properties.put("max.request.size", 1048576);
properties.put("request.timeout.ms", 30000);

//2. Create the producer
Producer<String, String> producer = null;
try {
    producer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 100; i++) {
        String msg = "This is Message " + i;
        //3. Create the message
        ProducerRecord<String, String> record =
                new ProducerRecord<>("test_topic", "test", msg);
        //4. Send the message
        producer.send(record);
        System.out.println("Sent:" + msg);
        Thread.sleep(1000);
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    //5. Close the connection (producer is still null if the constructor failed)
    if (producer != null) {
        producer.close();
    }
}

About Message Sending

3.1 Fire-and-forget

After sending a message, you do not check whether it was delivered. Because Kafka is highly available and the producer retries automatically, delivery succeeds most of the time, but the occasional failure goes unnoticed.

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry",
		"Precision Products", "France");
try {
	producer.send(record);
} catch (Exception e) {
	e.printStackTrace();
}

Exceptions can still occur before the message is sent, for example a SerializationException when the message cannot be serialized, a BufferExhaustedException when the buffer is full, a TimeoutException when sending times out, or an InterruptException when the sending thread is interrupted.

3.2 Synchronous send

send() returns a Future object. Calling get() on it blocks until the result is available.

Simply replace the line in the try above with the following:

producer.send(record).get();

If the send fails, get() throws an exception. If it succeeds, get() returns a RecordMetadata object, whose offset() method gives the offset of the message within its partition.

KafkaProducer has two kinds of exceptions. The first kind is retriable and can be resolved by resending the message: a connection error may be fixed by reconnecting, and a "no leader" error clears once a new leader has been elected. KafkaProducer can be configured to resend automatically when such an exception occurs, until the configured number of retries is exhausted. The second kind cannot be fixed by retrying, such as “Message Size too Large”; for these an error is returned immediately.
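
The difference between the two kinds of exceptions can be sketched as a retry loop. This is a generic illustration, not the producer's internal logic: RetrySketch, RetriableError, and sendWithRetries are hypothetical names, and the real client also waits between attempts, which is omitted here.

```java
import java.util.function.Supplier;

class RetrySketch {
    // Hypothetical marker for errors worth retrying (stands in for Kafka's retriable exceptions).
    static class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    // Runs the attempt and retries up to maxRetries times, but only on RetriableError.
    static <T> T sendWithRetries(Supplier<T> attempt, int maxRetries) {
        int failures = 0;
        while (true) {
            try {
                return attempt.get();
            } catch (RetriableError e) {
                if (failures++ >= maxRetries) {
                    throw e; // retries exhausted, give up
                }
                // otherwise loop and try again
            }
            // any other exception is not caught and propagates immediately
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = sendWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new RetriableError("no leader yet");
            }
            return "sent";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With maxRetries set to 3, a send is attempted at most four times: the first attempt plus three retries.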

3.3 Asynchronous send

To send asynchronously, pass a callback when calling send(). The callback is triggered when the producer receives the response from the broker.

class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // e is non-null only when the send failed
        if (e != null) {
            e.printStackTrace();
        }
    }
}

producer.send(record, new DemoProducerCallback());

To use a callback, implement the org.apache.kafka.clients.producer.Callback interface, which has a single method, onCompletion. If the send fails, the Exception parameter e of onCompletion is non-null.

Exceptions in Kafka

1) LeaderNotAvailableException: if a machine goes down, the leader replica of a partition becomes unavailable and writes fail. Writing can continue only after one of the follower replicas has been promoted to leader, so retrying the send works here. Restarting a Kafka broker process during normal operation also triggers a leader switch and will produce LeaderNotAvailableException.

2) NotControllerException: if the controller broker goes down, requests fail until a new controller has been elected.

3) NetworkException: a network error. With the retries parameter configured, the producer retries automatically, but if the send still fails after those attempts, the exception is raised to the application.

Code parameters tuning

① acks: message acknowledgment
acks controls when a send counts as successful:
-1: the leader and all in-sync followers have received the message
1: the leader has received the message
0: the producer sends the message and does not wait for an acknowledgment
② retries: number of retries (important)
props.put("retries", 3);

Kafka can hit all kinds of exceptions, especially when the network briefly fails. The producer should not surface every one of them, because the network may recover a second later, so a retry mechanism is needed.

③ batch.size: batch size
props.put("batch.size", 32768);

The batch size defaults to 16 KB; 32 KB is used here. A larger batch size can improve throughput somewhat. The right value also depends on the size of the messages.

④ linger.ms: maximum wait before sending
props.put("linger.ms", 100);

Suppose the batch size is 32 KB and each message is 2 KB. Three messages have arrived, 6 KB in total, and the producer has no more messages for now. Should the batch stay unsent because it has not reached 32 KB? Obviously not. linger.ms sets a time limit after which the batch is sent even if it is not full. With the 100 milliseconds set above, the batch goes to the cluster after 100 milliseconds even if it has not reached 32 KB.
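
The rule in this example can be written as a single condition. The sketch below is a simplification for illustration, not the client's actual readiness check: the class BatchReadySketch and the method readyToSend are made-up names, and the real producer takes more conditions into account.

```java
class BatchReadySketch {
    // A batch is sent when it is full, or when it has waited at least lingerMs.
    static boolean readyToSend(int batchBytes, int batchSize, long waitedMs, long lingerMs) {
        boolean full = batchBytes >= batchSize;
        boolean lingerExpired = waitedMs >= lingerMs;
        return full || lingerExpired;
    }

    public static void main(String[] args) {
        int batchSize = 32 * 1024; // batch.size from the example: 32 KB
        long lingerMs = 100;       // linger.ms from the example
        int threeMessages = 3 * 2 * 1024; // three 2 KB messages, 6 KB in total

        System.out.println("6 KB after 50 ms:  " + readyToSend(threeMessages, batchSize, 50, lingerMs));
        System.out.println("6 KB after 100 ms: " + readyToSend(threeMessages, batchSize, 100, lingerMs));
        System.out.println("32 KB after 0 ms:  " + readyToSend(batchSize, batchSize, 0, lingerMs));
    }
}
```

A higher linger.ms gives batches more time to fill, trading a little latency for fewer, larger requests.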

⑤ buffer.memory: buffer size
props.put("buffer.memory", 33554432);

When the Sender thread is slow and the application produces data quickly, a buffer that is too small fills up and the producer cannot accept more data, so the buffer may need to be larger. The default buffer size is 32 MB, which is reasonable in most cases.
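
What happens when the buffer fills up can be sketched with a counting semaphore. This is an analogy rather than the producer's code: BoundedBufferSketch, tryReserve, and release are hypothetical names, maxBlockMs plays the role of the max.block.ms setting from the producer code earlier, and the real client raises an exception where this sketch returns false.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class BoundedBufferSketch {
    // One permit per free byte of buffer memory.
    private final Semaphore freeBytes;

    BoundedBufferSketch(int bufferMemory) {
        this.freeBytes = new Semaphore(bufferMemory);
    }

    // Reserve space for a record, waiting up to maxBlockMs.
    // Returns false if the buffer stayed full for the whole wait.
    boolean tryReserve(int bytes, long maxBlockMs) {
        try {
            return freeBytes.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Called once the Sender has sent the data and the space is free again.
    void release(int bytes) {
        freeBytes.release(bytes);
    }

    public static void main(String[] args) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(100);
        System.out.println("first 60 bytes:  " + buffer.tryReserve(60, 10));
        System.out.println("second 60 bytes: " + buffer.tryReserve(60, 10)); // buffer is full
        buffer.release(60); // the Sender catches up
        System.out.println("after release:   " + buffer.tryReserve(60, 10));
    }
}
```

A larger buffer lets the application run further ahead of a slow Sender before it has to wait.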

⑥ max.request.size: maximum request size
props.put("max.request.size", 1048576);

max.request.size limits the size of a request the producer sends, and therefore the size of a message. The default is 1048576 bytes, that is 1 MB. This is often too small, since many messages can exceed 1 MB, so it should be raised (companies commonly set it to 10 MB). Otherwise a program that has been running well fails as soon as a 2 MB message arrives, and the loss outweighs the gain.

⑦ request.timeout.ms: request timeout
props.put("request.timeout.ms", 30000);

request.timeout.ms: a TimeoutException is thrown if no response is received within 30 seconds. If the company network is unreliable, increase this value.