Sending messages

1. The producer sends messages

The new version of Kafka sends messages asynchronously. Sending a message involves two threads: the main thread and a sender thread (a daemon thread), plus a shared cache, the RecordAccumulator. The main thread is responsible for creating the message and placing it in the RecordAccumulator; the sender thread pulls messages from the cache and delivers them to the Kafka broker, as the sketch below shows.
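
A minimal sketch of this handoff, assuming a producer configured as in the demo in section 3.3 below: send hands the record to the cache and returns a Future immediately, while the sender thread does the network I/O in the background.

// send() appends the record to the RecordAccumulator and returns at once;
// the background sender thread performs the actual delivery to the broker.
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", "key", "value"));

// Blocking on the Future waits until the sender thread has received the broker's ack.
RecordMetadata metadata = future.get();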

2. The message sending process in detail

2.1 The main thread

  1. Encapsulate the message as a ProducerRecord object and call the send method to send it
  2. Pass it through the producer interceptors (message format checks, etc.)
  3. Update the Kafka cluster metadata
  4. Serialize the message object into a byte array
  5. Use the partitioner to compute the target partition (a custom-partitioner sketch follows this list)
  6. Append the message to the shared thread variable RecordAccumulator
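
Step 5 is pluggable: the default partitioner can be replaced with your own. A minimal sketch, where FixedPartitioner and its route-everything-to-partition-0 rule are purely illustrative:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

// Hypothetical partitioner that sends every record to partition 0
public class FixedPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It would be registered through the producer configuration, e.g. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, FixedPartitioner.class.getName()).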

2.2 The sender thread (started when KafkaProducer is instantiated)

  1. The sender thread retrieves messages from the RecordAccumulator and processes the message format
  2. Builds the request object for the send
  3. Hands the request to the Selector and places it on the in-flight queue
  4. When the response is received, removes the request from the in-flight queue and invokes the callback function on each message (see the sketch after this list)
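
Step 4 is where the callback passed to send comes in; it fires once per message when the response arrives. A short sketch, again assuming a producer configured as in section 3.3:

// The callback is invoked by the sender thread once the broker's response arrives
producer.send(new ProducerRecord<>("test", "key", "value"), (metadata, exception) -> {
    if (exception == null) {
        System.out.println("acked at offset " + metadata.offset());
    } else {
        exception.printStackTrace();
    }
});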

3. A working code example of message sending

3.1 Project dependency references

Note that the Maven dependency version must match the Kafka version installed on your server.

<!-- Kafka messaging service -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>


3.2 Understanding related classes

  1. KafkaProducer: creates the producer object that sends the data
  2. ProducerConfig: holds the basic configuration keys for connecting to Kafka (defined as constants)
  3. ProducerRecord: every piece of data must be wrapped in a ProducerRecord object before it can be sent (common constructors are shown after this list)
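
ProducerRecord has several constructors, and the one you pick determines how the partition is chosen (assuming a topic named test):

// Value only: the partitioner spreads records across partitions
ProducerRecord<String, String> r1 = new ProducerRecord<>("test", "value");
// Key + value: the partition is derived from the hash of the key
ProducerRecord<String, String> r2 = new ProducerRecord<>("test", "key", "value");
// Explicit partition: the partitioner is bypassed entirely
ProducerRecord<String, String> r3 = new ProducerRecord<>("test", 0, "key", "value");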

3.3 Simple Message-Sending Demo

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProductTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.126:9092");
        // Acknowledgement mechanism
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Wait time
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Buffer size
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33356444);
        // Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        /*
        // Asynchronous send with a callback: if exception is null the send succeeded
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("test", i + "", i + ""), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("message sent successfully -> " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        */

        // Synchronous send: get() blocks until the broker acknowledges the record
        for (int i = 0; i < 100; i++) {
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))
            ).get();
            System.out.println(metadata.offset());
        }

        // Close the producer
        producer.close();
    }
}

3.4 Message Viewing

After executing the above code, you can log in to the server and view the messages you just produced with this command

./kafka-console-consumer.sh --bootstrap-server 192.168.10.126:9092 --from-beginning --topic test


4. Retry mechanism
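
The demo in section 3.3 sets ProducerConfig.RETRIES_CONFIG to 1. A minimal sketch of the retry-related producer settings (the values are illustrative, not recommendations):

// How many times a transiently failed send is retried before the error surfaces
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// How long to wait between retry attempts
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
// With retries enabled, capping in-flight requests at 1 preserves message ordering
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);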

Receiving messages

1. Consumer-related classes

  1. KafkaConsumer: creates the object that consumes messages
  2. ConsumerConfig: the configuration items used when instantiating the consumer object
  3. ConsumerRecords: data must be encapsulated in this object before it can be consumed

Note that the consumer object is not closed after it is created; it stays running, continuously pulling messages from the message server.

2. Demo: consuming messages from the server

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerTest {

    public static void main(String[] args) {

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.126:9092");
        // Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Automatically commit offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1205");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
            for (ConsumerRecord<String, String> str : records) {
                System.out.println(String.format("topic = %s, offset = %s, value = %s", str.topic(), str.offset(), str.value()));
            }
        }
    }
}

3. Offset commit modes

3.1 Automatic Commit

Configured with the following properties

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("auto.commit.interval.ms", "1000");

3.2 Manual Commit

Manual commit requires disabling auto commit (enable.auto.commit=false) and calling the commit method explicitly in code

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> str : records) {
        System.out.println(String.format("topic = %s, offset = %s, value = %s", str.topic(), str.offset(), str.value()));
    }
    // Manually commit the offset
    consumer.commitAsync();
}

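commitAsync does not block the poll loop. If the commit must complete before the next poll, the synchronous variant can be swapped in for the last line of the loop above:

// Blocks until the offsets of the records returned by the last poll are committed
consumer.commitSync();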

3.3 Differences between Manual and Automatic Commit

  1. In a real production environment, manual commit makes message consumption manageable. Every consumed message has an offset; even if the message service crashes, on restart it reads the offset of the last consumed message from ZK and continues from there, so messages are neither re-consumed nor lost (a per-partition commit sketch follows this list)
  2. Manually committed offsets are ordered, which ensures the data stays in order
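
Commits can also target specific offsets, which is what makes consumption manageable: you decide exactly which position gets recorded. A sketch, assuming str is a record from the poll loop above:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

// Commit the position right after one processed record, for its partition only.
// The committed offset is the NEXT offset the group should read.
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(str.topic(), str.partition()),
        new OffsetAndMetadata(str.offset() + 1)
));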

Spring Boot integration with Kafka will be covered in the next post