Kafka is now widely used in our company as a log collector for AdServer and a messaging system for Counter services.

This article will first introduce some basic concepts of Kafka, then introduce how to build Kafka clusters and how to use them, and finally briefly introduce the implementation principle of Kafka file storage.

Introduction to Basic Concepts

  • Broker: a single Kafka node; multiple Broker nodes constitute the Kafka cluster.
  • Topic: a collection of messages of a certain type.
    • Partition: a physical grouping of a Topic; the Partitions are stored on different Kafka nodes. Messages within a single Partition are guaranteed to be ordered, but messages across the whole Topic are not necessarily ordered.
    • Segment: a file of a specified size containing message content, consisting of an index file and a log file; a Partition consists of multiple Segment files.
      • Offset: the sequential index of a message, counting from 0, used to look the message up in its Segment file.
    • Replica (N): the redundant copies of messages; each Partition has N identical copies, stored on different machines as far as possible.
  • Producer: publishes new messages to a Topic through a Broker.
  • Consumer: fetches messages from a Topic through a Broker.
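To make the relationships between these concepts concrete, here is a toy Go model of the hierarchy. It is only an illustration, not Kafka's actual types: real Kafka stores Segments as files on disk, not as in-memory slices.

package main

import "fmt"

// Message carries a payload plus its offset within the partition.
type Message struct {
	Offset int64 // sequential index within the partition, counting from 0
	Value  []byte
}

// Segment groups messages; on disk it is an index file plus a log file.
type Segment struct {
	BaseOffset int64 // the offset the segment's file name is derived from
	Messages   []Message
}

// Partition is a physical grouping of a topic, replicated across brokers.
type Partition struct {
	ID       int
	Leader   int   // broker ID serving reads and writes for this partition
	Replicas []int // broker IDs holding the N redundant copies
	Segments []Segment
}

// Topic is a named collection of messages, split into partitions.
type Topic struct {
	Name       string
	Partitions []Partition
}

func main() {
	chat := Topic{Name: "chat", Partitions: make([]Partition, 3)}
	fmt.Printf("topic %s has %d partitions\n", chat.Name, len(chat.Partitions))
}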

How to use Kafka

First, this section describes how to build a Kafka cluster: we will bring up a two-node cluster with docker-compose.

Set up the Kafka cluster

First, write a docker-compose.yml file:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: test:1:1
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

KAFKA_ADVERTISED_HOST_NAME needs to be replaced with your local IP address; do not use localhost or 0.0.0.0. KAFKA_CREATE_TOPICS demonstrates that default topics can be created when the Kafka cluster starts; test:1:1 means a Topic named test is created by default, with 1 partition and 1 replica.

Then start the cluster with docker-compose up -d --scale kafka=2:

➜ Kafka git:(master) docker-compose up -d --scale kafka=2
Creating network "kafka_default" with the default driver
Creating kafka_kafka_1     ... done
Creating kafka_kafka_2     ... done
Creating kafka_zookeeper_1 ... done
➜ Kafka git:(master) docker ps
CONTAINER ID   IMAGE                    COMMAND                    CREATED                  STATUS         PORTS                                                NAMES
d5927ffbd582   wurstmeister/kafka       "start-kafka.sh"           Less than a second ago   Up 6 seconds   0.0.0.0:32774->9092/tcp                              kafka_kafka_2
17916afee832   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb..."   Less than a second ago   Up 7 seconds   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafka_zookeeper_1
…              wurstmeister/kafka       "start-kafka.sh"           Less than a second ago   Up 6 seconds   0.0.0.0:32773->9092/tcp                              kafka_kafka_1

The two-node Kafka cluster has now started successfully; the two Kafka containers are named kafka_kafka_1 and kafka_kafka_2 respectively.

Produce and consume messages using the CLI tools

Kafka comes with some CLI tools. You can go inside the container to access these commands:

➜ Kafka git:(master) docker exec -it kafka_kafka_1 bash
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1:2181
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

The command above lists all the topics in the current Kafka cluster.
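The same list can also be fetched programmatically. Here is a minimal sketch using the Go SDK Sarama (mentioned again at the end of this article); the broker address localhost:32773 is an assumption, so substitute whatever host-mapped port docker ps shows for your cluster:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// localhost:32773 is a placeholder; use your own mapped broker port.
	client, err := sarama.NewClient([]string{"localhost:32773"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topics, err := client.Topics() // topic names from the cluster metadata
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(topics)
}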

I personally prefer to access the Kafka cluster directly from the host, which requires installing Kafka first; on macOS this can be done via brew install kafka.

After the installation is complete, use the same method as above, such as listing all topics:

➜  Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Let’s demonstrate how to produce and consume messages.

Create a new Topic:

➜  Kafka git:(master) kafka-topics --create --topic chat --partitions 3 --zookeeper localhost:2181 --replication-factor 2
Created topic "chat".

The name of the new Topic is chat, the number of partitions is 3, and the number of replicas is 2. You can verify that the Topic was created successfully with the following command:

➜  Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:chat      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: chat     Partition: 0    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
        Topic: chat     Partition: 1    Leader: 1002    Replicas: 1002,1001     Isr: 1002,1001
        Topic: chat     Partition: 2    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Create producer and consumer processes

Both producing and consuming messages requires knowing the Broker address and, when accessing from the Docker host, the mapped port. The mapping appears in the docker ps output above; you can also print it directly with docker port kafka_kafka_1 9092.

Then create a message producer and a consumer with the following commands:

kafka-console-producer --broker-list localhost:32773 --topic chat
kafka-console-consumer --bootstrap-server localhost:32773 --topic chat --from-beginning

If you type messages into the producer, you will see the corresponding messages appear in the consumer.

You can exit both processes with Ctrl-C.

Principles of file storage

Let's review the earlier information about the chat Topic:

Topic:chat      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: chat     Partition: 0    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
        Topic: chat     Partition: 1    Leader: 1002    Replicas: 1002,1001     Isr: 1002,1001
        Topic: chat     Partition: 2    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002

As you can see from the above, the node with ID 1001 (kafka_kafka_1) stores the Leader replicas of Partition 0 and Partition 2, as well as a backup of Partition 1.

Partitions are distributed across the Kafka nodes according to the following algorithm (a Go sketch of the computation follows the list):

  • Sort all n Brokers and the m Partitions to be allocated;
  • Allocate the i-th Partition to the (i mod n)-th Broker;
  • Allocate the j-th copy of the i-th Partition to the ((i + j) mod n)-th Broker.
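The formulas are easy to check against our cluster. Below is a minimal sketch of the layout computation in Go; it illustrates the modular arithmetic only, whereas Kafka's real assignment also randomizes the starting broker:

package main

import "fmt"

func main() {
	brokers := []int{1001, 1002} // sorted broker IDs of our cluster, n = 2
	m := 3                       // partitions of the chat topic
	replicas := 2                // replication factor

	n := len(brokers)
	for i := 0; i < m; i++ {
		fmt.Printf("partition %d -> leader %d, replicas:", i, brokers[i%n])
		for j := 0; j < replicas; j++ {
			// the j-th copy of partition i lands on broker (i + j) mod n
			fmt.Printf(" %d", brokers[(i+j)%n])
		}
		fmt.Println()
	}
}

With n = 2 this reproduces the kafka-topics output above: partitions 0 and 2 lead on broker 1001, and partition 1 leads on broker 1002.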

Partition file storage

We can log in to the inside of node 1001 to see the corresponding file store:

➜ blog git:(hexo) docker exec -it kafka_kafka_1 bash
bash-4.4# cd /kafka/kafka-logs-578c02c01fd9/
bash-4.4# ls -d chat*
chat-0  chat-1  chat-2

Each Partition corresponds to a directory, which contains the index and log files of its Segments.

bash-4.4# ls -lh chat-0
total 16
-rw-r--r--    1 root     root       10.0M May  8 20:52 00000000000000000000.index
-rw-r--r--    1 root     root          77 May  8 20:52 00000000000000000000.log
-rw-r--r--    1 root     root       10.0M May  8 20:52 00000000000000000000.timeindex
-rw-r--r--    1 root     root          10 May  8 20:52 00000000000000000001.snapshot
-rw-r--r--    1 root     root           8 May  8 20:35 leader-epoch-checkpoint

The .log file stores the actual message content, while the .index file with the same name stores index data for those messages. A segment's file name is its base offset, zero-padded to 20 digits: the offset of the first message it contains, i.e. one more than the offset of the last message in the previous log file.

Given an offset, the corresponding message can be located as follows:

  • First locate the Segment containing the offset; since the file names are base offsets, this is a binary search over the sorted file names (see the sketch after this list).
  • Then look the offset up in that Segment's index file, which is mapped into memory; the index is sparse, so from the nearest index entry the log file is scanned sequentially to the exact position of the message.
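Here is a sketch of the first step in Go. It assumes the 20-digit base offsets have already been parsed out of the segment file names; the base offsets 170410 and 239430 are made-up values for illustration:

package main

import "fmt"

// findSegment returns the base offset of the segment containing `offset`,
// i.e. the largest base offset that is <= offset, via binary search.
func findSegment(baseOffsets []int64, offset int64) int64 {
	lo, hi := 0, len(baseOffsets)-1
	for lo < hi {
		mid := (lo + hi + 1) / 2
		if baseOffsets[mid] <= offset {
			lo = mid // this segment starts at or before the target offset
		} else {
			hi = mid - 1
		}
	}
	return baseOffsets[lo]
}

func main() {
	// Three hypothetical segments: 00000000000000000000.log,
	// 00000000000000170410.log and 00000000000000239430.log.
	bases := []int64{0, 170410, 239430}
	fmt.Printf("offset %d lives in %020d.log\n", 170417, findSegment(bases, 170417))
}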

Conclusion

Kafka achieves high concurrency by splitting each Topic into multiple Partitions spread across different nodes. In addition, a Replica count can be specified per Partition, which greatly improves the durability of stored data and guards against data loss.

The design of locating messages based on file names is very clever!

At the beginning of this article, I planned to explain everything through a chat scenario, in which the sender is the message producer and the receiver is the message consumer, with a corresponding Topic generated for each user. I gave the idea up because the workload was a bit too much, but you might want to implement this example yourself when learning Go's Kafka SDK, Sarama; a minimal sketch of the two sides follows.
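For reference, here is such a sketch with Sarama. The broker port is an assumption (use the host-mapped port from docker ps), and error handling is kept deliberately short:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	brokers := []string{"localhost:32773"} // placeholder: your mapped broker port

	// Sender: a synchronous producer writing to the chat topic.
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by SyncProducer
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "chat",
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("sent to partition %d at offset %d\n", partition, offset)

	// Receiver: consume the same partition from the beginning.
	consumer, err := sarama.NewConsumer(brokers, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("chat", partition, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	msg := <-pc.Messages()
	fmt.Printf("received: %s\n", msg.Value)
}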
