1. What is Kafka?
Kafka is a distributed publish/subscribe message queue.
Why Kafka
- Peak clipping
For example, at midnight on Singles' Day (Double Eleven), Taobao's traffic surges sharply and the servers come under enormous pressure. We cannot provision resources for the peak data volume, because outside that short period the traffic is far smaller and most of those resources would be wasted. A message queue buffers the burst so that the backend can process it at its own pace.
- Asynchronous processing
Instead of waiting for all the work to finish, we can do only what is necessary to respond to the user quickly; tasks such as emailing a user manual can be handled later, asynchronously.
- Decoupling
In order to reduce the interdependence of multiple systems and avoid the change of one system causing the change of other systems, message-oriented middleware can be used to achieve this goal. Coupling refers to the dependencies between systems, such as system A’s dependence on an interface provided by system B. After using message-oriented middleware as a mediator, the dependencies of the caller and the called change as follows:
A. The caller does not need to know the interface name or parameters of the called party; it only needs to send data to the message middleware.
B. Parameter errors or process crashes in the called system do not directly affect the caller.
That is, the dependency between the caller and the called party is replaced by dependencies between the caller and the message middleware, and between the message middleware and the called party. Since the interface of the message middleware is uniform and rarely changes, the degree of coupling is reduced.
Usage scenarios
- Messaging systems: Both Kafka and traditional messaging systems (also known as message-oriented middleware) offer system decoupling, redundant storage, traffic peak-shaving, buffering, asynchronous communication, scalability, and recoverability. In addition, Kafka provides message-ordering guarantees and backtracking consumption (re-consumption), which are difficult to achieve in most messaging systems.
- Storage systems: Kafka persists messages to disk, which effectively reduces the risk of data loss compared with memory-based storage systems. Thanks to its message persistence and multi-replica mechanism, Kafka can be used as a long-term data storage system by setting the data retention policy to "permanent" or enabling log compaction for topics.
- Streaming platform: Kafka not only provides a reliable data source for popular stream-processing frameworks, but also ships with a complete stream-processing library supporting operations such as windowing, joins, transformations, and aggregations.
2. Kafka Architecture
The Kafka architecture consists of producers, brokers, consumers, and a ZooKeeper cluster.
- Producer: the message producer, i.e. the client that sends messages to the Kafka broker.
- Consumer: the message consumer, i.e. the client that fetches messages from the Kafka broker.
- Consumer Group (CG): a group consisting of multiple consumers. Each consumer in the group consumes data from different partitions, and a partition can be consumed by only one consumer within the group; consumer groups do not affect each other. Every consumer belongs to a consumer group, so a consumer group is logically a single subscriber.
- Broker: a Kafka server is a broker. A cluster consists of multiple brokers, and a single broker can host multiple topics.
- Topic: can be understood as a queue; producers and consumers both work against the same topic.
- Partition: for scalability, a very large topic can be spread across multiple brokers (servers). A topic can be divided into multiple partitions, each of which is an ordered queue.
- Replica: Kafka provides replicas to ensure that when a node in the cluster fails, the partition data on it is not lost and Kafka can keep working. Each partition of a topic has several replicas: one leader and several followers.
- Leader: the "master" among the replicas of each partition; it is the replica to which producers send data and from which consumers consume data.
- Follower: the "slave" replicas of each partition; they synchronize data from the leader in real time. When the leader fails, one of the followers becomes the new leader.
Messages in Kafka are grouped by topic: producers send messages to specific topics (every message sent to a Kafka cluster specifies a topic), and consumers subscribe to topics and consume their messages.
A topic is a logical concept that can be subdivided into multiple partitions; each partition belongs to a single topic and is often referred to as a topic-partition.
Kafka uses a replica mechanism for partitions to improve disaster recovery. Replicas of the same partition store the same messages (although they may not be fully in sync at any given moment). Replicas follow a "one leader, many followers" relationship: the leader replica handles read and write requests, while follower replicas only synchronize messages from the leader. If the leader replica fails, a new leader is elected from the followers to continue serving requests.
3. Kafka log file storage design
Messages in Kafka are grouped by topic, and topics are logically independent of each other. Each topic can be divided into one or more partitions. Ignoring replicas, one partition corresponds to one Log. To prevent a Log from growing too large, Kafka introduces the concept of the LogSegment: a Log is split into multiple LogSegments, just as a large file can be split evenly into several smaller files.
Logs and LogSegments are not purely physical concepts either: a Log is stored on disk only as a folder, while each LogSegment corresponds to one log file and two index files (and possibly other files) on disk. The .index file stores index entries and the .log file stores the message data; the metadata in the index file points to the physical offset of the corresponding message in the data file.
The log file stores the data produced by producers. Data is continuously appended to the end of the log file, and each record has its own offset. Each consumer in a consumer group records in real time the offset up to which it has consumed, so that after a failure it can resume from the last position.
Message lookup procedure
First locate the .index file that covers the target offset, then look up the physical position stored for that offset (via binary search), and finally read the message data at that position in the corresponding .log file, as shown in the figure below:
Each log segment file corresponds to two index files, whose main purpose is to improve the efficiency of message lookup.
- Offset index file (.index): establishes the mapping between message offsets and physical positions, so that the physical location of a message in the log file can be found quickly (used by the seek() method).
- Timestamp index file (.timeindex): finds the offset corresponding to a given timestamp (used by the offsetsForTimes() method).
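To make the two-step lookup concrete, here is a minimal sketch of the idea (not Kafka's actual implementation): assuming a segment's sparse index has been loaded into two parallel arrays, a binary search finds the last indexed offset not greater than the target, and the returned physical position is where the scan of the .log file starts.

```java
// Sketch of the index lookup idea, assuming the segment's sparse .index entries
// have been loaded into two parallel arrays (offset -> physical position).
public final class OffsetIndexLookup {

    /** Returns the physical position in the .log file from which to start scanning. */
    public static int positionFor(long targetOffset, long[] indexOffsets, int[] indexPositions) {
        int lo = 0, hi = indexOffsets.length - 1, best = -1;
        while (lo <= hi) {                          // binary search ("dichotomy")
            int mid = (lo + hi) >>> 1;
            if (indexOffsets[mid] <= targetOffset) {
                best = mid;                         // candidate entry, keep looking to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        // No entry <= targetOffset: scan from the beginning of the segment.
        return best >= 0 ? indexPositions[best] : 0;
    }
}
```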
Log deletion
Log deletion (log retention): deletes log segments that no longer satisfy the retention policy.
- Time-based: log segment files are retained for 7 days by default.
- Size-based: segments are deleted once the log exceeds the configured retention size; a single log segment defaults to 1073741824 bytes, i.e. 1 GB.
- Based on the log start offset.
Log compaction
Log compaction: consolidates messages by key, keeping only the latest value for each key when multiple messages share the same key.
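As an illustration of these policies, the sketch below sets time-based retention, size-based retention, and the cleanup policy at the topic level through the Java AdminClient; the broker address and topic name are placeholders.

```java
import java.util.*;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
            Collection<AlterConfigOp> ops = Arrays.asList(
                // time-based deletion: keep segments for 7 days
                new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET),
                // size-based deletion: cap the partition log at 1 GiB
                new AlterConfigOp(new ConfigEntry("retention.bytes", "1073741824"), AlterConfigOp.OpType.SET),
                // use "compact" instead of "delete" to enable log compaction for this topic
                new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}
```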
4. Kafka producers
(1) Overall structure of Kafka producer client
The producer client runs two threads in coordination: the main thread and the Sender thread.
Messages are created by KafkaProducer in the main thread and then, after passing through any configured interceptors, the serializer, and the partitioner, are cached in the RecordAccumulator (also known as the message collector). The Sender thread takes messages from the RecordAccumulator and sends them to Kafka.
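A minimal sketch of this flow from the application's side, assuming a local broker at localhost:9092 and a hypothetical topic name: the KafkaProducer builds the message, runs it through the (default) serializer and partitioner, and hands it to the RecordAccumulator, from which the Sender thread ships it to Kafka.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // the consumer of this topic must use the matching StringDeserializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns immediately; the Sender thread transmits the batch in the background
            producer.send(new ProducerRecord<>("demo-topic", "key-1", "hello kafka"));
        } // close() flushes any messages still sitting in the RecordAccumulator
    }
}
```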
- RecordAccumulator
RecordAccumulator is used to cache messages so that the Sender thread can send them in bulk, thereby reducing the resource consumption of network transmission and improving performance.
Each message sent from the main thread is appended to a double-ended queue (Deque) inside the RecordAccumulator; one such queue is maintained per partition.
When a message is written to the cache it is appended to the tail of the deque; when the Sender reads messages, it reads from the head.
The Sender converts the cached messages from the form <partition, Deque<ProducerBatch>> into <Node, List<ProducerBatch>>, where Node represents a broker node in the Kafka cluster.
Before KafkaProducer appends a message to the leader replica of a given topic partition, it first needs to know the number of partitions of the topic and then compute (or be told directly) the target partition. It then needs the address and port of the broker node hosting that partition's leader replica in order to establish a connection and finally send the message to Kafka.
Hence the conversion above: at the network level, the producer client holds connections to specific broker nodes and sends messages to broker nodes, regardless of which partition a message belongs to.
- InFlightRequests
Requests are also saved in InFlightRequests before being sent from the Sender thread to Kafka. InFlightRequests stores objects in the form Map<NodeId, Deque<Request>>; its main purpose is to cache requests that have already been sent but have not yet received a response (NodeId is a String identifying the broker node).
- Interceptors
Producer interceptors can be used to do preparatory work before a message is sent, such as filtering out messages that do not satisfy some rule or modifying the message content, and they can also serve custom needs such as collecting statistics before the send-callback logic runs.
- Serializers
Producers need serializers to convert objects into byte arrays so they can be sent to Kafka over the network. On the other side, consumers need deserializers to convert the byte arrays received from Kafka back into objects.
The serializer used by the producer must match the deserializer used by the consumer. If the producer uses one serializer, such as StringSerializer, while the consumer uses a mismatched deserializer, such as IntegerDeserializer, the data cannot be parsed as intended.
- Partitioner
After a message is serialized, the partition it will be sent to must be determined. If the partition field is specified in the ProducerRecord, the partitioner is not needed, because partition already names the target partition.
If no partition is specified in the ProducerRecord, the partitioner computes the partition from the key field. The purpose of the partitioner is to assign messages to partitions.
(2) Producer partitioning strategy
Why do producers partition?
- It makes scaling within a cluster easy. Each partition can be sized to fit the machine it lives on, and a topic can consist of multiple partitions, so the cluster can accommodate data of any size.
- It improves concurrency, because reads and writes can be performed per partition.
Producer partitioning rules
We need to encapsulate the data sent by producer into a ProducerRecord object:
- When a partition is specified, the specified value is used directly as the partition value;
- If no partition is specified but a key is present, the partition value is obtained by taking the hash of the key modulo the number of partitions of the topic;
- If neither a partition nor a key is present, a random integer is generated on the first call (and incremented on each subsequent call), and this value modulo the total number of available partitions gives the partition value. This is known as the round-robin algorithm (see the sketch below).
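The three rules above can be expressed as a custom Partitioner; the sketch below mirrors (but does not reproduce) the behaviour of the built-in default. Note that rule 1, an explicit partition set on the ProducerRecord, is applied by the producer before any partitioner is called. The class name is illustrative.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class RuleBasedPartitioner implements Partitioner {

    // round-robin counter seeded with a random integer, as in rule 3
    private final AtomicInteger counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes != null) {
            // rule 2: hash of the key modulo the number of partitions
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        // rule 3: round-robin over the partitions
        return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
    }

    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
```

Such a partitioner would be registered on the producer with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RuleBasedPartitioner.class.getName()).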
5. Kafka consumers
(1) Consumer group
In Kafka, every consumer belongs to a consumer group. When a message is published to a topic, it is delivered to only one consumer in each consumer group that subscribes to that topic. Each consumer only consumes messages from the partitions assigned to it, and each partition can be consumed by only one consumer within a consumer group. As shown in the figure above, we can set up two consumer groups to implement broadcast semantics: both consumer groups A and B receive the messages sent by the producer.
The consumer and consumer-group model makes overall consumption capacity horizontally scalable: we can increase (or decrease) the number of consumers to increase (or decrease) overall throughput. For a fixed number of partitions, however, blindly adding consumers does not keep improving consumption capacity; once there are more consumers than partitions, some consumers will not be assigned any partition.
For example, with 8 consumers and 7 partitions, the last consumer, C7, is assigned no partition and therefore cannot consume any messages.
(2) Consumption mode
The consumer uses pull mode to read data from the broker.
The push model struggles to adapt to consumers with different consumption rates, because the send rate is determined by the broker. Its goal is to deliver messages as fast as possible, which easily overwhelms consumers, typically showing up as denial of service and network congestion. The pull model lets the consumer fetch messages at a rate matched to its own processing capacity.
The downside of the pull model is that if Kafka has no data, the consumer may spin in a loop that keeps returning empty results. For this reason, Kafka consumers pass a timeout when polling: if no data is available, the consumer waits up to that long before returning.
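A minimal pull-mode consumer sketch, assuming a local broker and a hypothetical topic and group id; poll() takes the timeout described above and returns an empty batch if no data arrives in time.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // pull with a timeout: blocks for up to 1 second when no data is available
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```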
(3) Partition allocation strategy at the consumer end
Kafka provides the consumer client parameter partition.assignment.strategy to configure how the partitions of subscribed topics are allocated among the consumers in a group (a configuration sketch follows the three strategies described below).
- RangeAssignor allocation strategy
By default, the RangeAssignor allocation strategy is used.
The principle of the RangeAssignor strategy is to divide the number of partitions by the number of consumers to obtain a span, and then allocate the partitions in contiguous ranges of that span so that they are distributed as evenly as possible across all consumers. For each topic, the RangeAssignor sorts all consumers in the group that subscribe to it in lexicographic order by name and assigns each a fixed range of partitions. If the partitions cannot be divided evenly, the consumers earliest in lexicographic order are each assigned one extra partition.
Suppose the consumer group has two consumers, C0 and C1, both subscribed to topics t0 and t1, each with four partitions. All subscribed partitions can then be identified as t0p0, t0p1, t0p2, t0p3, t1p0, t1p1, t1p2, t1p3. The final assignment is:

```
Consumer C0: t0p0, t0p1, t1p0, t1p1
Consumer C1: t0p2, t0p3, t1p2, t1p3
```

Assuming instead that both topics have only three partitions, the subscribed partitions are t0p0, t0p1, t0p2, t1p0, t1p1, t1p2, and the final assignment is:

```
Consumer C0: t0p0, t0p1, t1p0, t1p1
Consumer C1: t0p2, t1p2
```

It is clear that this distribution is uneven.
- RoundRobinAssignor allocation strategy
The principle of the RoundRobinAssignor strategy is to sort all consumers in the group, and all partitions of the topics they subscribe to, in lexicographic order, and then assign the partitions to the consumers one by one in a round-robin fashion.
If all consumers in the group have identical subscriptions, the RoundRobinAssignor allocation is uniform.
If consumers in the same group have different subscriptions, the allocation is no longer a pure round robin, which may leave the partitions unevenly distributed.
Suppose there are three consumers (C0, C1, and C2) in the group, and topics t0, t1, and t2 have 1, 2, and 3 partitions respectively, i.e. the group as a whole subscribes to six partitions: t0p0, t1p0, t1p1, t2p0, t2p1, t2p2.
Specifically, consumer C0 subscribes to topic t0, consumer C1 subscribes to topics t0 and t1, and consumer C2 subscribes to topics t0, t1, and t2. The final assignment is:

```
Consumer C0: t0p0
Consumer C1: t1p0
Consumer C2: t1p1, t2p0, t2p1, t2p2
```

As you can see, the RoundRobinAssignor strategy is not perfect either. This assignment is not optimal, because t1p1 could perfectly well have been assigned to consumer C1.
- StickyAssignor allocation strategy
This allocation strategy has two main goals:
Partitions should be distributed as evenly as possible.
Partitions should, as far as possible, remain with the consumer they were assigned to previously.
Suppose there are three consumers (C0, C1, and C2) in a consumer group that subscribe to four topics (t0, t1, t2, t3), each with two partitions. In other words, the group subscribes to eight partitions: t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. The final assignment is:

```
Consumer C0: t0p0, t1p1, t3p0
Consumer C1: t0p1, t2p0, t3p1
Consumer C2: t1p0, t2p1
```

Suppose consumer C1 now leaves the group; the assignment becomes:

```
Consumer C0: t0p0, t1p1, t3p0, t2p0
Consumer C2: t1p0, t2p1, t0p1, t3p1
```

As the "sticky" in its name suggests, the StickyAssignor strategy keeps consecutive assignments as similar as possible, reducing unnecessary partition movement and the associated resource consumption and other anomalies.
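As mentioned above, the strategy is chosen with the partition.assignment.strategy consumer parameter. A minimal sketch, assuming the rest of the consumer configuration is built elsewhere:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.StickyAssignor;

public class AssignorConfig {
    public static Properties withAssignors(Properties consumerProps) {
        // default is RangeAssignor; list one or more alternatives in preference order
        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                StickyAssignor.class.getName() + "," + RoundRobinAssignor.class.getName());
        return consumerProps;
    }
}
```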
(4) Rebalance
Rebalancing is the transfer of partition ownership from one consumer to another. It gives consumer groups high availability and scalability, making it safe and easy to add or remove consumers.
Disadvantages
- Consumers in the group cannot read messages while a rebalance is in progress.
- Rebalancing is slow. If a consumer group has hundreds of consumer instances, a single rebalance can take hours.
- During a rebalance the consumers' current state is also lost. For example, if a consumer has processed part of the messages in a partition but has not yet committed its offset when the rebalance happens, the partition may be assigned to another consumer in the group, which then consumes those messages again, i.e. duplicate consumption occurs.
Triggers
- The number of group members changes
- The number of subscribed topics changes
- The number of partitions of a subscribed topic changes
The latter two are usually the result of business changes that we cannot control, so we should try to estimate the required number of topics and partitions before a project starts, so that later rebalances do not affect Kafka usage. The following explains how to avoid rebalances caused by changes in group membership.
How do you avoid rebalancing
After a consumer group completes a rebalance, each consumer instance periodically sends heartbeat requests to the Coordinator to show that it is still alive. If a consumer instance fails to send heartbeats in a timely way, the Coordinator considers that consumer "dead", removes it from the group, and triggers a new rebalance.
The session.timeout.ms parameter controls this; its default value is 10s. If the Coordinator does not receive a heartbeat from a consumer instance within 10 seconds, it considers that instance to have died.
On the consumer side, you can also set heartbeat.interval.ms to control how frequently heartbeat requests are sent.
There is also the max.poll.interval.ms parameter, which limits the maximum interval between two calls to poll() by the consumer application. The default is 5 minutes: if the consumer cannot finish processing the polled messages within 5 minutes, it initiates a "leave group" request and the Coordinator starts a new round of rebalancing.
Knowing these parameters, we can avoid the following problems:
- Unnecessary rebalances caused by a consumer being "kicked out" of the group for failing to send heartbeats in time.
In production we can therefore set the following (see the sketch after this list):
Set session.timeout.ms to 6s.
Set heartbeat.interval.ms to 2s.
- Unnecessary rebalances caused by consumers taking too long to process messages. With the default max.poll.interval.ms of 5 minutes, processing that runs longer will also trigger a rebalance, so increase this parameter if you have heavy tasks.
- Frequent full GC on the consumer side, which causes long pauses that can also trigger rebalances.
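A sketch of the production settings suggested above, applied to a consumer's configuration; the max.poll.interval.ms value is only an example for heavy tasks.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebalanceTuning {
    public static Properties tune(Properties consumerProps) {
        // declare the consumer dead only after several missed heartbeats
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");     // 6s
        consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000");  // 2s
        // give heavy message-processing logic more time between poll() calls
        consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // 10 minutes
        return consumerProps;
    }
}
```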
(5) Maintenance of offset
Since a consumer may suffer failures such as power outages during consumption, it needs to resume from where it left off after recovering, so it must record in real time the offset up to which it has consumed.
Before Kafka 0.9, consumers stored offsets in ZooKeeper by default. Starting with 0.9, consumers store offsets in a built-in Kafka topic named __consumer_offsets.
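When more control over the committed offset is needed, auto-commit can be disabled and the offset committed only after processing, as in this sketch (the consumer is assumed to be configured with enable.auto.commit=false and already subscribed):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitLoop {
    public static void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());   // business logic goes here
            }
            // commit after processing; the offsets are written to __consumer_offsets
            consumer.commitSync();
        }
    }
}
```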
6. Broker
(1) The Broker processes the request process
In Kafka's architecture, many clients send requests to the broker. Kafka's broker has a SocketServer component that accepts client connections and dispatches requests via an Acceptor thread. Because the Acceptor does no business-specific processing, it is very lightweight and achieves high throughput.
The Acceptor thread then distributes incoming requests evenly across all network threads in a round-robin fashion. The network thread pool has a default size of 3, meaning each broker starts three network threads to handle client requests; this can be changed with the broker-side parameter num.network.threads.
(2) Controller
There are one or more brokers in a Kafka cluster. One broker is elected as a Kafka Controller, which is responsible for managing the state of all partitions and replicas in the cluster.
How is the controller selected?
When brokers start, they try to create a /controller node in ZooKeeper. Kafka's current controller election rule is simple: the first broker to successfully create the /controller node becomes the controller.
ZooKeeper also holds a /controller_epoch node containing an integer value, controller_epoch, which records how many times the controller has changed, i.e. the generation ("epoch") of the current controller.
controller_epoch starts at 1 (the epoch of the first controller in the cluster) and is incremented by 1 each time a new controller is elected. Kafka uses controller_epoch to guarantee the uniqueness, and thereby the consistency, of the controller.
Every request that interacts with the controller carries a controller_epoch field. If the controller_epoch in a request is smaller than the value held in the controller's memory, the request was addressed to an expired controller and is considered invalid.
If the controller_epoch in a request is greater than the value in memory, a new controller has already been elected.
What does the controller do?
- Topic management (creating and deleting topics, adding partitions)
- Partition reassignment
- Preferred leader election: a mechanism Kafka provides to avoid overloading some brokers.
- Cluster member management (new Broker, active Broker shutdown, Broker down)
The controller uses the Watch mechanism to detect changes in the children of the ZooKeeper /brokers/ids node. When a new broker starts, it creates its own znode under /brokers/ids; once the znode is created, ZooKeeper pushes a notification to the controller via the Watch mechanism, and the controller automatically detects the change and carries out the work needed for the new broker.
- Data services
The most complete cluster metadata information is saved on the controller.
What if the controller is down?
When the running controller suddenly goes down or terminates unexpectedly, Kafka can detect it quickly and immediately promote another broker to replace the failed controller. This process, called failover, happens automatically without manual intervention.
7. Guarantee of high availability in Kafka
(1) ISR
All replicas of a partition are collectively called the AR (Assigned Replicas). The replicas that stay sufficiently in sync with the leader replica (including the leader itself) form the ISR (In-Sync Replicas); the ISR is a subset of the AR.
Replicas that lag too far behind the leader (excluding the leader) form the OSR (Out-of-Sync Replicas), so AR = ISR + OSR. Under normal circumstances all follower replicas should keep up with the leader to some degree, i.e. AR = ISR and the OSR set is empty.
The leader replica is responsible for maintaining and tracking the lag of all follower replicas in the ISR. When a follower falls too far behind or fails, the leader removes it from the ISR. By default, when the leader replica fails, only replicas in the ISR are eligible to be elected as the new leader.
(2) ACK mechanism
To ensure that data sent by the producer reliably reaches the specified topic, each partition of the topic must send an ack (acknowledgement) back to the producer after receiving the data. If the producer receives the ack, it sends the next round; otherwise it resends the data.
Configuring the acks parameter (see the sketch after this list):
- 0: the producer does not wait for the broker's ack. This gives the lowest latency: the broker returns as soon as it receives the message, before it has been written to disk. If the broker fails, data may be lost.
- 1: the producer waits for the broker's ack; the partition leader returns the ack after writing the message to its local log. If the leader fails before the followers have synchronized the message, data may be lost.
- -1 (all): the broker returns the ack only after the partition's leader and its followers have all written the message. However, if the leader fails after the followers have synchronized but before the ack is sent, the producer retries and data duplication can occur.
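A sketch of how the acks level is set on the producer side; "all" is equivalent to -1.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksConfig {
    public static Properties withAcks(Properties producerProps) {
        // "0": don't wait for any ack; "1": wait for the leader only;
        // "all"/-1: wait until the leader and the in-sync followers have the message
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        return producerProps;
    }
}
```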
(3) HW
HW, short for High Watermark, identifies a specific message offset, and consumers can only pull messages before that offset. LEO is short for Log End Offset, which identifies the Offset of the next message to be written in the current Log file.
As shown in the figure above, the first message has an offset (LogStartOffset) of 0, and the last message has an offset of 8. The message whose offset is 9 is represented by a dotted box and represents the next message to be written. The HW of the log file is 6, which means that the consumer can only pull messages with offset between 0 and 5, and messages with offset 6 are invisible to the consumer.
- Follower failure
If a follower fails, it is temporarily removed from the ISR. After it recovers, the follower reads the last HW it recorded on local disk, truncates the part of its log above that HW, and then synchronizes from the leader starting at the HW. Once the follower's LEO catches up with the partition's HW, i.e. it has caught up with the leader, it can rejoin the ISR.
- Leader failure
When the leader fails, a new leader is elected from the ISR. To keep the replicas consistent with each other, the remaining followers truncate the parts of their logs above the HW and then synchronize data from the new leader.
Note that the HW mechanism only guarantees consistency between replicas; it does not guarantee that data is neither lost nor duplicated.
(4) Exactly Once
Setting the server-side acks level to -1 ensures that no data is lost between the producer and the server, i.e. At Least Once semantics. Conversely, setting the acks level to 0 ensures that the producer sends each message only once, i.e. At Most Once semantics.
At Least Once guarantees that data is not lost, but not that it is free of duplicates; At Most Once guarantees no duplicates, but cannot guarantee that data is not lost. For very important data, such as transactions, downstream consumers require that data be neither duplicated nor lost; this is the Exactly Once semantics.
Prior to version 0.11, Kafka could do nothing about this except guarantee no data loss and let downstream consumers de-duplicate globally. With multiple downstream applications, each has to de-duplicate on its own, which hurts performance significantly. Kafka 0.11 introduced a major feature: idempotence. Idempotence means that no matter how many times the producer sends the same data to the server, the server persists it only once. Idempotence combined with At Least Once semantics gives Kafka's Exactly Once semantics, that is:

```
Idempotence + At Least Once = Exactly Once
```

To enable idempotence, simply set enable.idempotence to true in the producer configuration. Kafka's idempotence essentially moves the de-duplication that previously had to be done downstream up into the broker.
A producer with idempotence enabled is assigned a PID during initialization, and every message sent to a partition carries a sequence number. The broker caches the <PID, Partition, SeqNumber> and persists a message only if its sequence number has not been accepted before.
However, the PID changes after a restart, and different partitions have their own sequence numbering, so idempotence alone cannot guarantee Exactly Once across partitions and sessions.
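Enabling idempotence is a single producer setting, sketched below; when it is on, the client also requires acks=all and a positive retries value.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerConfig {
    public static Properties withIdempotence(Properties producerProps) {
        // the broker de-duplicates by <PID, Partition, SeqNumber> for this producer
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return producerProps;
    }
}
```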
(5) Leader Epoch
The leader epoch represents the generation of the leader, with an initial value of 0. Each time the leader changes, the leader epoch is incremented by 1, effectively giving the leader a version number. During subsequent synchronization, replicas use this value to determine whether the current leader is the latest one, preventing incorrect data from being synchronized.
8. Kafka transactions
Transactions in Kafka enable applications to treat consumption messages, production messages, and commit consumption shifts as atomic operations, succeeding or failing simultaneously, even if the production or consumption spans multiple partitions.
The producer must provide a unique transactionalId and, after startup, requests a PID from the transaction coordinator; the transactionalId corresponds one-to-one with the PID.
Before sending data to a <Topic, Partition> for the first time within a transaction, the producer must first send an AddPartitionsToTxnRequest to the transaction coordinator, which records the association between the transaction and the partition in the transaction log (__transaction_state).
When consumed offsets are committed as part of the transaction, the producer first sends an AddOffsetsToTxnRequest to the transaction coordinator and then a TxnOffsetCommitRequest to the GroupCoordinator; the consumer offsets included in the transaction are stored in the __consumer_offsets topic.
Once the data writing operation is complete, the application must call either KafkaProducer’s commitTransaction method or abortTransaction method to terminate the current transaction.
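A minimal transactional-producer sketch following the flow above; the transactional id, topics, and broker address are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // a unique transactionalId, which maps one-to-one to a PID on the broker side
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-transactional-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();              // obtains the PID from the transaction coordinator
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("topic-a", "k", "v1"));
            producer.send(new ProducerRecord<>("topic-b", "k", "v2"));
            producer.commitTransaction();         // both writes become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();          // discard everything sent in this transaction
        } finally {
            producer.close();
        }
    }
}
```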
9. Why is Kafka so fast
(1) Write disks in sequence
Data produced by Kafka producers is written to log files, always appended sequentially to the end of the file. Figures from the official site show that on the same disk, sequential writes can reach about 600 MB/s while random writes reach only about 100 KB/s. This is due to the mechanics of the disk: sequential writing is fast because it saves a great deal of head-seek time.
(2) Zero copy technology
Data is copied directly from the disk file to the network card device without passing through the application. Zero copy greatly improves performance by reducing context switches between kernel mode and user mode.
(3) Memory Mapped Files
Memory Mapped Files (MMF) use operating-system pages to map files directly into physical memory; writes to that memory are subsequently synchronized to disk. MMF greatly improves I/O throughput because it eliminates the copy from user space to kernel space. Kafka provides the producer.type parameter to control whether it actively flushes:
- Sync: Kafka flushes immediately after writing to the MMF and only then returns to the producer.
- Async: Kafka returns to the producer immediately after writing to the MMF, without calling flush.
(4) Batch delivery
Kafka allows messages to be sent in batches. The producer caches messages locally and sends them to Kafka once certain conditions are met (see the configuration sketch after this list):
- The number of buffered messages reaches a fixed count.
- A fixed amount of time has elapsed.
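On the Java producer these two conditions correspond to the batch.size and linger.ms settings; a sketch with illustrative values:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfig {
    public static Properties withBatching(Properties producerProps) {
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768"); // send when a batch reaches 32 KB
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "10");     // or after waiting at most 10 ms
        return producerProps;
    }
}
```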
(5) Data compression
Kafka also supports compressing message sets: the producer can compress them using formats such as GZIP or Snappy. The benefit of compression is a smaller transfer size and less pressure on the network.
After the producer compresses, the consumer must decompress. This adds CPU work, but in big-data processing the bottleneck is usually the network rather than the CPU, so the cost is worth it.
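Compression is likewise a producer-side setting; the codec name below is one of the supported options.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class CompressionConfig {
    public static Properties withCompression(Properties producerProps) {
        // compress each batch before it is sent; consumers decompress transparently
        producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return producerProps;
    }
}
```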
(6) Partition
Kafka is a distributed clustered system that can contain multiple brokers, or server instances. Each topic has multiple partitions, and Kafka distributes partitions evenly across the cluster. When producers send messages to the topic, the messages are distributed to different partitions through load-balancing mechanisms to reduce the stress on a single server instance.
There can be multiple consumers in a Consumer Group, and multiple consumers can consume messages of different partitions at the same time, which greatly improves the parallel consumption ability of consumers. However, messages in a partition can only be consumed by one Consumer in a Consumer Group.
Conclusion
Kafka batches messages together and compresses the batches, reducing network I/O overhead, and uses mmap to speed up I/O. Writes are fast because data is only ever appended to the end of a single partition's log, and reads use sendfile to push data straight out to the network.
10. Kafka's page cache
Kafka's messages are stored in the OS page cache (pages, usually 4 KB in size, that Linux uses to cache the logical contents of files during reads and writes, thereby speeding up access to data on disk).
Page caching is a major disk cache implemented by operating systems to reduce disk I/O operations. To be specific, data on the disk is cached to the memory, and the access to the disk is changed to the access to the memory.
When a process attempts to read the contents of a file on the disk, the operating system checks whether the page where the data to be read resides is in the pagecache. If the page exists, the system returns the data directly, avoiding I/O operations on the physical disk. If there is no hit, the operating system makes a read request to disk and stores the read pages into the page cache, which then returns the data to the process. Similarly, if a process needs to write data to disk, the operating system checks whether the corresponding page is in the page cache. If not, the corresponding page is added to the page cache first, and the data is written to the corresponding page. The modified page becomes a dirty page, and the operating system writes the data in the dirty page to disk at an appropriate time to maintain data consistency.
11. How Kafka addresses common message queue problems
(1) Repeated message consumption
Kafka repeated consumption scenarios
- Rebalance
A consumer was consuming messages from a partition; after a rebalance the partition was reassigned, and another consumer consumed the same messages again.
- Manual commit on the consumer side
If the message is processed first and the offset is committed afterwards, a failure in between means the message will be consumed again.
- Automatic commit on the consumer side
With automatic offset commits, if the consumer is shut down (for example, consumer.unsubscribe() is called before close()), some offsets may not yet have been committed, and those messages will be consumed again on the next restart.
- The producer side
The producer crashes due to a service problem; after restarting, it may resend data that was already delivered.
- Automatic retry mechanism for sending messages
Network jitter, bugs in application code, and data problems can all cause a send to fail and trigger a resend of the message.
How to solve repeated consumption
- Kafka side
Kafka 0.11 introduced idempotence. To enable it, simply set enable.idempotence to true in the producer configuration. Kafka's idempotence essentially moves the de-duplication that previously had to be done downstream up into the broker.
A producer with idempotence enabled is assigned a PID during initialization, and every message sent to a partition carries a sequence number; the broker caches the <PID, Partition, SeqNumber> and discards duplicates.
- Application side
Ensure that the server-side interface or the consumer that processes the message can handle the same message arriving multiple times, i.e. make the downstream message handler idempotent. When implementing idempotency, consider the scenario to decide whether strong or weak verification is needed: money-related scenarios call for strong checks, less important ones for weak checks.
Strong check:
Each time a message arrives, use a unique ID built from a custom rule to check whether it has already been processed. If it has, return or skip it directly; otherwise process it through the normal logic.
Weak check:
For less important scenarios, such as sending an SMS, use the message id plus a scenario-specific identifier as a Redis key and put it in the cache with an expiration time; within that window, Redis is used to detect duplicates (a sketch follows). If such a message is occasionally lost, it is acceptable to send it again.
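A sketch of the weak check described above, assuming a Redis client such as Jedis; the key prefix, expiry window, and method names are illustrative.

```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class WeakDedupCheck {

    private final Jedis jedis = new Jedis("localhost", 6379);

    /** Returns true if this message has not been seen within the expiry window, marking it as processed. */
    public boolean firstTime(String scene, String messageId) {
        String key = "dedup:" + scene + ":" + messageId;   // scenario identifier + message id as the key
        // SET key value NX EX 3600: only succeeds if the key does not exist yet
        String result = jedis.set(key, "1", SetParams.setParams().nx().ex(3600));
        return "OK".equals(result);
    }
}
```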
(2) Message order consumption
Ordered consumption can be achieved through Kafka's partitioning strategy. Partitioning strategies include the round-robin strategy, the random strategy, and the strategy of preserving order by message key; ordered consumption is implemented with the key-based strategy.
Preserving order by message key
Once messages are given a key, all messages with the same key are guaranteed to enter the same partition, and messages within a partition are processed in order; this is what "ordering by message key" means.
The same key is hashed to the same partition, so as long as messages share a key, that batch of messages is consumed in order. The hashing logic is essentially:
```java
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
```
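In practice this just means setting the same key on related messages; the topic and key below are illustrative.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedRecordExample {
    // All events of one order share the key "order-1001", so they land in the same
    // partition and are consumed in the order they were produced.
    public static ProducerRecord<String, String> orderEvent(String payload) {
        return new ProducerRecord<>("orders", "order-1001", payload);
    }
}
```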
(3) Message loss
Message loss scenario
- Automatic commit
Offsets are committed automatically on a schedule. The offset may already have been committed while the data is still sitting unprocessed in memory; if the consumer thread is killed at that moment, those messages are lost.
- Producer sends message
Sending is configured as fire-and-forget: the producer just sends the message to Kafka without caring whether it arrived correctly. In some cases, for example when a non-retriable exception occurs, messages are lost. This mode has the highest performance and the lowest reliability.
- The consumer
The offset is committed first, but the consumer crashes before processing the message, so the message is never actually consumed. The same can happen with automatic offset commits.
- acks is not set to all
If the broker hosting the leader goes down before the message has been synchronized to the other replicas, the message is lost.
How to resolve message loss
There are three phases: production messages, storage messages, and consumption messages. Take a look at each of the three phases to ensure that messages are not lost.
- Production of the message
The producer sends messages to the broker and needs to handle the broker's response. Whether sending synchronously or asynchronously, wrap the send (or its callback) in a try-catch and handle the response properly: if the broker returns an error such as a write failure, retry the send; after repeated failures, raise an alarm, write a log, and so on (see the sketch after this list).
- Store messages
In the storage phase, the broker should respond to the producer only after the message has been flushed to disk. If it responds as soon as the message is written to the cache and the machine then suddenly loses power, the producer wrongly believes the send succeeded.
If the brokers form a cluster, there is a multi-replica mechanism: the message needs to be written not only to the current broker but also to replicas on other machines. That is, configure it so that at least two machines have the message before responding to the producer. This basically guarantees reliable storage.
- Message consumption
Make sure the consumer actually executes the business logic before reporting successful consumption back to the broker; only then has the message truly been consumed. This prevents the broker from believing a message was consumed when it was actually lost partway through processing.
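A sketch of the producer side of this checklist: acks=all plus retries, and a callback that logs or alerts when the send ultimately fails. Broker address and topic are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliableSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // wait for leader and in-sync followers
        props.put(ProducerConfig.RETRIES_CONFIG, "3"); // retry transient send failures

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    // the send ultimately failed: log, alert, or hand off to a retry store
                    System.err.println("send failed: " + exception.getMessage());
                } else {
                    System.out.printf("stored at %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
```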
Conclusion
As you can see, guaranteeing message reliability requires the cooperation of all three parties.
The producer needs to handle the broker's response and use retries and alarms when errors occur.
The broker needs to control when it responds: in the single-machine case, respond after the message has been flushed to disk; in a multi-replica cluster, respond after the message has been written to two or more replicas.
The consumer needs to report back to the broker only after the actual business logic has been executed.
However, note that as message reliability increases, performance decreases: waiting for messages to be flushed and for multiple replicas to synchronize costs time. So it depends on the business; for example, losing one or two log messages may not matter, so there is no need to wait for the flush before responding.
(4) Message accumulation problem
Message backlog is usually caused by a mismatch between the producers' production rate and the consumers' consumption rate: message processing may be failing and retrying, or the consumers' processing capacity may be too weak, so messages pile up over time.
So first locate the cause of the slow consumption. If it is a bug, fix the bug; if the consumers' own capacity is weak, optimize the consumption logic, for example by switching from processing one message at a time to batch processing: a batch insert into a database is far more efficient than inserting rows one at a time.
If the logic has already been optimized and consumption is still slow, consider scaling horizontally: increase both the number of partitions of the topic and the number of consumers. The partition count must be increased as well, otherwise the newly added consumers will have nothing to consume, because within a consumer group a partition is assigned to only one consumer.
Whether each consumer then processes with a single thread or multiple threads also depends on the situation: if you write each message to an in-memory queue, respond to the broker, and then let multiple threads consume from that queue, any unconsumed messages still in the in-memory queue will be lost if the consumer crashes.
12. Kafka FAQ
(1) Is it true that “if the number of consumers in the consumption group exceeds the partition of topic, some consumers will fail to consume data”? How to solve it?
Generally speaking, if there are too many consumers and the number of consumers exceeds the number of partitions, some consumers will not be assigned any partition.
Developers can extend AbstractPartitionAssignor to implement a custom assignment strategy, for example one in which any consumer in the group can consume all partitions of the subscribed topics.
(2) When the consumer commits the consumer offset, does it commit the offset of the latest message it has consumed, or offset+1?
It commits offset+1, i.e. the position of the next message to be consumed.
In the old consumer client, offsets were stored in ZooKeeper; in the new consumer client, offsets are stored in Kafka's internal __consumer_offsets topic.
(3) KafkaConsumer is not thread safe, so how to implement multithreaded consumption?
- Thread closure, which means that a KafkaConsumer object is instantiated for each thread. Each thread corresponds to a KafkaConsumer instance, which we can call a consuming thread. A consuming thread can consume messages in one or more partitions, and all consuming threads belong to the same consuming group.
- The consumer program uses one or a few threads to fetch messages and creates multiple worker threads to execute the message-processing logic. (A sketch of the first approach, one KafkaConsumer per thread, follows.)
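A sketch of the thread-closure approach, with one KafkaConsumer confined to each thread; the consumer Properties are assumed to be prepared by the caller.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ThreadPerConsumer {
    // start N consuming threads, each with its own KafkaConsumer instance
    public static void start(Properties consumerProps, String topic, int threads) {
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                // KafkaConsumer is not thread safe, so it never leaves this thread
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                    consumer.subscribe(Collections.singletonList(topic));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(record.value());   // processing logic
                        }
                    }
                }
            }, "consumer-thread-" + i).start();
        }
    }
}
```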
(4) Can the number of Topic partitions be increased or decreased at will?
- Can increase
When the number of partitions increases, all groups that subscribe to the topic Rebalance.
First, the Rebalance process has a huge impact on the Consumer Group’s consumption process. During the Rebalance, all Consumer instances stop consuming and wait for the Rebalance to complete. This is one of the worst aspects of Rebalance. Second, the current design of Rebalance is that all Consumer instances participate together, reassigning all partitions. It would be more efficient to minimize changes in the allocation scheme. Finally, Rebalance is too slow.
- Partition reduction is not supported
Because messages in the deleted partition are difficult to process.
If stored directly to the end of an existing partition, the timestamp of the message is not incremented, which will affect components such as Spark and Flink that require message timestamps (event times). If the existing partitions are inserted separately, internal data replication can be very resource-intensive during high message volumes, and how can the availability of this topic be guaranteed during replication? At the same time, sequential issues, transactional issues, and state machine switching between partitions and replicas have to be addressed.
(5) What internal topics Kafka currently has, and what are their features? What is the role of each?
- __consumer_offsets: stores the offset information of Kafka consumers.
- __transaction_state: stores transaction log messages.
(6) What is the priority copy? What special function does it have?
A preferred (priority) replica is the first replica in the AR list.
Ideally, the preferred replica is the partition's leader replica, so it is also called the preferred leader. Kafka ensures that the preferred replicas of all topics are evenly distributed across the cluster, which keeps partition leaders evenly distributed and thus promotes cluster load balancing; this is also called "partition balancing".
(7) Why does Kafka not support read/write separation?
- Data consistency issues
There must be a delayed time window when data is transferred from the master node to the slave node. This time window will cause data inconsistency between the master and slave nodes.
- Time delay problem
The process of data writing from master node to synchronization from slave node goes through network → master node memory → master node disk → network → slave node memory → slave node disk. The master write/slave read function is not suitable for delay-sensitive applications.
For Kafka this is unnecessary: in a Kafka cluster, each broker hosts the leader replicas of some partitions, so with a sensible distribution of leader replicas the read and write load is already roughly equal across all brokers.
(8) What is the role of Zookeeper in Kafka?
One broker in the Kafka cluster is elected as the Controller, which is responsible for managing brokers joining and leaving the cluster, assigning partition replicas for all topics, and electing partition leaders. The Controller relies on ZooKeeper for this management.