We previously studied ZK mainly using the scene method, find some core entrance to start analysis. When exploring the source code for Kafka, we can also refer to the previous method. However, this time we do not directly start from the Broker service node, but start from the Producer. Some new ideas and methods for analyzing source code will be used.

To analyze Kafka Producer’s source code, there must be an entry point or a starting point. Most people started using Kafka as a Demo. Deploy a Kafka yourself, send a message, and then consume a message yourself.

KafkaProducerHelloWorld

So we will start from the simplest of a KafkaProducer Demo, from a KafkaProducerHelloWorld example started Kafka source code principle exploration.

HelloWorld looks like this:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/ * * *@author fanmao
 */
public class KafkaProducerHelloWorld {
	
	public static void main(String[] args) throws Exception {
		// Configure some Kafka parameters
		Properties props = new Properties();
		props.put("bootstrap.servers"."192.168.30.1:9092");

		// Create a Producer instance
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);

		Encapsulate a message
		ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic"."test-key"."test-value");

		// Sending messages synchronously will block here until the message is sent
		// producer.send(record).get();

		// Send messages asynchronously without blocking. Set a listener callback function
		producer.send(record, new Callback() {
			@Override
			public void onCompletion(RecordMetadata metadata, Exception exception) {
				if(exception == null) {
					System.out.println("Message sent successfully");
				} else {
					System.out.println("Message sending exception"); }}}); Thread.sleep(5 * 1000);

		/ / from the producerproducer.close(); }}Copy the code

The code example above, while very simple, has its own vein.

1) Create KafkaProducer

2) Prepare message ProducerRecord

3) Producer.send ()

Let me draw a simple picture:

Here is a bit more. I mentioned the source version selection and the way to look at the source code in Zookeeper growth Note 5, so I won’t repeat it here. Kafka-0.10.0.1: Kafka-0.10.0.1

So the dependency GAV(group-artifactid-version) used by the client is org.apache.kafka-kafka-clients-0.10.0.1. The POM is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.mfm.learn</groupId>
	<artifactId>learn-kafka</artifactId>
	<version>0.0.1 - the SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>learn-kafka</name>
	<url>http://maven.apache.org</url>


	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>

	<dependencies>
		<dependency>
		    <groupId>org.apache.kafka</groupId>
		    <artifactId>kafka-clients</artifactId>
		    <version>0.10.0.1</version>
		</dependency>
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>fastjson</artifactId>
		    <version>1.2.72</version>
		</dependency>
	</dependencies>

	<build>
	</build>
</project>

Copy the code

The creation of KafkaProducer

KafkaProducerHelloWorld: KafkaProducerHelloWorld: KafkaProducerHelloWorld: KafkaProducer So let’s see what it initializes, okay?

Here to ask you a question, the construction method of the source principle, the general analysis of the results of what method will be better?

Yes, component diagrams or source context diagrams are the easiest to understand. We just need to get a general idea. Method had, what idea can you use commonly? Guess, look at the comments and guess what the component does, right?

Ok, let’s try it!

The new KafkaProducer code is as follows:

    /**
     * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
     * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
     * @param properties   The producer configs
     */
    public KafkaProducer(Properties properties) {
        this(new ProducerConfig(properties), null.null);
    }
Copy the code

Constructor calls an overloaded constructor, so let’s take a look at the comment. As you can see in general, the constructor takes parameters that can be set through Properties and then converted to ProducerConfig object for encapsulation. There must be some conversion method. You may remember that there was a similar operation in Zookeeper’s growth notes that encapsulated a QuorumPeerConfig object. In fact, the analysis of a lot of source code, you will gradually have experience, the better to be able to analyze any source code principle. That’s what I want you to learn, not how it’s parsed and encapsulated into configuration objects.

Let’s go ahead and see how the overloaded constructor or ProducerConfig resolves. As follows:

Kafka Producer configuration

In this section, we’ll first look at how ProducerConfig parses configuration files. The code for new ProducerConfig() looks like this:

/ * *NOTE:DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND * CHANGE WILL * Note: Do not change any configuration strings or their JAVA variable names, as they are part of the public API and changes will BREAK USER CODE. * /
private static finalConfigDef CONFIG; ProducerConfig(Map<? ,? > props) {super(CONFIG, props);
}
Copy the code

The constructor thread, which calls a super, has a parent class. It looks a bit more encapsulated than Zookeeper’s configuration resolution, not a simple QuorumPeerConfig.

And there is a static variable ConfigDef CONFIG. You must wonder what it is.

We can take a look at the ConfigDef source code and see what we can see:

It looks like there’s a bunch of define methods, validate methods. Key variables, such as a Map configKeys or something. It seems that key-value configuration is placed

For example, key=bootstrap.servers, value192.168.30.9092.

For starters, look again at the ConfigDef annotation.

