preface

The Kafka producer client runs in coordination with two threads: the main thread and the sender thread. Messages are created in the main thread by kafkaProducer and then processed by different modules of interceptors, serializers, and partiers before being cached in the message accumulator. So how do interceptors, serializers, and partitioning work

The interceptor

If you have used Spring Interceptor, you must be familiar with the concept of interceptors. Kafka has two types of interceptors: producer interceptors and consumer interceptors. Producer interceptors have two functions. They can be used to pre-process messages, such as adding prefixes to messages. It can also be used to do some customization, such as counting the success rate, before sending the callback logic.

How do you use producer interceptors? First, we need to realize the org. Apache. Kafka. Clients. The producer Interface ProducerInterceptor Interface, and then through the parameters of the interceptor. Classes to configure, Just tell the Producer client that we have added interceptors. The Interface ProducerInterceptor Interface has three methods:

public interface ProducerInterceptor<K.Vextends Configurable {
    // This method is called before the message is sent
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    // This method is called after the message has been successfully submitted or failed to be sent
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    // Close the Interceptor to perform some resource cleaning
    void close(a);
}
Copy the code
onSend()

This method is called before KafaProducer serializes the message and computes the partition. You can change the message content, but it is generally not recommended to change the topic, key, partition, etc of ProducerRecord. In addition, if you change the key/value here, the key/value returned to the partition is the changed value, not the original key/value issued by the client. Changing the key not only affects partition computing, but also causes Log Compaction to occur on the broker server.

Implement custom ProducerInterceptorImpl, need configuration parameters in KafkaProducer ProducerConfig. INTERCEPTOR_CLASSES_CONFIG specifies the interceptor. If not specified, the default value is null, meaning there is no interceptor function.

interceptor.classes

A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.

    ProducerInterceptorPrefix.class.getName());

Copy the code

KafkaProducer supports multiple interceptors to form a chain of interceptors. If multiple interceptors are configured, the interceptor chain is executed in the order in which the interceptors are configured by the interceptor.classes parameter. The next interceptor will continue to process the input of the previous interceptor as input, but the exception error output of the previous interceptor will not cause the exception or block of the next interceptor. If the previous interceptor fails, the next interceptor picks up from the result of the previous successful interceptor.

onAcknowledgement()

This method is called when a message is successfully answered or failed to be sent, and is executed before the user-specified Callback method. This method usually runs in the I/O thread of the Producer, so the code implementation of this method needs to be fast enough, otherwise it will affect the speed of sending messages.

close()

This method is primarily used to perform some resource cleanup when the interceptor is shut down. Exceptions thrown in all three methods are caught and logged, but are not passed up.

The serializer

The producer needs to serialize the message object into bytes before sending it over the network to the Kafka server, and the consumer needs to use deserializers to convert the received bytes into the corresponding message object. The serializer used by the producer and the deserializer used by the consumer must be in one-to-one correspondence. If the producer uses a certain serializer, such as StringSerializer, while the consumer uses another serializer, such as IntegerSerializer, the desired data cannot be parsed.

Kafka clients come with several types of serializers, including: Byte, Double, Integer, Long, String, etc., they all realized the org.apache.kafka.com mon. Serialization Interface Serializer Interface. This interface has three methods:

public interface Serializer<Textends java.io.Closeable {

    // Used to configure the current class encoding type

    public void configure(Map<String, ? > configs,boolean isKey);

    // To perform serialization

    public byte[] serialize(String topic, T data);

    // To close the current serializer

    public void close(a);

}

Copy the code

If the kafka client provides several serializers that are not sufficient for your application, you can also choose to use serializers such as JSON, ProtoBuf, and Protostuff, or use a custom type (it is not recommended to customize your own serializers).

Partition is

KafkaProducer sends messages to the broker server by calling the Send () method, first through the Interceptor (not required), Serializer (required), and then to the Partitioner. If the ProducerRecord (ProducerRecord) contains a partition field, there is no need for the partition to process the message, because the partition represents the number of the partition to be sent. If not specified, a partition is required for allocation.

Kafka provide the default partition is org. Apache. Kafka. Clients. Producer. The internals. DefaultPartitioner, It implements the org. Apache. Kafka. Clients. The producer Interface Partitioner Interface. This interface has three methods:

public interface Partitioner extends Configurable.Closeable {

    // Calculate the partition number

    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    // indicates the notification partition program used to create a new batch.

    void close(a);

    // indicates the notification partition program used to create a new batch

    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {

    }

}

Copy the code
partition()

This method calculates the partition number and returns an int. The parameters are subject, key, serialized key, value, serialized value, and metadata information for the cluster. If the key is not empty, then the default partition hashes the key (using MurmurHash2 algorithm, which has high performance and low collision rate), and finally calculates the partition number based on the resulting hash value. Messages with the same key are written to the same partition. If the key is empty, the message will be sent to the various availability zones within the topic in a polling manner.

Note: If key is not empty, then the partition number is calculated based on all partitions, to select one of them; If the key is empty, then the partition number is calculated based on all availability zones to select any one. Notice the difference between the two.

Without changing the number of subject partitions, the mapping between key and partitions remains the same. However, once a partition is added to the topic, it becomes difficult to ensure that the key maps to the partition. Of course, in addition to using the default partition provided by Kafka for partition allocation, you can also use a custom partition by implementing the Partitioner interface.

conclusion

So far, we have seen the definition and functions of interceptors, serializers, and partitioners under the main thread, and that they all support custom functionality implementations. Next, we can start to practice ~

Code word is not easy, work together!! Your likes are what keeps me going