【 Article source: Lagou Education Java salary training camp 】 (all mind maps are made by the author; please do not reuse them without permission)

When you climb a mountain, your heart fills with the mountain; when you gaze at the sea, your feelings overflow into the sea.


Kafka architecture and practice

Part 1 – Concepts and basic architecture

“Enter the protagonist”

Kafka was originally developed at LinkedIn as a distributed, partitioned, publish-subscribe commit log system coordinated by ZooKeeper, and has since been widely used as an MQ system thanks to its strong persistence and high throughput. (It was open-sourced and contributed to the Apache Software Foundation, becoming a top-level open source project in 2012.)

Main application scenarios: Log collection system or message system

Design objectives:

  1. Message persistence with O(1) time complexity, guaranteeing constant-time access performance even for terabytes of data.
  2. High throughput. Even on very cheap commodity machines, a single machine can support 100K messages per second.
  3. Supports message partitioning between Kafka servers and distributed consumption, while ensuring the sequential transmission of messages within each partition.
  4. Support offline data processing and real-time data processing.
  5. Support online horizontal scaling

In general, there are two message delivery models: point-to-point and publish-subscribe. Kafka is a typical publish-subscribe system. Message-oriented middleware generally offers a push model and/or a pull model; Kafka only provides the pull model on the consumer side (consumers poll the broker to fetch messages), with no push.

  • Kafka runs as a cluster on one or more servers that can span multiple data centers
  • A Kafka cluster manages records by topic; a topic can have multiple partitions, and a partition can have multiple replicas
  • Each record consists of a key, a value, and a timestamp

Four core APIs:

  1. Producer API: Allows applications to publish record streams to one or more Kafka topics
  2. Consumer API: Allows applications to subscribe to one or more topics and process the stream of records produced to them
  3. The Streams API: Allows an application to act as a stream processor, consuming input streams from one or more topics and producing output streams to one or more output topics, effectively transforming input streams into output streams (a brief sketch follows this list)
  4. Connector API: Allows you to build and run reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector for a relational database might capture all changes to a table
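As a quick illustration of the Streams API, here is a minimal sketch (it assumes the kafka-streams dependency is on the classpath; the topic names input-topic and output-topic are placeholders, not from the article):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamsSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Consume from one topic, transform each value, and produce to another topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        source.mapValues(value -> value.toUpperCase()).to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}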

“Advantages”

  1. High throughput: a single machine can process hundreds of thousands to millions of messages per second, and performance stays stable even with many terabytes of stored messages.
  2. High performance: a single node can support thousands of clients while guaranteeing zero downtime and zero data loss.
  3. Persistent data stores: Persists the message to disk. Prevent data loss by persisting data to hard disk and replication.
    1. Zero copy
    2. Read sequentially, write sequentially
    3. Take advantage of Linux page caching
  4. Distributed system, easy to scale out: there are multiple producers, brokers, and consumers, all of them distributed, and machines can be added without downtime. The producers and consumers may be different applications.
  5. Reliability: Kafka is distributed, partitioned, replicated, and fault-tolerant.
  6. Client state maintenance: message processing state is maintained on the consumer side, not on the server side, and consumers rebalance automatically on failure.
  7. Online and offline scenarios are supported.
  8. Support for multiple client languages: Kafka supports Java, .NET, PHP, Python, and many other languages.

Application Scenarios

  • Log collection: use Kafka to collect logs from many services and expose them to consumers through a unified interface
  • Message systems: decouple producers and consumers, cache messages, and so on
  • User activity tracking: Kafka is often used to record the activities of Web users or App users, such as browsing, searching, clicking, etc. These activities are published by various servers to Kafka topics, which consumers subscribe to for real-time analysis, and can also be saved to a database
  • Operational metrics: Kafka is also used to record operational monitoring data. This includes collecting data for various distributed applications and producing centralized feedback for various operations, such as alarms and reports
  • Streaming processing: Spark Streaming and Storm

“Basic Architecture”

Message & Batch

Kafka’s units of data are called messages. You can think of a message as a “row” or a “record” in a database. The message consists of an array of bytes.

For efficiency, messages are written to Kafka in batches. A batch is a group of messages that belong to the same topic and partition.

Breaking messages into batches reduces network overhead. The larger the batch, the more messages are processed per unit of time and the longer the transmission time of individual messages. Batch data is compressed, which improves data transmission and storage, but requires more computational processing.
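As a rough illustration of these batching and compression knobs on the producer side, here is a minimal sketch (not taken from the article; the broker address and the specific values are placeholders, not recommendations):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class BatchingSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);       // maximum size of one batch, in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);           // wait up to 10 ms to fill a batch
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress whole batches before sending

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}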

Schemas

There are many easy-to-understand options for message schemas, such as JSON and XML, but they lack strong typing. Many Kafka developers prefer Apache Avro.

Avro provides a compact serialization format that separates schema and message body. There is no need to regenerate code when schemas change, strong typing and schema evolution are supported, and versions are both forward and backward compatible.

Consistency in the data format is important to Kafka because it eliminates coupling between message read and write operations.

Topics & Partitions

Kafka’s messages are categorized by Topic. Topics can be likened to tables in a database or folders in a file system. Topics can be divided into partitions, and a topic is partitioned across a Kafka cluster, providing the ability to scale horizontally.

Producer & Consumer

Producers create messages; consumers consume them.

A message is published by a producer to a specific topic, and by default the producer distributes messages evenly across all partitions of that topic. Partition selection can be summarized in the following steps (a sketch of a custom partitioner follows below):

  1. The partition of a message can be specified explicitly (if not, messages are published in a balanced way)
  2. If the message has a key, the partition is derived from the hash of the key modulo the number of partitions
  3. Otherwise, partitions are selected in a round-robin fashion
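A minimal sketch of a custom partitioner implementing the key-hash-modulo rule above (the class name is hypothetical; such a class is registered through the producer's partitioner.class setting):

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

public class KeyHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: fall back to partition 0 here (Kafka's default partitioner balances instead)
            return 0;
        }
        // Key hash modulo the number of partitions (step 2 above)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}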

Consumers use offsets to distinguish messages they have already read. Consumers belong to consumer groups; a consumer group ensures that each partition is consumed by only one consumer in the group, avoiding duplicate consumption.

The broker & cluster

A separate Kafka server is called the Broker. The broker receives messages from the producer, sets offsets for the message, and commits the message to disk for saving. The broker serves consumers and responds to requests to read partitions by returning messages that have been committed to disk. A single broker can easily handle thousands of partitions and millions of messages per second.

Each cluster has one broker that acts as the cluster controller (automatically elected from the active members of the cluster). The controller is responsible for assigning partitions to brokers and for monitoring brokers. A partition can be assigned to multiple brokers, in which case partition replication takes place (for example, the leader partition on Broker 1 is replicated to Broker 2).

“Core Concepts”

  • Producer-producers publish messages:
    • Producers publish messages to Kafka’s topics. After the broker receives the message sent by the producer, it appends the message to the segment file currently used to append data.
      • By default, messages are evenly distributed across all partitions of a topic through polling.
      • In some cases, producers write messages directly to the specified partition. This is typically done with a message key and a partitioner, which generates a hash value for the key and maps it to the specified partition. This ensures that messages containing the same key will be written to the same partition.
      • Producers can also use custom partitions to map messages to partitions based on different business rules.
  • Consumer- The Consumer reads the message:
    1. Consumers subscribe to one or more topics and read them in the order in which messages are generated.
    2. Consumers distinguish messages that have already been read by checking their offsets.

      The offset is a kind of metadata: a monotonically increasing integer value that Kafka adds to each message when it is created. Every message has a unique offset within a given partition. The consumer stores the last-read offset of each partition in ZooKeeper or in Kafka itself, so its reading position is not lost if the consumer shuts down or restarts.
    3. Consumers are part of the consumer group. Groups ensure that each partition can only be used by one consumer.
    4. If a consumer fails, other consumers in the consumer group can take over the work of the failed consumer, rebalancing, and partition reallocation.
  • Broker- Provides services to consumers:
    • The broker serves consumers and responds to requests to read partitions by returning messages that have been committed to disk.
      • If a topic has N partitions and a cluster has N brokers, then each broker stores one partition of that topic
      • If a topic has N partitions and the cluster has (N+M) brokers, then N brokers each store one partition of that topic and the remaining M brokers store no partition data for it
      • If a topic has N partitions and there are fewer than N brokers in the cluster, then a broker stores one or more partitions for that topic. In the actual production environment, avoid this situation. This situation may cause data imbalance in the Kafka cluster
  • Topic- Topics are used to classify messages:
    • Topics are like database tables, in particular like the logical tables of a sharded (split-database, split-table) setup.
  • Partition- A Partition implements data redundancy and scaling:
    1. Topics can be divided into several partitions, each of which is a commit log.
    2. Messages are appended to a partition and then read in first-in, first-out order.
    3. The order of messages cannot be guaranteed across the topic, but within a single partition.
    4. Kafka implements data redundancy and scalability through partitioning.
    5. The number of partitions needs to be set to 1 in scenarios where the consumption order of messages needs to be strictly guaranteed.
  • Replicas ensure data consistency:
    • Kafka uses topics to organize data; each topic is divided into partitions, and each partition has multiple replicas. Those replicas are stored on brokers, and each broker can hold hundreds or thousands of replicas belonging to different topics and partitions.
      • Leader replica: each partition has one leader replica. To guarantee consistency, all producer and consumer requests go through this replica
      • Follower replicas: replicas other than the leader are follower replicas. Followers do not process client requests; their only job is to copy messages from the leader and stay in the same state as the leader. If the leader crashes, one of the followers is promoted to the new leader.
  • Offset- Ensures read and write order:
    • For example, consumer A has consumed up to offset=9 and consumer B up to offset=11. The next time each of them consumes, it can choose to continue from its last offset, start again from the beginning, or jump to the end and consume only the latest messages (a sketch follows below).
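A minimal sketch of these three options using the consumer API (the broker address, group id, and topic name are placeholders, not from the article):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class OffsetOptionsSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        consumer.assign(Collections.singletonList(tp));

        // Option 1: do nothing - poll() continues from the group's last committed offset.
        // Option 2: start over from the earliest available message.
        consumer.seekToBeginning(Collections.singletonList(tp));
        // Option 3: skip to the end and consume only messages produced from now on.
        consumer.seekToEnd(Collections.singletonList(tp));

        consumer.close();
    }
}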

Part 2 – Installation and configuration

Because Kafka depends on ZooKeeper, and ZooKeeper runs on the JVM, make sure a JDK and ZooKeeper are available at runtime!

JDK setup on Linux is not covered here. The ZooKeeper setup is as follows:

  1. In ../zookeeper-x.x.x/conf/zoo.cfg, set dataDir=../zookeeper.data (the directory where ZooKeeper saves its data)
  2. Edit/etc/profile
    export ZOOKEEPER_PREFIX=/root/my-software/apache-zookeeper-x.x.x
    export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
    export ZOO_LOG_DIR=$ZOOKEEPER_PREFIX/logs
  3. To make the configuration take effect:
    source /etc/profile
  4. Validate:
    zkServer.sh status


Kafka installation and configuration

  1. Download the decompression package from the official website
  2. Execute the command tar -zxvf kafka_x.x.x.tgz -C /opt to unpack it
  3. Configure environment variables: vim /etc/profile
    export KAFKA_HOME=/opt/kafka_x.x.x
    export PATH=$PATH:$KAFKA_HOME/bin
  4. Configure the /opt/kafka_2.12-1.0.2/config/server.properties file
    ...
    # specify a path for storing related logs
    log.dirs=/opt/kafka_x.x.x/logs
    ...
    # specify the ZooKeeper connection string ('myKafka' is the chroot node Kafka will create in ZooKeeper)
    zookeeper.connect=localhost:2181/myKafka
  5. After starting ZooKeeper, start Kafka with the command ./bin/kafka-server-start.sh config/server.properties
    • You can also start it as a daemon:

      ./bin/kafka-server-start.sh -daemon config/server.properties
  6. View the Kafka background process: ps -ef | grep kafka
  7. Start the ZooKeeper client to view the nodes: ./zkCli.sh, then ls /myKafka
    • /brokers: holds all information about the Kafka cluster, including the registration information of each broker and information about all topics in the cluster.
    • /controller: saves the registration information of the Kafka controller component (the controller is responsible for leader election in the cluster) and supports dynamic election of the controller.
    • /admin: saves the output of management scripts, such as deleting topics and reassigning partitions.
    • /isr_change_notification: saves the partitions whose ISR list has changed. The controller registers a listener here to monitor changes to its child nodes in real time.
    • /config: saves the configuration of the various resources in the Kafka cluster; for example, each topic may have its own set of configurations, saved under /config/topics/.
    • /cluster: stores brief information about the Kafka cluster, including the cluster ID and cluster version.
    • /controller_epoch: saves the version number of the controller component, which Kafka uses to isolate requests from invalid controllers.
  8. Stop Kafka running in the background (started as a daemon): ./kafka-server-stop.sh
  9. Other important scripts:
    1. kafka-topics.sh: topic management commands
      # List existing topics
      kafka-topics.sh --list --zookeeper localhost:2181/myKafka
      # Create topic TOPIC_001 with 1 partition and a replication factor of 1
      kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic TOPIC_001 --partitions 1 --replication-factor 1
      # Query topic/partition information
      kafka-topics.sh --zookeeper localhost:2181/myKafka --list
      # View details of topics
      kafka-topics.sh --zookeeper localhost:2181/myKafka --describe
      # Delete the specified topic
      kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic TOPIC_001
    2. kafka-console-producer.sh: produce messages
      # Start a producer
      kafka-console-producer.sh --topic TOPIC_001 --broker-list localhost:9092
    3. kafka-console-consumer.sh: consume messages
      # Start a consumer (from the current offset)
      kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_001
      # Start consumer 2 (consume from the beginning rather than from the committed offset)
      kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_001 --from-beginning

Part 3 – Hands-on development

“Message sending and receiving”

Main producer-side objects: KafkaProducer / ProducerRecord

Parameters used when creating a KafkaProducer:

  • bootstrap.servers: configures how the producer connects to the brokers. This is only the initial connection list: to connect to a Kafka cluster, configure the addresses of a few brokers in the cluster, not all of them; after the producer connects to the brokers specified here, it discovers the other nodes in the cluster through this connection.
  • key.serializer: the serializer class for the keys of the messages to send. You can specify either the class name or the Class object.
  • value.serializer: the serializer class for the values of the messages to send. You can specify either the class name or the Class object.
  • acks: acknowledgement mode (default: all).
    • acks=0: the producer does not wait for the broker to acknowledge the message; the message is considered sent as soon as it is placed in the send buffer. (This does not guarantee that the broker actually received the message, the retries setting has no effect, and the offset returned for the sent message is always -1.)
    • acks=1: the message only needs to be written to the leader partition before the broker responds to the client, without waiting for acknowledgement from the follower replicas. (If the leader goes down after acknowledging the message but before the followers have synchronized it, the message is lost.)
    • acks=all: the leader partition waits for all ISR replicas to acknowledge the record. (As long as one ISR replica survives, the message is not lost. This is Kafka's strongest reliability guarantee, equivalent to acks=-1.)
  • retries: retry count.
    • If an error occurs while sending, the producer resends the message.
    • The same applies when the client receives a retriable error from the broker.
    • (If retries are enabled and message ordering must be preserved, set MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1; otherwise another message may be sent successfully while a failed message is being retried.)

  1. More parameters can be found directly in org.apache.kafka.clients.producer.ProducerConfig.
  2. A note on acknowledgements: after a message is sent, the broker's confirmation can be awaited synchronously (simple but inefficient) or asynchronously (efficient, but a callback must be set). A sketch of these reliability-related settings follows below.
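A minimal sketch of the reliability-related producer settings discussed above (the broker address and the concrete values are placeholders, not recommendations from the article):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ReliabilitySketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // wait for all ISR replicas to acknowledge
        props.put(ProducerConfig.RETRIES_CONFIG, 3);   // resend on transient errors
        // Preserve message ordering while retrying: allow only one in-flight request per connection
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}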


Let’s take a quick look at sending and receiving messages with a Java demo.

– Kafka – Service preparation

Create the topic topic_1 on the Kafka server and start a console consumer to consume it:

# Create the topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic_1 --partitions 1 --replication-factor 1
# List topics
./bin/kafka-topics.sh --list --zookeeper localhost:2181
topic_1
– POM – Dependency import

Check the version of your broker in advance:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <!-- Higher client versions are backward compatible; use the version that matches your broker -->
        <version>2.7.0</version>
    </dependency>
</dependencies>
-P1- Producer A
public class MyProducerA {
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
        Map<String, Object> configs = new HashMap<>();
        // 1. Specify the broker address for the initial connection
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "117.50.40.96:9092");
        // 2. Specify the serialized class for key
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        // 3. Specify the serialized class of value
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 4. Prepare producers
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        // 5. Set user-defined header fields
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
        // 6. Prepare the message (record) to send
        ProducerRecord<Integer, String> producerRecord =
                new ProducerRecord<>("topic_1", 0, 0, "Hello Kafka!");

        /* Synchronous confirmation of the send:
        Future<RecordMetadata> future = producer.send(producerRecord);
        RecordMetadata metadata = future.get();
        System.out.println("Topic of message: " + metadata.topic());
        System.out.println("Partition number of message: " + metadata.partition());
        System.out.println("Message offset: " + metadata.offset());
        */

        // Asynchronous confirmation with a callback
        producer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Topic of message: " + metadata.topic());
                System.out.println("Partition number of message: " + metadata.partition());
                System.out.println("Message offset: " + metadata.offset());
            } else {
                System.out.println("Exception message: " + exception.getMessage());
            }
        });

        // 7. Close the producer
        producer.close();
    }
}

! If the following occurs: the scripts can send and receive messages on the Linux server itself, but an external program calling the producer API times out when sending messages. For details, see the solution.

-P2- Producer B (100 messages in a row)
public class MyProducerB {
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
        HashMap<String, Object> configs = new HashMap<>();
        // 1. Specify the broker address for the initial connection
        configs.put("bootstrap.servers", "117.50.40.96:9092");
        // 2. Specify the serialized class for key
        configs.put("key.serializer", IntegerSerializer.class);
        // 3. Specify the serialized class of value
        configs.put("value.serializer", StringSerializer.class);
        // 4. Prepare producers
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        // 5. Set user-defined header fields
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
        // 6. Prepare the pre-sent message (record)
        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("topic_1", 0, i, "Hello Kafka!" + i, headers);
            
            /* Synchronous confirmation of the send:
            Future<RecordMetadata> future = producer.send(producerRecord);
            RecordMetadata metadata = future.get();
            System.out.println("Topic of message: " + metadata.topic());
            System.out.println("Partition number of message: " + metadata.partition());
            System.out.println("Message offset: " + metadata.offset());
            */

            // Asynchronous confirmation with a callback
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Topic of message: " + metadata.topic());
                    System.out.println("Partition number of message: " + metadata.partition());
                    System.out.println("Message offset: " + metadata.offset());
                } else {
                    System.out.println("Exception message: " + exception.getMessage());
                }
            });
        }

        // 7. Close the producer
        producer.close();
    }
}

-C1- Consumer (pull from earliest message)
public class MyConsumer {
    
    public static void main(String[] args) {
    
        HashMap<String, Object> configs = new HashMap<>();
        // 1. Specify the broker address for the initial connection
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "117.50.40.96:9092");
        // 2. Configure the deserializer for key
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // 3. Configure the deserializer for value
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 4. Configure the consumer group ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo1");
        // 5. If no valid offset is found for the current consumer, it is automatically reset to the beginning
        // (latest: reset to the latest offset; earliest: consume from the beginning)
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 6. Create the consumer
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);
        // 7. Subscribe first, consume later
        consumer.subscribe(Arrays.asList("topic_1"));
        /* --> batch pull messages from the topic's partitions */
        ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000); // wait up to 3 seconds for records
        /* --> iterate over the batch messages pulled from the topic */
        consumerRecords.forEach(record -> System.out.println(record.topic() + "\t"
                + record.partition() + "\t"
                + record.offset() + "\t"
                + record.key() + "\t"
                + record.value()));
        // 8. Close the consumer
        consumer.close();
    }
}

The run prints the following:

"C:\Program Files\Java\jdk1.8.0_131\bin\java.exe" ... -classpath "...;kafka-clients-1.0.2.jar;..." com.archie.consumer.MyConsumerA
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
topic_1	0	0	null	hello 1
topic_1	0	1	null	hello
topic_1	0	2	0	Hello Kafka! 0
topic_1	0	3	0	Hello Kafka! 1
topic_1	0	4	0	Hello Kafka! 2
topic_1	0	5	99	Hello Kafka! 3
topic_1	0	6	0	Hello Kafka! 4
.
topic_1	0	103	97	Hello Kafka! 97
topic_1	0	104	98	Hello Kafka! 98
topic_1	0	105	99	Hello Kafka! 99

Process finished with exit code 0

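The consumer above polls only once and then closes, which is enough for the demo. In a real application the consumer usually polls in a loop and commits offsets after processing; a minimal sketch (the group id consumer_demo2 is made up for this sketch, and enable.auto.commit is switched to false so offsets can be committed manually):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;

public class MyLoopConsumer {

    public static void main(String[] args) {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "117.50.40.96:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo2");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Commit offsets manually after processing instead of automatically
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList("topic_1"));
        try {
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1_000);
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println(record.partition() + "\t" + record.offset() + "\t" + record.value());
                }
                consumer.commitSync(); // commit only after the batch has been processed
            }
        } finally {
            consumer.close();
        }
    }
}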

“SpringBoot Kafka”

– POM – Dependency declaration

      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
– properties – Spring initial configuration
spring.application.name=springboot-kafka
server.port=8080
# Kafka configuration
spring.kafka.bootstrap-servers=117.50.40.96:9092
# -- Producer configuration --
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# The maximum size in bytes of each producer batch
spring.kafka.producer.batch-size=16384
# The total send buffer memory available on the producer side, set here to 32 MB
spring.kafka.producer.buffer-memory=33554432
# -- Consumer configuration --
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer
# If no committed offset is found in Kafka for the current consumer group, reset to the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
# Whether the consumer's offsets are committed automatically or manually
spring.kafka.consumer.enable-auto-commit=true
# Interval at which consumer offsets are automatically submitted
spring.kafka.consumer.auto-commit-interval=1000
-SyncController- Synchronization controller
@RestController
public class KafkaSyncProducerController {
    
    @Autowired
    private KafkaTemplate<Integer, String> template;
    
    @RequestMapping("send/sync/{message}")
    public String send(@PathVariable String message) {
        
        final ListenableFuture<SendResult<Integer, String>> future = template.send("topic-spring", 0, 0, message);
        // Send messages synchronously
        try {
            final SendResult<Integer, String> sendResult = future.get();
            final RecordMetadata metadata = sendResult.getRecordMetadata();
            
            System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        
        return "success";
    }
}
-AsyncController- Asynchronous controller
@RestController
public class KafkaAsyncProducerController {
    
    @Autowired
    private KafkaTemplate<Integer, String> template;
    
    
    @RequestMapping("send/async/{message}")
    public String send(@PathVariable String message) {
        
        final ListenableFuture<SendResult<Integer, String>> future = this.template.send("topic-spring", 0, 1, message);
        
        // Set up the callback function to wait asynchronously for the broker to return the result
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Failed to send message:" + throwable.getMessage());
            }
            
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                final RecordMetadata metadata = result.getRecordMetadata();
                
                System.out.println("Message sent successfully: " + metadata.topic() + "\t"
                        + metadata.partition() + "\t" + metadata.offset());
            }
        });

        return "success";
    }
}
-Listener- Client consumer (Listener)
@Component
public class MyConsumer {
    
    @KafkaListener(topics = "topic-spring")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("Message to consumers:"
                + record.topic() + "\t"
                + record.partition() + "\t"
                + record.offset() + "\t"
                + record.key() + "\t"
                + record.value());
    }
}
-App- Startup entry
@SpringBootApplication
public class SpringbootKafkaApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(SpringbootKafkaApplication.class, args);
    }
}

Now start the main entry point and give it a try! (Note: Spring Boot auto-configures a KafkaAdmin bean, which is what creates the custom topics shown further below.)

Test result of synchronous access: ↓

Test result of asynchronous access: ↓


Of course, any technology that we incorporate into a Spring Boot project lets us extend its configuration and APIs to a large extent; two common customizations are described below:

  1. Customize topic creation (custom name, partition count, and replication factor) by declaring NewTopic beans:
    @Configuration
    public class KafkaConfig {
        @Bean
        public NewTopic topic1() {
            return new NewTopic("my-topic-01", 3, (short) 1);
        }
    
        @Bean
        public NewTopic topic2() {
            return new NewTopic("my-topic-02", 5, (short) 1);
        }
    }

    After the start:

  2. Override various initial configurations by declaring your own beans (see the sketch below).
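One way to do this is to declare your own ProducerFactory/KafkaTemplate beans, which then take the place of the auto-configured ones. A minimal sketch (the broker address matches the demo above; the values shown are illustrative, not from the article):

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "117.50.40.96:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.ACKS_CONFIG, "all"); // override any default you need here
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}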

Part 4 – Server configuration parameters

“zookeeper.connect”

This parameter sets the address of the ZooKeeper node or cluster that Kafka connects to.

Its value is a comma-separated string of ZooKeeper addresses, each in the form host:port. A chroot path for Kafka's root node in ZooKeeper can be appended at the end.

For example:

zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka...

“listeners”

Specifies the addresses and ports on which the current broker listens for connections; together with advertised.listeners it is used to separate internal and external networks.

For example,

# mapping configuration of listener names and security protocols (each listener name can appear only once in the map)
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# List of URIs and listener names that the broker listens on, comma-separated
listeners=INTERNAL://192.168.100.101:9092,EXTERNAL://192.168.100.130:9093
# The listener name used for communication between brokers; it must appear in the listeners configuration
inter.broker.listener.name=EXTERNAL
# The addresses published to ZooKeeper for clients to use, when clients should use addresses
# different from those in the listeners configuration (the listener name must be one of the configured listeners)
advertised.listeners=EXTERNAL://192.168.100.130:9092

“broker.id”

Uniquely identifies a Kafka broker; its value is an arbitrary integer.

This is especially important when Kafka runs as a distributed cluster.

For example, set broker.id=1 if the host name is host1.lagou.com, or broker.id=101 if the host IP is 192.168.100.101.

“log.dirs”

The value of this property specifies the directories in which Kafka stores message log segments on disk.

It is a comma-separated set of local file system paths. If multiple paths are specified, the broker stores the log segments of a given partition under a single path, chosen by the "least used" rule.

The broker adds new partitions to the path holding the fewest partitions, not to the path with the most free disk space.

For example,

log.dirs=/var/kafka-logs

Many of Kafka's advanced features need to be studied together with the source code, which the author will follow up on. The main topics are listed below for you to dig into on your own:

  1. Producers
    • Data sending process
    • Serializers (custom)
    • Partitioners
    • Interceptors
  2. Consumers
    • Concepts: consumer, consumer group, heartbeat mechanism
    • Subscription, message reception, deserialization (custom)
    • Offset commits and offset management
    • Rebalancing
    • Consumer interceptors
  3. Topics
    • Create, view, modify, delete
    • Adding partitions
    • KafkaAdminClient usage
    • Offset management
  4. Partitions
    • Replica mechanism
    • Leader election
    • Partition reassignment
    • Automatic rebalancing
    • Partition assignment strategies
  5. Physical storage
    • Concepts: LogSegment, log file splitting, index file splitting process
    • Offset index and timestamp index
    • Log deletion and compaction policies
    • Zero copy, page cache, sequential writes
  6. Stability
    • Idempotence and transactional operations
    • Controller (broker) election
    • Reliability guarantees: failed replicas, replica replication
    • Message duplication scenarios and solutions
    • __consumer_offsets
  7. Delay queues
  8. Retry queues
  9. Kafka clusters and operations