/** /** * This class is used for specifying the set of expected configurations. For each configuration, you can specify * the name, the type, the default value, the documentation, the group information, the order in the group, * The width of the configuration value and the name suitable for display in the UI. For each configuration, you can specify the name, type, default value, document, group information, order in the group, width of configuration values, and name suitable for display in the UI. * * You can provide special validation logic used for single configuration validation by overriding {@linkValidator}. * You can override {@linkValidator} to provide special validation logic for individual configuration validation. * * Moreover, you can specify the dependents of a configuration. The valid values and visibility of a configuration * may change according to the values of other configurations. You can override {@linkRecommender} to get valid * values and set visibility of a configuration given the current configuration values. You can specify the dependency of the configuration. The valid values and visibility of the configuration may change based on the values of other configurations. You can override {@linkRecommender} to obtain valid values, * and to set the visibility of the configuration given the current configuration value. * Omit other... * This class can be used standalone or in combination with {@linkAbstractConfig} which provides some additional * functionality for accessing configs. * This class can be used alone or with {@linkAbstractConfig} is used in combination to provide some additional functionality to access configuration functionality. * /

Copy the code

From the above, you should be able to see its function. The key-value can be set and verified. The key-value can be used to access the configuration independently

Knowing what this static variable does, you click on super for ProducerConfig to enter the parent constructor:

public AbstractConfig(ConfigDef definition, Map<? ,? > originals,boolean doLog) {
    /* check that all the keys are really strings */
    for (Object key : originals.keySet())
        if(! (keyinstanceof String))
            throw new ConfigException(key.toString(), originals.get(key), "Key must be a string.");
    this.originals = (Map<String, ? >) originals;this.values = definition.parse(this.originals);
    this.used = Collections.synchronizedSet(new HashSet<String>());
    if (doLog)
        logAll();
}
Copy the code

Definition. Parse (this.originals); That is, the parrse method of ConfigDef is executed.

At this point, you don’t have to think about it. This is how you convert Properties to ProducerConfig configuration. As shown below:

Here’s a quick look at the parse method:

private final Map<String, ConfigKey> configKeys = new HashMap<>();

public Map<String, Object> parse(Map
        props) {
        // Check all configurations are defined
        List<String> undefinedConfigKeys = undefinedDependentConfigs();
        if(! undefinedConfigKeys.isEmpty()) { String joined = Utils.join(undefinedConfigKeys,",");
            throw new ConfigException("Some configurations in are referred in the dependents, but not defined: " + joined);
        }
        // parse all known keys
        Map<String, Object> values = new HashMap<>();
        for (ConfigKey key : configKeys.values()) {
            Object value;
            // props map contains setting - assign ConfigKey value
            if (props.containsKey(key.name)) {
                value = parseType(key.name, props.get(key.name), key.type);
                // props map doesn't contain setting, the key is required because no default value specified - its an error
            } else if (key.defaultValue == NO_DEFAULT_VALUE) {
                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
            } else {
                // otherwise assign setting its default value
                value = key.defaultValue;
            }
            if(key.validator ! =null) {
                key.validator.ensureValid(key.name, value);
            }
            values.put(key.name, value);
        }
        return values;
 }

Copy the code

This code looks a little silly, but look at the core.

ConfigKey <String, ConfigKey> ConfigKey <String, ConfigKey> ConfigKey

ParseType specifies the type of the value, and ConfigKey specifies the configuration name

2) Finally, put the prepared key-value configuration into Map<String, Object> values and return it to AbstractConfig

So in the end, the Producer argument that we configured will be put into a Map<String,Object> in AbstractConfig, and Object means that the configured value is a distinction between integers and strings. * * such as

Properties props = new Properties();
props.put("bootstrap.servers"."192.168.30. : 9092");
Copy the code

It will look something like the following:

That’s the whole process of parsing the Properties. You’ll see that it’s not that complicated, just a little more complicated than the Zookeeper package.

When is the Map<String, ConfigKey> ConfigKey initialized for the parase method?

We can go back.

private static finalConfigDef CONFIG; ProducerConfig(Map<? ,? > props) {super(CONFIG, props);
}
Copy the code

Remember that ConfigDef is passed to the parent class by the subclass before the parent method is called. This variable is static. To initialize it, there must be a static initialization code in ProducerConfig. You can find the following code:

    /** <code>retries</code> */
    public static final String RETRIES_CONFIG = "retries";

    private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
                                              + " Note that this retry is no different than if the client resent the record upon receiving the error."
                                              + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
                                              + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
                                              + " succeeds, then the records in the second batch may appear first.";
static {
        CONFIG = new ConfigDef()
            .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
			.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
			.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
			.define(ACKS_CONFIG,
					Type.STRING,
					"1",
					in("all"."1"."0"."1"),
					Importance.HIGH,
					ACKS_DOC)
			.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
			.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
			.define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
			.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
			.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
			.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, 
			// omit other define
			.withClientSslSupport()
			.withClientSaslSupport();

    }
Copy the code

The static method calls define and initializes the Producer configuration name, default values, and documentation. The final package is a map, value is ConfigKey, and ConfigDef is initialized.

private final Map<String, ConfigKey> configKeys = new HashMap<>();
Copy the code
public static class ConfigKey {
    public final String name;
    public final Type type;
    public final String documentation;
    public final Object defaultValue;
    public final Validator validator;
    public final Importance importance;
    public final String group;
    public final int orderInGroup;
    public final Width width;
    public final String displayName;
    public final List<String> dependents;
    public final Recommender recommender;
}
Copy the code

This process is fine, but the point is, the default values. This is where the default values for KafkaProducer configurations are initialized. If you want to know the default value of Producer, look here.

Kafka, Kafka, Kafka, Kafka, Kafka, Kafka, Kafka, Kafka You will understand it later when we analyze the source code. Here are some of the core configurations for you to remember:

Producer core parameters:

By default, metadata.max.age.ms refreshes metadata every 5 minutes

Max-request. size maximum size of each request (1mb)

Buffer. memory size of the buffer (32mb)

Max.block. ms Maximum blocking time after buffer fill or metadata pull (60s)

Request.timeout. ms Request timeout duration (30s)

Batch. size Default size of each batch (16KB)

Linger. Ms Defaults to 0 and does not delay transmission.

The value can be set to 10ms. If no batch has been created within 10ms, the batch must be sent immediately

.

summary

In the next section, we will continue to analyze the creation of Producer. We will use component diagrams and flowcharts to see what the overloaded constructor does after the configuration resolution.

This article is published by OpenWrite!