Kafka was originally developed at LinkedIn for logging, so its data files are called logs
-
The difference between the point-to-point and publish/subscribe modes
1.1 Point-to-point mode: a producer sends a message to a queue and only one consumer receives it.
1.2 Publish/subscribe mode
When a publisher sends a message to a topic, only the subscribers who subscribe to that topic receive the message.
Publish/subscribe is also implemented in RabbitMQ. When RabbitMQ needs to support multiple subscriptions, messages sent by publishers are routed to multiple queues at the same time and consumed by different subscription groups.
Kafka publish/subscribe model
Kafka only supports message persistence. The consumer side uses a pull model, and the consumption state and subscription relationship are maintained by the client. So even when multiple subscriptions are supported, only one copy of each message is stored.
-
Kafka is a distributed, partitioned, multi-replica, multi-subscriber logging system (a distributed MQ system) that can be used for web/Nginx logs, search logs, monitoring logs, access logs, etc. Kafka currently supports clients in multiple languages: Java, Python, C++, PHP, and more.
-
Kafka high-throughput design:
Data persistence on disk: messages are not cached in memory but written directly to disk, taking advantage of sequential disk read/write performance.
Zero copy: reduces I/O operations.
Batch sending and pulling of data is supported.
Data compression is supported.
A topic is divided into multiple partitions to improve parallel processing capability.
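As a minimal sketch (not from the original article), the batching side of this design maps directly onto a few producer settings; the broker address, topic name, and chosen values are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: producer configuration that exercises Kafka's batching features.
// Broker address, topic name and values are assumptions, not recommendations.
public class HighThroughputProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 32768); // accumulate up to 32 KB per partition batch
        props.put("linger.ms", 10);     // wait up to 10 ms to fill a batch before sending
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("t101", "message-" + i));
            }
        }
    }
}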
-
Kafka message storage: Kafka messages are organized by topic, and topics are independent of each other. Each topic can in turn be divided into several partitions (specified when creating the topic), and each partition stores a portion of the messages. The relationship is shown below.
Partitions are stored as directories in the file system. For example, if you create a topic named t101 with four partitions, there are four directories in Kafka's data directory (specified by log.dirs): t101-0, t101-1, t101-2, and t101-3, which store the data of the four partitions respectively. The topic can be created with the following command: bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4 --topic t101. New data is appended to the end of the file, and this append is always O(1) no matter how large the data file is.
Finding the message for a given offset requires a sequential scan, so lookups are inefficient when the data file is large.
To address the problem of inefficient lookups, Kafka uses segmentation and indexing
4.1 Data file segmentation
For example, if there are 100 messages, their offsets are 0 to 99. Suppose the data file is divided into 5 segments: the first segment covers offsets 0 to 19, the second 20 to 39, and so on. Each segment is placed in a separate data file named after the smallest offset in the segment. So, when looking for the message with a specified offset, binary search can be used to determine which segment the message is in.
4.2 Index files
Data file segmentation makes it possible to find the message corresponding to an offset in a smaller data file, but it still requires a sequential scan. To further improve search efficiency, Kafka creates an index file for each data file of a partition. The index file has the same name as the data file, but with the extension .index. It contains a number of index entries, each pointing to a message in the data file. An index entry has two parts: a relative offset and a position.
Relative offset: since each data file no longer starts at offset zero after segmentation, the relative offset is the offset of a message relative to the smallest offset of the data file it belongs to. For example, if a segmented data file starts at offset 20, a message whose offset is 25 has a relative offset of 25-20=5 in the index file. Storing relative offsets reduces the space occupied by index files.
Position: the absolute byte position of the message in the data file. Simply open the file and move the file pointer to this position to read the corresponding message.
The index file does not index every message in the data file. Instead, sparse storage is used: an index entry is built every certain number of bytes. This prevents the index file from taking up too much space and allows it to be kept in memory. The disadvantage is that a message that is not indexed cannot be located in the data file in one step and requires a sequential scan, but the range of that scan is very small.
4.3 Finding a message (schematic)
Suppose we are looking for the message with offset 7:
1) Binary search is used to determine which LogSegment it is in; here it is naturally the first segment.
2) Open that segment's index file and use binary search to find the largest index entry whose offset is less than or equal to the specified offset. Here that is the entry with relative offset 6, which tells us the message with offset 6 is at position 9807 in the data file.
3) Open the data file and scan sequentially from position 9807 until the message with offset 7 is found.
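The following is a minimal sketch of this two-step lookup, with segment base offsets and the sparse index held in in-memory maps; all names and numbers are illustrative and this is not Kafka's actual implementation.

import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the lookup described above: binary search over segment base offsets,
// then over the sparse index, then a short sequential scan in the data file.
public class SparseIndexLookupSketch {
    // segment base offset -> sparse index (relative offset -> byte position)
    static NavigableMap<Long, TreeMap<Long, Long>> segments = new TreeMap<>();

    static long findPosition(long targetOffset) {
        // 1) the segment whose base offset is the largest one <= targetOffset
        long baseOffset = segments.floorKey(targetOffset);
        TreeMap<Long, Long> index = segments.get(baseOffset);
        // 2) the largest index entry whose relative offset is <= target
        long relative = targetOffset - baseOffset;
        long position = index.floorEntry(relative).getValue();
        // 3) from 'position', scan the data file sequentially until the
        //    message with 'targetOffset' is reached (scan omitted here)
        return position;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> idx = new TreeMap<>();
        idx.put(0L, 0L);    // relative offset 0 at byte 0
        idx.put(6L, 9807L); // relative offset 6 at byte 9807
        segments.put(0L, idx);
        System.out.println(findPosition(7)); // prints 9807, then scan forward
    }
}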
Kafka uses partition, LogSegment, and sparse index to store messages efficiently
- Zookeeper in Kafka
Sections 5.5, 5.6, and 5.7 describe the old design; in new versions the consumer offset is no longer stored in ZooKeeper.
5.1 Managing broker Clusters
Brokers are distributed and independent of each other, but a registry system is required to manage brokers across the cluster.
Zookeeper has a node dedicated to Broker server list logging:
/brokers/ids
Each broker is registered with ZooKeeper when it is started.
Kafka uses a globally unique number to refer to each Broker server, so different Brokers must register with different Broker IDs. After creating its node, each Broker records its IP address and port information in it. The node created by the Broker is a temporary (ephemeral) node, so if the Broker fails, the corresponding node is deleted automatically.
5.2 Managing Topic Information
In Kafka, messages on the same Topic are divided into multiple partitions and distributed among multiple brokers. The partition information and the corresponding relationship between the partitions and brokers are maintained by Zookeeper and recorded by special nodes, such as:
/brokers/topics
Each Topic in Kafka is recorded as /brokers/topics/[topic]. Once a Broker server starts, it registers its own Broker ID on the corresponding Topic nodes (under /brokers/topics) and writes the total number of partitions it holds for that Topic. Again, these partition nodes are temporary nodes.
5.3 Producer load balancing
The same Topic message can be partitioned and distributed among multiple brokers. Therefore, producers need to send messages to these distributed brokers properly. How to implement load balancing among producers?
(1) Four-layer load balancing, which determines an associated Broker for a producer according to its IP address and port. Typically, a producer corresponds to a single Broker, and all messages generated by that producer are sent to that Broker. This approach is logically simple: each producer does not need to establish additional TCP connections with other systems and only needs to maintain a single TCP connection with its Broker. However, it cannot achieve true load balancing, because in a real system the amount of messages generated by each producer and the amount of messages stored by each Broker differ. If some producers generate far more messages than others, the total number of messages received by different Brokers will differ greatly. Producers are also not aware of Broker additions and removals in real time.
(2) Load balancing using ZooKeeper. Since every Broker completes the registration process when it starts, producers can dynamically perceive changes in the Broker server list through changes to this node, achieving a dynamic load balancing mechanism.
Kafka producers register Watchers on ZooKeeper for events such as “Broker additions and removals,” “Topic additions and removals,” and “Broker and Topic association changes.”
ZooKeeper’s Watcher notifications enable producers to dynamically capture Broker and Topic changes.
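A minimal sketch of this mechanism using the ZooKeeper client API to read and watch the /brokers/ids node; the connection string and timeouts are assumptions.

import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Sketch: watching the broker list in ZooKeeper, as the old producer-side
// load-balancing mechanism described above does. Connection string is an assumption.
public class BrokerWatcherSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
        Watcher brokerWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("Broker list changed: " + event.getType());
                // re-read the broker list (and re-register the watch) here
            }
        };
        // read the current broker ids and register a one-shot watch for changes
        List<String> brokerIds = zk.getChildren("/brokers/ids", brokerWatcher);
        System.out.println("Live brokers: " + brokerIds);
        Thread.sleep(60000); // keep the session alive to observe change events
        zk.close();
    }
}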
5.4 Consumer load balancing
Similar to producers, consumers in Kafka also need load balancing so that multiple consumers receive messages from the appropriate Broker servers. Each consumer group contains a number of consumers; each message is delivered to only one consumer within a group, and different consumer groups independently consume the messages of the topics they subscribe to.
For each consumer group, Kafka assigns a globally unique Group ID that is shared by all consumers in the group. Kafka also assigns each consumer a Consumer ID, usually in the form “Hostname:UUID”. In Kafka, each message partition can only be consumed by one consumer in the same group, so the relationship between message partitions and consumers needs to be recorded on ZooKeeper. Once a consumer acquires the right to consume a message partition, its Consumer ID is written to the temporary ZooKeeper node of that partition, for example: /consumers/[group_id]/owners/[topic]/[brokerid-partition_id]
5.6 Recording the message consumption progress (offset)
When a consumer consumes messages from a specified message partition, it needs to record the consumption progress (offset) on ZooKeeper periodically, so that after the consumer restarts or another consumer takes over the partition, consumption can continue from the previous position. The offset is recorded in ZooKeeper by a dedicated node whose path is: /consumers/[group_id]/offsets/[topic]/[brokerid-partition_id]. The content of the node is the offset value.
-
In old versions, consumer offsets were stored in ZooKeeper, but ZooKeeper is not suited to frequent writes and queries, so in new versions the offsets are stored in the built-in topic __consumer_offsets. To generate consumer group offset information, start a consumer with the group consumer_offsets_t105: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105
The __consumer_offsets topic, in which the consumer offsets are stored, has 50 partitions by default. The partition that holds a group's offsets is computed as Math.abs("consumer_offsets_t105".hashCode()) % 50.
It can be calculated that the offsets for this group are stored on the partition with partitionId equal to 44.
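A small sketch of that calculation, following the formula above (the printed value is whatever the group id hashes to; the text above gives 44 for this group):

// Sketch: which __consumer_offsets partition stores a group's offsets.
// 50 is the default partition count of the __consumer_offsets topic.
public class OffsetsPartitionSketch {
    public static void main(String[] args) {
        String groupId = "consumer_offsets_t105";
        int partition = Math.abs(groupId.hashCode()) % 50;
        System.out.println("offsets partition = " + partition);
    }
}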
You can run commands to query the offset information of messages.
bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
You can query the offsets of the published messages by running the following command:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1
You can see that the recorded consumption offsets are basically consistent with the offsets of the published messages.
-
Kafka Java usage
7.1 Producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("t105", msg));
                System.out.println("Sent: " + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close(); // flush buffered records and release resources
            }
        }
    }
}
7.2 Consumer
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "group4");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "none");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("t105"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = " + record.partition() + " offset = %d, value = %s",
                        record.offset(), record.value());
                System.out.println();
            }
        }
    }
}

The parameter auto.offset.reset has three values: latest, earliest, and none.
earliest: automatically reset the offset to the earliest offset.
latest: automatically reset the offset to the latest offset.
none: throw an exception to the consumer if no previous offset is found for the consumer's group.
- Kafka zero copy
Traditional I/O mechanism
This process actually involves four data copies. The file data is first read into the kernel-space buffer through a system call (DMA copy). The application then reads the kernel buffer into a user-space buffer (CPU copy). When sending data through the socket, the user program copies the user-space buffer back into a kernel-space socket buffer (CPU copy), and finally the data is copied to the NIC buffer via another DMA copy. This is accompanied by four context switches.
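Kafka's zero copy relies on the operating system's sendfile mechanism, exposed in Java as FileChannel.transferTo, which lets the kernel move file data to a socket without copying it through user space. A minimal sketch, with the file path and destination address as assumptions:

import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

// Sketch: sendfile-style transfer via FileChannel.transferTo.
// The log file path and the destination host/port are assumptions.
public class ZeroCopySketch {
    public static void main(String[] args) throws Exception {
        try (FileChannel fileChannel = new FileInputStream("/tmp/t101-0/00000000000000000000.log").getChannel();
             SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long size = fileChannel.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until done
                position += fileChannel.transferTo(position, size - position, socketChannel);
            }
        }
    }
}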
- If the leader fails, Kafka can recover because it dynamically maintains a set of in-sync replicas (ISR). If the ISR has f+1 nodes, message loss can be prevented and service can continue normally when up to f nodes go down. ISR membership is dynamic: if a node falls behind, it can rejoin the ISR once it catches back up to the “in sync” state. So if the leader goes down, a new leader is simply chosen from the ISR.
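A hedged sketch for inspecting the leader and ISR of each partition with Kafka's AdminClient API; the broker address and topic name are assumptions.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

// Sketch: printing the leader and ISR of each partition of a topic.
// Broker address and topic name are assumptions.
public class IsrSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singleton("t106"))
                    .all().get().get("t106");
            description.partitions().forEach(p ->
                    System.out.println("partition " + p.partition()
                            + " leader=" + p.leader() + " isr=" + p.isr()));
        }
    }
}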
- A stream processor receives data from its upstream processors in the topology, processes it using the basic stream-processing methods provided by Kafka Streams, such as map(), filter(), join(), and aggregate(), and then sends one or more of the processed results to the downstream stream processors.
Kafka stream examples can be found at juejin.cn/post/684490…
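Besides the linked examples, here is a minimal sketch of a Streams topology in the map/filter style described above; the topic names, application id, and broker address are assumptions.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

// Sketch: a tiny Kafka Streams topology (filter + mapValues + to).
// Topic names, application id and broker address are assumptions.
public class StreamsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("t105");
        source.filter((key, value) -> value != null && !value.isEmpty()) // drop empty records
              .mapValues(value -> value.toUpperCase())                   // transform each value
              .to("t105-upper");                                         // write downstream

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}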
- Kafka compression
Create the topic: bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic t106
Describe the topic: bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic t106
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t106
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic t106
Verify that the messages were produced successfully: bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic t105 --time -1
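The commands above only create and exercise the topic; compression itself is enabled on the producer side through the compression.type setting (gzip, snappy, lz4, and in newer versions zstd). A minimal sketch, with the broker address and message content as assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: enabling producer-side compression for the t106 topic created above.
// Broker address and message content are assumptions.
public class CompressedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "gzip"); // whole batches are compressed before sending
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("t106", "a compressed message"));
        }
    }
}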
-
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --from-beginning --group consumer_offsets_t105
bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 44 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t105 --group consumer_offsets_t105 --consumer.config config/consumer.properties