
Message Sending Example

Example code for sending a message with a Kafka producer might look like this:

    public static void main(String[] args) throws InterruptedException {
        String server = "localhost:9092";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Security authentication configuration, only needed if ACLs/SASL are enabled; otherwise these settings can be omitted
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "test_topic";
        String message = "hello, kafka";
        producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
            log.info("metadata: {}", metadata);
            if (exception != null) {
                log.error("send exception: ", exception);
            }
        });

        // Keep the main thread alive so the asynchronous send can complete
        Thread.currentThread().join();
    }

The sample code is for reference only; now let's examine how the message is actually sent.

Sending Process Analysis

Since the amount of code involved is large, this article will not paste much source analysis; it tries to explain the flow clearly with flow charts and text. You can view the source code yourself for the details.

The basic flow

In the sample code, the message is sent in two steps:

  1. Construct the producer instance
  2. Send the message

The following sections analyze these two processes.

Creating a producer

Creation process

In the example, the key line for creating the producer is this one:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

The main process, as shown in the flow chart, is as follows:

  1. Call the KafkaProducer constructor to build the producer instance
  2. Convert the configured Properties into a ProducerConfig. The Kafka producer has many configuration options; any option we do not set falls back to its default value (see the small demonstration after this list)
  3. Initialize the attributes of the producer instance (the key ones appear in the commented source below)
  4. Start the message-sending thread; construction is then complete
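
As a small illustration of step 2: any option we leave unset falls back to its documented default. A minimal check of this, assuming the public ProducerConfig(Properties) constructor available in recent kafka-clients versions (class name here is mine; the printed default for batch.size is 16384 bytes):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ConfigDefaultsDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // only the required settings; everything else falls back to defaults
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Parse the Properties the same way the KafkaProducer constructor does
            ProducerConfig config = new ProducerConfig(props);

            // batch.size was not set, so its default value is returned
            System.out.println(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
        }
    }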

Of the producer's attributes, this article focuses on the two that are directly involved in sending messages (attributes such as monitoring and metrics are not directly related, since messages can be sent without them). These two attributes are:

            // Cache messages to be sent
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
            // Perform the actual message sending operation
            this.sender = newSender(logContext, kafkaClient, this.metadata);
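
The constructor arguments above correspond to producer settings we can tune ourselves: batch.size bounds how large each batch may grow, linger.ms controls how long the accumulator waits for more records before a batch becomes sendable, and buffer.memory sizes the BufferPool. A hedged example, using illustrative values only, that could be added to the Properties in the first example before constructing the producer:

        // Batching knobs that feed the RecordAccumulator (illustrative values, tune for your workload)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768");        // max bytes per batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, "10");            // wait up to 10 ms to fill a batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "67108864");  // total memory for the BufferPool (64 MB)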

Sending the message

Messages are sent using the producer-consumer pattern

The producer has two key attributes: the accumulator (RecordAccumulator) and the sender (Sender).

The Kafka producer does not send messages synchronously; instead, sending is asynchronous, in a producer-consumer style:

When the producer calls the send method, it merely caches the message in a queue; the consumer side of the pattern (another thread) consumes messages from that queue and performs the actual sending logic. The main purpose of this design is to send messages in batches rather than one at a time, which improves sending performance.

The Producer instance acts as the producer of this pattern, the Accumulator is the cache queue, and the Sender is the consumer.

The Sender runs its main logic in an asynchronous thread (the ioThread): it continuously takes batches of messages to be sent from the Accumulator and sends them to the target broker over the network.
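
To make the pattern concrete, here is a minimal, self-contained analogy (a toy sketch with made-up class and thread names, not Kafka's actual implementation): the caller only appends records to a queue, while a background thread drains the queue and "sends" in batches.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BatchingSendDemo {
        private static final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();

        public static void main(String[] args) throws InterruptedException {
            // "Sender": a daemon thread that drains the queue and sends in batches
            Thread sender = new Thread(() -> {
                List<String> batch = new ArrayList<>();
                while (true) {
                    try {
                        batch.add(accumulator.take());   // block until at least one record is available
                        accumulator.drainTo(batch, 99);  // grab whatever else is ready, up to a batch limit
                        System.out.println("sending batch of " + batch.size() + " records");
                        batch.clear();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, "toy-sender-io-thread");
            sender.setDaemon(true);
            sender.start();

            // "Producer": the caller just appends to the queue and returns immediately
            for (int i = 0; i < 1000; i++) {
                accumulator.put("record-" + i);
            }
            Thread.sleep(1000); // give the daemon sender time to drain before the JVM exits
        }
    }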

So, to summarize, sending a message involves two main steps:

  1. The producer calls the send method to place the message into the Accumulator
  2. The Sender takes messages from the Accumulator and sends them to the Kafka broker

Placing the message in the Accumulator

We might send a message like this:

        producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
            log.info("metadata: {}", metadata);
            if (exception != null) {
                log.error("send exception: ", exception);
            }
        });

Take a look at the execution flow after calling send:

Messages are actually sent in batches. As the flow chart shows, the sender is only woken up to send once a batch of messages meets the sending conditions. Next, let's look at how the sender actually sends the messages.
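
Since send only appends the record to the Accumulator, it returns a Future<RecordMetadata> immediately; if synchronous behavior is needed, we can block on that future, at some cost to batching efficiency. A sketch reusing the producer, topic, message and log variables from the earlier example (the checked exceptions thrown by get() still need handling):

        // send() returns as soon as the record is in the accumulator;
        // get() blocks until the sender has actually sent it and the broker has responded
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, message));
        RecordMetadata metadata = future.get(); // throws ExecutionException if the send failed
        log.info("sent to partition {} at offset {}", metadata.partition(), metadata.offset());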

Messages are sent asynchronously in batches

The sender is initialized when calling new KafkaProducer() to construct the producer instance.

            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + "|" + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

The sender executes its logic in an asynchronous thread (the ioThread); its run method is essentially an infinite loop:

    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                // Calls Sender.runOnce()
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // Shutdown logic omitted here
    }
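
The omitted shutdown path is what producer.close() triggers: running is set to false, the loop exits, and, as the second log line above says, any remaining buffered records are sent before the thread stops. Typical usage:

        // Flushes records still sitting in the accumulator, then stops the sender/ioThread.
        // close(Duration) can be used instead to bound how long to wait.
        producer.close();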

The Sender.runOnce method repeatedly builds produce requests and sends them:


        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        client.poll(pollTimeout, currentTimeMs);

Looking at these two lines of code, does it seem strange that the client.poll method is called after the sendProducerData method?

Let me explain briefly:

  1. Kafka’s network communication framework is its own implementation built on Java NIO, rather than on Netty, which RocketMQ and others use
  2. Calling the sendProducerData method only prepares the requests to be sent; no data is actually transferred over the network yet
  3. Calling the client.poll method performs the actual I/O, writing each channel’s pending data out through the socket (see the sketch after this list)
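
This "stage the writes first, then do the real I/O in poll" split mirrors typical Java NIO usage. Below is a minimal selector sketch (plain NIO, not Kafka's NetworkClient; the class name is mine and the endpoint is hypothetical) showing the same shape: registering and staging a buffer moves no bytes, while the selector loop does the actual network I/O.

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    public class PrepareThenPollDemo {
        public static void main(String[] args) throws Exception {
            Selector selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress("localhost", 9092)); // hypothetical endpoint

            // "sendProducerData" analog: stage the buffer and register interest; no bytes are sent here
            ByteBuffer staged = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
            channel.register(selector, SelectionKey.OP_CONNECT, staged);

            // "client.poll" analog: the selector loop is where the actual network I/O happens
            while (selector.select(1000) > 0) {
                for (SelectionKey key : selector.selectedKeys()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    if (key.isConnectable() && ch.finishConnect()) {
                        key.interestOps(SelectionKey.OP_WRITE); // now wait until the socket is writable
                    }
                    if (key.isValid() && key.isWritable()) {
                        ch.write((ByteBuffer) key.attachment()); // the real send
                        if (!((ByteBuffer) key.attachment()).hasRemaining()) {
                            key.cancel(); // done with this channel
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }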

The basic process looks like this:

Conclusion

That is essentially how messages are sent: the producer uses the producer-consumer pattern to send messages asynchronously in batches.

There are many more details in the process; this article does not go into all of them.

The sample code also configures ACL (SASL) authentication. Kafka performs authentication when the connection is established; with the default configuration (no authentication, plaintext transport), the handshake step is effectively an empty implementation. That will be explained separately. This differs from RocketMQ, which carries a key with each request and checks permissions on every request.