The previous article introduced what Kafka is and its basic concepts, walked through the standalone and cluster setups, gave a rough overview of the basic configuration files, and described Kafka’s main roles. Whether Kafka is used as a message queue, a message bus, or a data storage platform, everything ultimately revolves around one word: message. Messages are the core of Kafka. Where do Kafka’s messages come from? Where do they go? What happens to them along the way? Let’s take it one step at a time, starting with where Kafka’s messages come from.

Producer Overview

In Kafka, we call the party that generates messages the producer. Take shopping on Taobao as an example. The moment you open Taobao, your login information and login count are sent to the Kafka backend as messages. Your shopping preferences are sent to Kafka as messages too, and Taobao uses them to make smart recommendations, which is why your wallet can never resist the temptation. How do these messages get to Kafka? What does the sending process look like?

Although generating a message is very simple, the process of sending it is fairly involved, as shown in the figure

ProducerRecord is a core class in Kafka. It represents the key/value pair that Kafka needs to send, together with the name of the topic it is sent to, an optional partition number, and an optional key.

When a ProducerRecord is sent, the serializers first convert the key and value objects into byte arrays so that they can be transferred over the network. The message then arrives at the partitioner.

If a valid partition number is specified in the record, that partition is used. If no partition is specified but a key is present, the partition is chosen by hashing the key. If neither a partition number nor a key is specified, partitions are assigned in a round-robin fashion. Once the partition is selected, the producer knows which topic and partition the data goes to.
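The selection order described above can be sketched in plain Java. This is only an illustration, not Kafka’s actual partitioner: the PartitionChooser class and its choose() method are hypothetical, and String.hashCode() stands in for the hash Kafka applies to the serialized key.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing the order in which a partition is chosen.
// It is a sketch and does not reproduce Kafka's real hashing.
class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger();

    int choose(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            // 1. An explicitly requested partition must be valid and wins outright.
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition: " + partition);
            }
            return partition;
        }
        if (key != null) {
            // 2. Otherwise hash the key; masking the sign bit keeps the index non-negative.
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
        // 3. Neither partition nor key: take the next partition in turn.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Calling choose(null, "West", 4) twice returns the same partition both times, while repeated calls with neither a partition nor a key walk through the partitions one after another.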

A ProducerRecord also has an associated timestamp. If the user does not provide one, the producer uses the current time as the timestamp of the record. The timestamp Kafka ultimately uses depends on the timestamp type configured for the topic.

  • If the topic is configured to use CreateTime, the broker uses the timestamp in the producer record.
  • If the topic is configured to use LogAppendTime, the broker overwrites the timestamp in the producer record when it appends the message to its log.
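The two timestamp rules fit in a few lines. The sketch below is a model of the decision only; TimestampResolver, its enum, and the explicit clock arguments are hypothetical names used for illustration and are not part of the Kafka API.

```java
// Hypothetical model of which timestamp ends up stored with a record.
class TimestampResolver {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    static long resolve(TimestampType type, Long userTimestamp,
                        long producerClock, long brokerClock) {
        // The producer fills in its own current time when the user supplies none.
        long producerTimestamp = (userTimestamp != null) ? userTimestamp : producerClock;
        // With LogAppendTime the broker overwrites that value when appending to the log.
        return type == TimestampType.LOG_APPEND_TIME ? brokerClock : producerTimestamp;
    }
}
```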

The message is then added to a record batch; all messages in a batch go to the same topic and partition. A separate thread is responsible for sending the batches to the Kafka broker.

When the Kafka broker receives the messages, it returns a response. If the write succeeds, it returns a RecordMetadata object containing the topic and partition as well as the offset of the record within the partition. The timestamp described above is returned to the user as well. If the write fails, an error is returned. The producer tries to resend the message after receiving an error, and reports the error if it still fails after several attempts.

Create a Kafka producer

To write messages to Kafka, you first need to create a producer object and set some properties. A Kafka producer has three mandatory properties:

  • bootstrap.servers

This property specifies the list of broker addresses, each in the form host:port. The list does not need to contain every broker, because the producer discovers the other brokers from the ones given. It is still recommended to provide at least two brokers so that the producer can connect to the cluster if one of them goes down.

  • key.serializer

The broker expects keys and values as byte arrays, so messages must be serialized before they are delivered to the Kafka broker, and the producer needs to know how to convert Java objects into byte arrays. key.serializer must be set to a class that implements the org.apache.kafka.common.serialization.Serializer interface; the producer uses this class to serialize the key object into a byte array. A few more words on the Serializer interface:

Serializer is an interface that defines how objects are serialized; its purpose is to convert objects into bytes. Classes that implement the Serializer interface include ByteArraySerializer, StringSerializer, and IntegerSerializer. ByteArraySerializer is the simplest of them, since it passes byte arrays through unchanged. Many other serializers are available. One thing to note: key.serializer must be set even if you intend to send only values.

  • value.serializer

As with key.serializer, the class specified by value.serializer serializes the value.
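To make “convert objects into bytes” concrete, here is a small sketch of what a string serializer and an integer serializer boil down to. SimpleSerializer is a local stand-in written for this example, not Kafka’s Serializer interface; the encodings (UTF-8 for strings, four big-endian bytes for integers) are roughly what StringSerializer and IntegerSerializer produce by default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Local stand-in for the idea behind Kafka's Serializer interface (not the real one).
interface SimpleSerializer<T> {
    byte[] serialize(T data);
}

class SerializerSketch {
    // Strings become their UTF-8 bytes.
    static final SimpleSerializer<String> STRING =
            s -> s == null ? null : s.getBytes(StandardCharsets.UTF_8);

    // Integers become four bytes, most significant byte first.
    static final SimpleSerializer<Integer> INTEGER =
            i -> i == null ? null : ByteBuffer.allocate(4).putInt(i).array();
}
```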

The following code demonstrates how to create a Kafka producer, specifying only the required properties and using the defaults for everything else:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties properties = new Properties();
properties.put("bootstrap.servers", "broker1:9092,broker2:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

Let’s walk through this code:

  • First, a Properties object is created and the broker addresses are set.
  • StringSerializer is used to serialize both the key and the value.
  • Finally, a new producer object is created with the appropriate key and value types, and the Properties object is passed to it.

After the producer object is instantiated, you can start sending messages. There are three main ways to do so:

Send directly, regardless of the result

We send the message and do not care whether it arrives. Because Kafka is highly available and the producer retries automatically, most messages arrive, but some may be lost. This is similar to the UDP transport layer protocol.

Synchronous send

Synchronous send still uses the send() method, which returns a Future object; we then call get() on it to wait and see whether the message was sent successfully.

Asynchronous send

Asynchronous send means that we call the send() method and pass a callback function, which is invoked when the server returns a response.

We’ll revisit these three implementations in the next section.

Send a message to Kafka

Simple message sending

The simplest way to send a message to Kafka is as follows:

ProducerRecord<String, String> record =
        new ProducerRecord<String, String>("CustomerCountry", "West", "France");

producer.send(record);

The producer’s send() method takes a ProducerRecord object as its argument. ProducerRecord has several constructors; the one used here is

public ProducerRecord(String topic, K key, V value) {}

This constructor takes the topic, key, and value.

After the parameters are passed in, the producer calls send() to send the message (the ProducerRecord object). As you can see from the producer architecture diagram, the message is first written to a per-partition buffer and then sent to the Kafka broker in batches.

The send() method returns a java.util.concurrent.Future<RecordMetadata>. The code above ignores the return value, so there is no way to know whether the message was sent successfully. If the message is not important, or losing it has no impact on the result, it can be sent this way.

We can ignore errors that occur while the message is in flight or on the server side, but the producer may still throw exceptions before the message is sent. These may be a SerializationException (serialization failed), a BufferExhaustedException or TimeoutException (the buffer is full), or an InterruptException (the sending thread was interrupted).

Synchronous message sending

The second way of sending a message is shown below:

ProducerRecord<String, String> record =
        new ProducerRecord<String, String>("CustomerCountry", "West", "France");

try {
    // get() blocks until the broker responds, or throws if the send failed
    RecordMetadata recordMetadata = producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

This approach improves on the previous one: it calls send() and then get(), which waits for Kafka’s response. If the server returns an error, get() throws an exception; otherwise we get a RecordMetadata object, from which we can read details about the record, such as its offset.

KafkaProducer generally encounters two kinds of errors when sending a message. The first kind is retriable errors, which can be resolved by sending the message again. For example, a connection error can be resolved by re-establishing the connection, and a “no leader” error can be resolved once a new leader has been elected for the partition. KafkaProducer can be configured to retry automatically; if the problem is still not resolved after several retries, the application receives a retriable exception. The second kind cannot be resolved by retrying, such as a message that is too large. For these, KafkaProducer does not retry and throws the exception immediately.
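The difference between the two kinds of errors can be sketched as a small retry loop. Everything here is hypothetical and exists only for illustration: RetriableError and FatalError stand in for Kafka’s retriable and non-retriable exceptions, and sendWithRetries() imitates what the producer does internally rather than calling it.

```java
import java.util.function.Supplier;

// Hypothetical exception types standing in for retriable and non-retriable errors.
class RetriableError extends RuntimeException {
    RetriableError(String message) { super(message); }
}

class FatalError extends RuntimeException {
    FatalError(String message) { super(message); }
}

class RetrySketch {
    // Runs the action, retrying only on RetriableError, for at most maxRetries extra attempts.
    static <T> T sendWithRetries(Supplier<T> action, int maxRetries) {
        int attempt = 0;
        while (true) {
            try {
                return action.get();
            } catch (RetriableError e) {
                if (attempt++ >= maxRetries) {
                    throw e; // retries exhausted: surface the retriable error
                }
            }
            // FatalError is not caught, so it propagates immediately without a retry.
        }
    }
}
```

A “no leader” style failure that clears up after two attempts succeeds on the third, whereas a “message too large” style failure is reported after the very first attempt.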

Sending messages asynchronously

The problem with sending messages synchronously is that only one message can be in flight at a time. The sending thread does nothing until the response arrives, so the messages behind it are delayed and throughput suffers.

For example, suppose a round trip between the application and the Kafka cluster takes 10ms. If we wait for the response after every message, sending 100 messages takes about 1 second, whereas sending all 100 without waiting takes far less time. Most of the time we do not need the response anyway: Kafka returns a RecordMetadata for each message, but we usually have no use for it.
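The arithmetic above is easy to reproduce with a small simulation. No Kafka classes are involved: fakeSend() is a hypothetical stand-in for a send whose acknowledgement arrives after a fixed 10ms round trip, and the measured times will vary from machine to machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simulation only: models a fixed round trip between application and cluster.
class SendModeSimulation {
    static final long ROUND_TRIP_MS = 10;

    // Hypothetical send: the acknowledgement completes after one round trip.
    static CompletableFuture<Integer> fakeSend(ScheduledExecutorService network, int id) {
        CompletableFuture<Integer> ack = new CompletableFuture<>();
        network.schedule(() -> ack.complete(id), ROUND_TRIP_MS, TimeUnit.MILLISECONDS);
        return ack;
    }

    // Sends `count` messages and returns the elapsed time in milliseconds.
    static long run(int count, boolean waitForEachAck) {
        ScheduledExecutorService network = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        try {
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                CompletableFuture<Integer> ack = fakeSend(network, i);
                if (waitForEachAck) {
                    ack.join();        // synchronous: block for every response
                } else {
                    pending.add(ack);  // asynchronous: keep sending
                }
            }
            // Wait once at the end for whatever is still outstanding.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            network.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

With run(100, true) the total should come out at roughly 100 round trips, while run(100, false) should finish in little more than a single round trip.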

To handle errors while still sending messages asynchronously, the producer supports callbacks. Here is an example of a callback:

ProducerRecord<String, String> producerRecord =
        new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
producer.send(producerRecord, new DemoProducerCallBack());

class DemoProducerCallBack implements Callback {

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // exception is non-null only when the send failed
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}

To use a callback, we need a class that implements the org.apache.kafka.clients.producer.Callback interface, which has a single method, onCompletion. If Kafka returns an error, onCompletion is called with a non-null exception, which we simply print here; production code would need more thorough handling. We then pass an instance of the callback to send() when sending the record.

Producer partitioning mechanism

Kafka reads and writes data at the granularity of partitions. The partitions of a topic can be spread across multiple brokers, so each node can serve reads and writes independently, and new nodes can be added to increase the throughput of the cluster. Deploying partitions across multiple brokers is how load balancing is achieved.

We have seen three ways a producer can send a message: ignore the result, wait for the result, or handle it in a callback. Since messages live in the partitions of a topic, how is it decided which partition a message goes to when a producer sends it to a topic?

This is where Kafka’s partitioning mechanism comes in.

Partitioning strategies

Kafka’s partitioning strategy is the algorithm that decides which partition a producer sends a message to. Kafka provides a default strategy and also lets you define your own.

To use a custom partitioning strategy, you must explicitly set the producer parameter partitioner.class to a class that implements the Partitioner interface. The interface lives in the org.apache.kafka.clients.producer package:

public interface Partitioner extends Configurable, Closeable {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();

    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

The Partitioner interface has three methods:

  • partition(): takes several arguments. topic is the topic the message is sent to; key is the message key; keyBytes is the serialized key as a byte array; value is the message value; valueBytes is the serialized value as a byte array; cluster holds the metadata of the current cluster. With this much information you can work out which partition the message should be sent to.
  • close(): inherited from the Closeable interface; it is called when the partitioner is closed.
  • onNewBatch(): notifies the partitioner that a new batch is about to be created.

The partition() method is where the partitioning strategy is implemented.

Round-robin

Messages are distributed evenly across the partitions, with each partition receiving a message in turn. It looks like this

The figure above shows the round-robin strategy. It is the default strategy provided by Kafka Producer for messages without a key: if you do not specify a strategy, such messages are distributed in round-robin order. (Kafka 2.4 and later instead keep keyless messages on one partition until the current batch is full, then move on to another.)
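The rotation (round-robin) strategy needs nothing more than a counter. The following sketch counts how many messages each partition would receive; RoundRobinSketch is a hypothetical class for illustration and does not use Kafka’s Partitioner interface.

```java
// Hypothetical illustration of round-robin assignment.
class RoundRobinSketch {
    // Returns how many of `messages` keyless messages land on each partition.
    static int[] distribute(int messages, int numPartitions) {
        int[] perPartition = new int[numPartitions];
        int counter = 0;
        for (int i = 0; i < messages; i++) {
            int partition = counter++ % numPartitions; // next partition in turn
            perPartition[partition]++;
        }
        return perPartition;
    }
}
```

Nine messages over three partitions give three messages each, and a tenth message simply goes to the first partition again.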

Random strategy

In short, the random strategy saves each message to a randomly chosen partition, as shown in the following figure

Only two lines of code are needed to implement random assignment:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

First get the total number of partitions for the topic, then return a random non-negative integer smaller than that number.

In essence, the random strategy also tries to spread data evenly across the partitions, but in practice it does so less well than round-robin, so if an even distribution is the goal, round-robin is the better choice. In fact, random was the partitioning strategy used by older producer versions; newer versions switched to round-robin.

Partitioning by message key

This strategy is also called the key-ordering strategy. A Kafka message can carry a key, and once keys are defined, all messages with the same key are guaranteed to go to the same partition, where their order is preserved. This is why it is called the order-by-key strategy, as shown in the following figure

The partition method implementing this strategy is just as simple, requiring only the following two lines of code:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// Mask the sign bit rather than using Math.abs, which stays negative for Integer.MIN_VALUE
return (key.hashCode() & 0x7fffffff) % partitions.size();

These are the basic partitioning strategies; you can also implement your own.
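One detail is worth checking when hashing keys: Math.abs(Integer.MIN_VALUE) is still negative, so a hash of exactly that value would produce a negative partition index. The sketch below compares the two ways of mapping a hash to a partition; KeyHashSketch and its method names are hypothetical.

```java
// Sketch of two ways to map a key hash to a partition index.
class KeyHashSketch {
    // Naive form: breaks when hash == Integer.MIN_VALUE, because Math.abs returns it unchanged.
    static int naive(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Safer form: clearing the sign bit always yields a value in [0, numPartitions).
    static int masked(int hash, int numPartitions) {
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```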

Producer compression mechanism

Compression is simply the idea of trading CPU time for disk space or network I/O: by spending a little CPU, we hope to use less disk or transfer less over the network. If you are not yet familiar with the topic, read “The core knowledge programmers need to know about compression algorithms” first, and then you will understand what compression is all about.

What is Kafka compression

Kafka’s message format has two layers: message sets and messages. A message set contains several log entries, and a log entry is where the message is actually encapsulated. The underlying message log in Kafka consists of a series of message sets. Kafka usually does not operate on individual messages but always writes at the message-set level.

In Kafka, compression involves two sides: the Kafka Producer, which compresses, and the Kafka Consumer, which decompresses. Why compress? Put plainly, the messages are too big, and making them smaller lets them travel faster.

Compression is enabled in the Kafka Producer with the compression.type parameter:

Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.9:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String, String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String, String> record =
        new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

The code above configures the producer to use the GZIP compression algorithm.

Whatever is compressed must be decompressed. The Producer compresses the messages and sends them to the server, and the Consumer decompresses them. Because the compression algorithm used is sent along with the messages, the Consumer knows which algorithm to use for decompression.
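To get a feel for the trade, here is a round trip through GZIP using only the JDK’s java.util.zip classes. This is not Kafka’s internal code; GzipSketch is a hypothetical helper that compresses a payload and restores it, which is conceptually what the producer and consumer sides do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: spend CPU to shrink a payload, then restore it.
class GzipSketch {
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed at this point, so the GZIP trailer has been written.
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

Repetitive payloads, such as many similar records batched together, shrink the most, which is one reason compressing a whole batch pays off more than compressing messages one by one.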

Important Kafka producer configuration parameters

Kafka producers have a lot of configurable parameters, which are described in the documentation (kafka.apache.org/documentati…). The more important ones are introduced below.

key.serializer

Used to serialize the key; the class must implement the org.apache.kafka.common.serialization.Serializer interface.

value.serializer

Used to serialize the value; the class must implement the org.apache.kafka.common.serialization.Serializer interface.

acks

The acks parameter specifies how many partition replicas must receive the message before the producer considers the write successful. This parameter has a major impact on whether messages can be lost.

  • If acks = 0, the producer does not wait for any response from the server before considering the message written. If something goes wrong along the way, the producer has no way of knowing, because no response is ever returned. This is similar to the UDP transport layer protocol: the producer just sends and does not care whether the server received anything.
  • If acks = 1, the producer receives a success response as soon as the partition leader receives the message. If the message cannot reach the leader (for example because of a network error, or because a new leader has not been elected yet), the producer receives an error and resends the data. Throughput in this mode depends on whether messages are sent synchronously or asynchronously. If the client waits for the server’s response (by calling the get() method of the Future), latency obviously increases; using a callback avoids that wait.
  • If acks = all, the producer receives a success response only after all in-sync replicas have received the message. This is the safest mode, but latency is higher than with acks = 1, because we wait for more than one server node to receive the message.
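acks is set like any other producer property. Below is a minimal configuration sketch: AcksConfigSketch is a hypothetical helper, the broker addresses are placeholders, and the check only accepts the three values discussed above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that builds producer properties for a chosen acks level.
class AcksConfigSketch {
    private static final List<String> LEVELS = Arrays.asList("0", "1", "all");

    static Properties withAcks(String acks) {
        if (!LEVELS.contains(acks)) {
            throw new IllegalArgumentException("acks must be one of " + LEVELS);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder addresses
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", acks); // "0": no wait, "1": leader only, "all": all in-sync replicas
        return props;
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in the same way as in the earlier examples.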

buffer.memory

This parameter sets the size of the memory buffer the producer uses to hold messages waiting to be sent to the server. If the application produces messages faster than they can be sent, the buffer runs out of space. At that point the send() call either blocks or throws an exception, depending on the block.on.buffer.full parameter (replaced by max.block.ms since version 0.9.0.0).

compression.type

This parameter specifies which compression algorithm the producer applies before sending messages to the broker. By default, messages are sent uncompressed. It can be set to snappy, gzip, or lz4. Here is a comparison of the compression algorithms

retries

The error the producer receives from the server may be temporary (for example, the partition has no leader). In that case, the value of the retries parameter determines how many times the producer may resend the message; once that number is reached, the producer gives up and returns the error. By default, the producer waits 100ms between retries, and this interval can be changed with retry.backoff.ms.

batch.size

When multiple messages need to be sent to the same partition, the producer puts them in the same batch. This parameter specifies the amount of memory a batch can use, in bytes. When the batch is full, all messages in it are sent. However, the producer does not always wait for a batch to fill up: a half-full batch, or even a batch containing a single message, may be sent.
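The behaviour of a size-limited batch can be modelled in a few lines. BatchSketch below is a heavily simplified, hypothetical model of a batch for one partition; the real producer also accounts for record overhead, compression, and timing, none of which is shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified batch for a single topic partition.
class BatchSketch {
    private final int batchSizeBytes;
    private final List<byte[]> current = new ArrayList<>();
    private final List<List<byte[]>> sent = new ArrayList<>();
    private int currentBytes = 0;

    BatchSketch(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    void append(byte[] message) {
        // If the message does not fit, the batch counts as full and is sent first.
        if (!current.isEmpty() && currentBytes + message.length > batchSizeBytes) {
            flush();
        }
        current.add(message);
        currentBytes += message.length;
    }

    // The sender may also flush a partly filled batch at any time.
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        sent.add(new ArrayList<>(current));
        current.clear();
        currentBytes = 0;
    }

    int sentBatchCount() { return sent.size(); }

    int pendingMessages() { return current.size(); }
}
```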

client.id

This parameter can be any string. The server uses it to identify the source of messages, and it typically shows up in logs.

max.in.flight.requests.per.connection

This parameter specifies how many requests the producer can send before receiving a response from the server. The higher the value, the more memory is used, but throughput also improves. Setting it to 1 ensures that messages are written to the server in the order they were sent, even when retries occur.
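Ordering matters mostly in combination with retries: with several requests in flight, a batch that fails and is retried can land after a later batch that succeeded. The fragment below sketches settings that keep retries while preserving order; OrderedProducerConfigSketch is a hypothetical helper and the values are illustrative, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper returning order-preserving producer settings.
class OrderedProducerConfigSketch {
    static Properties orderedDelivery() {
        Properties props = new Properties();
        props.put("retries", "3");             // illustrative value
        props.put("retry.backoff.ms", "100");  // wait between retries
        // Only one unacknowledged request at a time, so a retried batch
        // cannot be overtaken by a later one.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

The price is throughput, since the producer must wait for each response before sending the next request.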

timeout.ms, request.timeout.ms and metadata.fetch.timeout.ms

request.timeout.ms specifies how long the producer waits for the server’s response when sending data, and metadata.fetch.timeout.ms specifies how long it waits for the server’s response when fetching metadata (for example, who the leader of the target partition is). If the wait times out, the producer either retries or returns an error. timeout.ms specifies how long the broker waits for in-sync replicas to acknowledge the message, in line with the acks setting: if the required acknowledgements are not received within this time, the broker returns an error.

max.block.ms

This parameter specifies how long the producer may block when send() is called or when partitionsFor() is used to fetch metadata. These methods block when the producer’s send buffer is full or when metadata is not available. Once the blocking time reaches max.block.ms, the producer throws a timeout exception.

max.request.size

This parameter controls the size of a request sent by the producer. It limits both the largest single message that can be sent and the total size of all messages in a single request.

receive.buffer.bytes and send.buffer.bytes

Kafka communicates over TCP, which provides reliable transmission. These two parameters specify the sizes of the TCP socket buffers used for receiving and sending data, respectively. If they are set to -1, the operating system defaults are used. If the producer or consumer is in a different data center from the broker, these values can be increased appropriately.

References:

The Definitive Guide to Kafka

Geek Time: Kafka Core Technology and Practice

kafka.apache.org/documentati…

Kafka source code

blog.cloudflare.com/squeezing-t…

github.com/facebook/zs…
