Tip: The entire Kafka Client column is based on kafka-2.3.0.

Contents

    • 1. Overview of KafkaProducer
    • 2. KafkaProducer class diagram
    • 3. KafkaProducer simple example

1. Overview of KafkaProducer

KafkaProducer has the following characteristics:

  • KafkaProducer is thread-safe and can be used by multiple threads.

  • KafkaProducer maintains an internal buffer pool that holds queues of ProducerRecord objects waiting to be sent, and starts a background I/O thread that sends them to the Kafka cluster.

  • KafkaProducer's send method is asynchronous. It only appends the ProducerRecord to the buffer and returns immediately, handing back a Future for the result.

  • acks: a core parameter that defines when a message counts as “committed”, that is, the condition under which the broker acknowledges the write to the client. The options are as follows:

    • 0: the producer does not care what happens to the message on the broker side. As soon as the send method returns, the message is considered sent successfully. This is the least safe option because the broker may never receive the message or may fail to store it.
    • all or -1: the leader must store the message, and its replicas (to be exact, the replicas in the ISR) must store it as well, before the message is considered committed and success is returned to the client. This is the strictest durability guarantee and, naturally, the slowest option.
    • 1: the message only needs to be written to the leader for success to be returned to the client.
  • retries: another core parameter on the producer side. It controls how many times a failed send is retried. A value of 0 disables retries.

  • batch.size: the producer maintains a buffer of unsent messages for each partition. Its size in bytes is set by batch.size, and the default is 16 KB. If a batch is not yet full and the sender thread is idle, does the producer wait for the batch to fill before sending, or can it send immediately? By default it sends immediately, so batch.size is in effect an upper bound on how much the client sends to the broker for one partition at a time.

  • linger.ms: improves send throughput by controlling what the sender thread does when a batch has not yet reached batch.size: send immediately or wait for a while. If linger.ms is greater than 0, the sender thread waits up to that many milliseconds before sending the batch to the broker. This increases latency but is good for throughput, somewhat like Nagle's algorithm in TCP.

  • buffer.memory: the total amount of memory the producer may use for its buffer. Once the buffer is full, adding further messages blocks; the details are covered together with the message sending process.

  • key.serializer: the serializer class for the message key.

  • value.serializer: the serializer class for the message body.

  • enable.idempotence: since Kafka 0.11 the producer supports idempotent delivery, so that a message is written only once even when the producer retries. It is enabled by setting enable.idempotence to true. When it is true, retries defaults to Integer.MAX_VALUE and acks to all. To preserve idempotence, the application must avoid resending on its own; if the send API returns an error, the application should record the last successfully sent message to avoid duplicates.
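
As a rough illustration of how most of the parameters above fit together, here is a minimal sketch that collects them in a java.util.Properties object. The class and method names are hypothetical, and the values are illustrative assumptions rather than tuning recommendations.

```java
import java.util.Properties;

// Hypothetical helper class; the names and the chosen values are illustrative only.
final class ProducerConfigSketch {

    static Properties reliableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas before a message counts as committed.
        props.put("acks", "all");
        // Retry a failed send up to 3 times; "0" would disable retries.
        props.put("retries", "3");
        // Upper bound, in bytes, for one per-partition batch (16384 is the default).
        props.put("batch.size", "16384");
        // Let a batch wait up to 5 ms to fill before it is sent.
        props.put("linger.ms", "5");
        // Total bytes available for buffering unsent messages (32 MB).
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reliableProducerProps("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With kafka-clients on the classpath, a Properties object like this would be passed to the KafkaProducer constructor.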

Since Kafka 0.11, Kafka also supports transactional messages.

2. KafkaProducer class diagram



In Kafka, a producer is defined by the Producer interface. Through the methods of this interface, KafkaProducer provides the following basic capabilities:

  • void initTransactions(): initializes transactions; it must be called first if the transactional methods are to be used.
  • void beginTransaction(): starts a transaction.
  • void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId): covered in more detail in the article introducing Kafka transactions.
  • void commitTransaction(): commits the transaction.
  • void abortTransaction(): rolls back the transaction.
  • Future<RecordMetadata> send(ProducerRecord<K, V> record): sends a message. This method is asynchronous by default. To send synchronously, call get() on the returned Future.
  • Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback): sends a message and invokes the callback when the send completes.
  • void flush(): wakes up the sender thread directly, ignoring the value of linger.ms, and sends all messages in the buffer to the broker.
  • List<PartitionInfo> partitionsFor(String topic): returns the routing information (partition information) for a topic.
  • Map<MetricName, ? extends Metric> metrics(): returns the metrics collected by the producer.
  • void close(): closes the producer.
  • void close(Duration timeout): closes the producer, waiting at most the given timeout.
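
The two send variants above follow a common asynchronous pattern: return a Future at once, let a background thread do the work, and optionally run a callback on completion. The sketch below is a JDK-only toy model of that calling pattern. It is not Kafka code: AsyncSendSketch, its fake offsets, and the single-thread executor standing in for the I/O thread are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Toy stand-in for a producer: NOT Kafka code, only the same calling pattern.
final class AsyncSendSketch implements AutoCloseable {
    // A single background thread plays the role of the producer's I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    // Fake offsets, handed out in the order the background thread handles messages.
    private final AtomicLong nextOffset = new AtomicLong();

    // Returns immediately; the Future completes once the background thread has run.
    Future<Long> send(String message) {
        return send(message, null);
    }

    // Same as above, but also invokes the callback with (offset, error) on completion.
    // The message content is ignored in this toy model.
    Future<Long> send(String message, BiConsumer<Long, Throwable> callback) {
        CompletableFuture<Long> result =
                CompletableFuture.supplyAsync(nextOffset::getAndIncrement, ioThread);
        // Returning the whenComplete stage means get() returns only after the callback ran.
        return callback == null ? result : result.whenComplete(callback);
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (AsyncSendSketch producer = new AsyncSendSketch()) {
            // Synchronous style: block on get() until the result is available.
            long offset = producer.send("first").get();
            System.out.println("offset: " + offset);
            // Callback style: the result is handled by the callback.
            producer.send("second", (o, err) -> System.out.println("callback offset: " + o)).get();
        }
    }
}
```

Calling get() right after send, as in the first call, gives up the benefit of asynchrony in exchange for knowing the outcome before continuing; the callback form keeps the caller free to continue.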

The above methods will be covered in more detail in future articles as needed. Let’s take a look at what KafkaProducer’s core fields mean.

  • String clientId: the client ID, which can be specified with client.id when the KafkaProducer is created. If it is not specified, the default is producer-seq, where seq is a sequence number that increases within the process. Setting the client ID explicitly is strongly recommended.
  • Metrics metrics: the container for collected metrics, such as message body size, send latency, and other monitoring-related metrics.
  • Partitioner partitioner: the partition load-balancing algorithm, specified with the partitioner.class parameter.
  • int maxRequestSize: the maximum size of a request sent through the send method. The total serialized size of the key and the message body must not exceed this value. It is set with the max.request.size parameter.
  • long totalMemorySize: the total memory occupied by the producer's buffer, set with buffer.memory.
  • Metadata metadata: metadata such as topic routing information, updated automatically by KafkaProducer.
  • RecordAccumulator accumulator: the accumulator of message records, described in detail in the article on message sending.
  • Sender sender: encapsulates the logic for sending messages to the broker.
  • Thread ioThread: the background thread for sending messages. It is an independent thread that uses the Sender internally to send messages to the broker.
  • CompressionType compressionType: the compression type. Compression is disabled by default and can be configured with compression.type. Possible values: none, gzip, snappy, lz4, zstd.
  • Sensor errors: an error collector, exposed as a metric for monitoring.
  • Time time: used to obtain the system time or to put a thread to sleep.
  • Serializer<K> keySerializer: serializes message keys.
  • Serializer<V> valueSerializer: serializes message bodies.
  • ProducerConfig producerConfig: the producer's configuration.
  • long maxBlockTimeMs: the maximum blocking time. When the producer's buffer is full, sending a message blocks; max.block.ms sets the maximum time to wait.
  • ProducerInterceptors<K, V> interceptors: producer-side interceptors that can apply custom processing before a message is sent.
  • ApiVersions apiVersions: maintains metadata about API versions. This class is for use inside Kafka only.
  • TransactionManager transactionManager: the Kafka message transaction manager.
  • TransactionalRequestResult initTransactionsResult: the result of initializing the producer's transaction context.
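
The interplay of totalMemorySize (buffer.memory) and maxBlockTimeMs (max.block.ms) can be pictured with a small JDK-only toy model: a fixed budget of bytes, and a bounded wait when the budget is exhausted. This is not Kafka's actual buffer implementation; the class BoundedBufferSketch and its Semaphore-based design are assumptions for illustration, and the sketch returns false where the real client would report a timeout error.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy model of a bounded send buffer: NOT Kafka's code, only the same blocking idea.
final class BoundedBufferSketch {
    private final Semaphore freeBytes; // plays the role of buffer.memory
    private final long maxBlockMs;     // plays the role of max.block.ms

    BoundedBufferSketch(int totalBytes, long maxBlockMs) {
        this.freeBytes = new Semaphore(totalBytes);
        this.maxBlockMs = maxBlockMs;
    }

    // Reserves space for one message; returns false if no space frees up within maxBlockMs.
    boolean tryAppend(int messageBytes) throws InterruptedException {
        return freeBytes.tryAcquire(messageBytes, maxBlockMs, TimeUnit.MILLISECONDS);
    }

    // Called once a message has been sent, giving its space back to the buffer.
    void release(int messageBytes) {
        freeBytes.release(messageBytes);
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBufferSketch buffer = new BoundedBufferSketch(1024, 100);
        System.out.println(buffer.tryAppend(1000)); // fits into the empty buffer
        System.out.println(buffer.tryAppend(100));  // only 24 bytes free: waits 100 ms, then gives up
        buffer.release(1000);                       // the first message has been "sent"
        System.out.println(buffer.tryAppend(100));  // space is available again
    }
}
```

In this model a larger byte budget makes blocking less likely, while a larger wait limit makes the caller more tolerant of a temporarily full buffer.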

The next article will focus on the process of sending messages. Let’s conclude this article with a simple KafkaProducer example.

3. KafkaProducer simple example

package persistent.prestige.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Initial brokers used to discover the rest of the cluster.
        props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072");
        // Wait for all in-sync replicas to store each message.
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100; i++) {
                Future<RecordMetadata> future = producer.send(
                        new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
                // Block on the Future, which turns the asynchronous send into a synchronous one.
                RecordMetadata recordMetadata = future.get();
                System.out.println("offset:" + recordMetadata.offset());
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

The main purpose of this article is to give an overview of the Kafka producer and to introduce the topics that follow. The next article will focus on the Kafka message sending process; stay tuned.
