1 Producer API

1.1 Building a Producer

// Kafka and JDK imports used by this snippet (the @Test annotation comes from your JUnit version)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

@Test
public void testBuildProducer() {
    Properties properties = new Properties();

    // Address of the Kafka broker(s) used for the initial connection
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Number of retries for a failed send. The default was 0 in older clients; since Kafka 2.1 it is Integer.MAX_VALUE
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    // The producer batches records bound for the same partition to reduce the number of requests,
    // which improves throughput for both client and server. This setting is the upper bound on a batch, in bytes.
    // Default value: 16384
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");

    // Serializer classes for the record key and value. Both must be set; there is no default.
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(properties);
    // Release the producer's network and buffer resources
    producer.close();
}

The producer has many other configuration items that can be set, such as acks, linger.ms, and buffer.memory.
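
A few of those other settings are sketched below. To keep the example free of dependencies, it uses the plain string keys that the corresponding ProducerConfig constants resolve to. The class name ProducerSettings and the chosen values are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects producer settings by their string keys.
class ProducerSettings {

    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for all in-sync replicas to acknowledge each record
        props.put("acks", "all");
        // Wait up to 5 ms for more records to arrive before sending a batch
        props.put("linger.ms", "5");
        // Total memory, in bytes, the producer may use to buffer unsent records
        props.put("buffer.memory", "33554432");
        // Compress each batch before it is sent
        props.put("compression.type", "lz4");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        // Print the settings in a stable order
        props.stringPropertyNames().stream().sorted()
                .forEach(key -> System.out.println(key + " = " + props.getProperty(key)));
    }
}
```

A Properties object built this way can be passed to the KafkaProducer constructor exactly like the one in the test above.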

1.2 Sending Asynchronous Messages

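Kafka's producer sends messages asynchronously: send() adds the record to a buffer and returns a Future immediately, while a background thread transmits it to the broker.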

@Test
public void testSend() throws ExecutionException, InterruptedException {
    // Assumes producer (built as in the previous section) and TOPIC are fields of the test class
    // Build the message object: ProducerRecord
    // Parameter one: topic name
    // Parameter two: the key of the message
    // Parameter three: the message value
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "key", "hello world");
    // send() returns immediately with a Future
    Future<RecordMetadata> send = producer.send(producerRecord);
    // get() blocks until the broker acknowledges the record, or throws if the send failed
    RecordMetadata recordMetadata = send.get();
    // The record has already been acknowledged here, so no extra sleep is needed before closing
    producer.close();
}
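
Because send() hands back a java.util.concurrent.Future, the caller decides when, and for how long, to block. The sketch below shows that pattern using only the JDK. It is not Kafka code: fakeSend is a hypothetical stand-in for producer.send(...) that completes on a background thread, and the returned String stands in for RecordMetadata.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSendSketch {

    // Hypothetical stand-in for producer.send(record): returns at once,
    // then completes on a background thread after delayMillis.
    static Future<String> fakeSend(String value, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "acked:" + value;
        });
    }

    // Blocks for at most timeoutMillis; returns null if no result arrived in time.
    static String sendAndWait(String value, long delayMillis, long timeoutMillis) {
        Future<String> future = fakeSend(value, delayMillis);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The simulated send itself failed
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Fast "broker": the result arrives well within the timeout
        System.out.println(sendAndWait("hello world", 50, 2000));
        // Slow "broker": the wait gives up first and null is printed
        System.out.println(sendAndWait("hello world", 2000, 50));
    }
}
```

With the real producer the same bounded wait would be written as producer.send(record).get(10, TimeUnit.SECONDS), since get(long, TimeUnit) is part of the standard Future interface.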

1.3 Asynchronous Callback

@Test
public void testSend2() throws InterruptedException {
    // Assumes producer and TOPIC are fields of the test class
    // Build the message object: ProducerRecord
    // Parameter one: topic name
    // Parameter two: the key of the message
    // Parameter three: the message value
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "key", "hello world");
    producer.send(producerRecord, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
            // exception is non-null when the send failed; check it before using the metadata
            if (exception != null) {
                exception.printStackTrace();
                return;
            }
            System.out.println(
                    "partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset());
        }
    });
    // Give the background thread time to send the message and invoke the callback
    TimeUnit.SECONDS.sleep(5);
    producer.close();
}
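
A fixed sleep is a blunt way to wait for callbacks. One alternative, sketched below with only the JDK, is to count callbacks down on a CountDownLatch and wait on it with a timeout. This is not Kafka code: the SendCallback interface and fakeSend method are hypothetical stand-ins for Kafka's Callback and producer.send(record, callback), and the rule that values starting with "bad" fail is invented for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallbackLatchSketch {

    // Hypothetical stand-in for Kafka's Callback: exception is null on success.
    interface SendCallback {
        void onCompletion(String metadata, Exception exception);
    }

    // Hypothetical stand-in for producer.send(record, callback).
    // Values starting with "bad" simulate a failed send.
    static void fakeSend(ExecutorService ioThread, String value, SendCallback callback) {
        ioThread.submit(() -> {
            if (value.startsWith("bad")) {
                callback.onCompletion(null, new IllegalStateException("send failed: " + value));
            } else {
                callback.onCompletion("acked:" + value, null);
            }
        });
    }

    // Sends every value and waits until each callback has fired.
    // Returns {successes, failures}.
    static int[] sendAll(String... values) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        CountDownLatch pending = new CountDownLatch(values.length);
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        try {
            for (String value : values) {
                fakeSend(ioThread, value, (metadata, exception) -> {
                    // Check the exception first: metadata is not usable on failure
                    if (exception != null) {
                        failed.incrementAndGet();
                    } else {
                        ok.incrementAndGet();
                    }
                    pending.countDown();
                });
            }
            // Bounded wait instead of a fixed sleep
            if (!pending.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
        }
        return new int[] {ok.get(), failed.get()};
    }

    public static void main(String[] args) {
        int[] result = sendAll("hello", "world", "bad record");
        System.out.println("ok=" + result[0] + ", failed=" + result[1]); // prints "ok=2, failed=1"
    }
}
```

With the real producer, calling producer.flush() or producer.close() also blocks until outstanding sends have completed, which may make an explicit latch unnecessary in simple tests.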