• The core value of message queues

Decoupling: producers and consumers no longer need to call each other directly; the message queue sits between them.

Asynchronous processing, for example flash-sale (seckill) activities on e-commerce platforms. The typical flow is: 1) risk control, 2) inventory lock, 3) order generation, 4) SMS notification, 5) data update.

With a message system, the flash-sale flow is split: the steps that do not need to be handled synchronously are pushed behind the queue and processed later. The flow becomes: 1) risk control, 2) inventory lock, 3) message system, 4) order generation, 5) SMS notification, 6) data update.

Traffic control: 1) after receiving a request, the gateway places it in the message queue; 2) the back-end service retrieves the request from the message queue, completes the rest of the flash-sale process, and returns the result to the user. Advantage: controls the flow. Disadvantage: slows down the flow (adds latency).


  • Kafka core concept

Consumers fetch data from Kafka, process it, and consume it; Kafka uses a pull model, where consumers pull data themselves. Partition: by default a topic has one partition, but you can configure multiple partitions (partitions are distributed across different server nodes).


  • Cluster Architecture

Consumer Group: when a consumer group consumes data, it must specify a group ID. If the group ID specified by program A is the same as the group ID specified by program B, the two programs belong to the same consumer group.

For example, if program A has already consumed topicA, then program B (which belongs to the same consumer group) cannot consume that topicA data again; it is not allowed to re-consume data its group has already consumed. However, after specifying a new group ID, the data can be consumed again. Different consumer groups do not affect each other.

The consumer group ID is user-defined; the consumer name is generated automatically (and is unique). Controller: one Kafka node acts as the master controller of the cluster.


  • Data Performance

Kafka writes data sequentially: data written to disk is appended, with no random writes. Rule of thumb: for disks with a given rotation speed, sequential (append) writes can approach the speed of writing to memory. Messages from producers are first written to the OS cache (memory) by the Kafka service and then flushed to disk sequentially.


  • Zero-copy for high-performance reads

The traditional path for a consumer to read data (without zero copy):

1. The consumer sends a request to the Kafka service. 2. The Kafka service reads the data from the OS cache (if it is not cached, it first reads it from disk into the OS cache). 3. The OS cache copies the data to the Kafka application (user space). 4. Kafka copies the data to the socket cache. 5. The socket cache is transmitted to the consumer through the network card.

Kafka Linux Sendfile technology – Zero copy

1. The consumer sends a request to the Kafka service. 2. The Kafka service reads the data from the OS cache (if it is not cached, it reads it from disk). 3. Data is read from the disk into the OS cache. 4. The OS cache sends the data directly to the network card. 5. The data is transmitted to the consumer through the network card.
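To make the sendfile idea concrete, here is a minimal sketch (not Kafka's actual source) of how a JVM service can hand file bytes straight to a socket with java.nio's FileChannel.transferTo, which maps to sendfile on Linux. The log path and port are made up for illustration.

    import java.io.FileInputStream;
    import java.net.InetSocketAddress;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;

    public class ZeroCopySend {
        public static void main(String[] args) throws Exception {
            // Hypothetical log segment and consumer address, just for illustration.
            try (FileChannel log = new FileInputStream("/tmp/topic_A-0/00000000000000000000.log").getChannel();
                 SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9092))) {
                long position = 0;
                long remaining = log.size();
                while (remaining > 0) {
                    // transferTo() lets the kernel copy page-cache bytes to the NIC,
                    // skipping the copy into user space that the traditional path needs.
                    long sent = log.transferTo(position, remaining, socket);
                    position += sent;
                    remaining -= sent;
                }
            }
        }
    }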


  • Kafka log segment storage

In Kafka, a topic is usually partitioned. For example, if you create topic_A and give it three partitions, three directories are actually created across three servers. Server 1 (kafka1) creates the directory topic_A-0.

Data in Kafka is called a message, and it is stored in log files; data files in Kafka are called log (.log) files. By default, a partition has n log files (stored as segments), and each log file is 1 GB by default.


Server 2 (kafka2) creates the directory topic_A-1, and server 3 (kafka3) creates the directory topic_A-2.


  • Locating data with binary search

Each message in Kafka has its own offset, which is stored on the physical disk. Kafka reads through a sparse index: every time 4 KB of log data (.log) is written, one index entry is written to the .index file. Lookups then use binary search over the index:
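A minimal sketch of the idea (not Kafka's index code): index entries map an offset to a byte position in the .log file, roughly one entry per 4 KB written, and a binary search finds the greatest indexed offset at or below the target, from which a short sequential scan finishes the lookup. The sample numbers and names here are made up.

    import java.util.Arrays;

    public class SparseIndexLookup {
        // Parallel arrays: indexedOffsets[i] is the message offset whose bytes start
        // at filePositions[i] in the .log file (about one entry per 4 KB of log written).
        static long[] indexedOffsets = {0, 83, 170, 251, 340};
        static long[] filePositions  = {0, 4096, 8192, 12288, 16384};

        // Returns the .log file position from which to start scanning for targetOffset.
        static long locate(long targetOffset) {
            int i = Arrays.binarySearch(indexedOffsets, targetOffset);
            if (i < 0) {
                i = -i - 2;   // greatest indexed offset strictly below the target
            }
            return filePositions[Math.max(i, 0)];
        }

        public static void main(String[] args) {
            // Start scanning at byte 8192 and read forward until offset 200 is found.
            System.out.println(locate(200));
        }
    }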


  • High-concurrency network design (NIO)

The network design is the best part of Kafka's design and is the reason Kafka achieves high concurrency and high performance. To tune Kafka you have to understand its principles, especially the network design part.

Reactor network design pattern 1 (figure)

Reactor network design pattern 2 (figure)

Reactor network design pattern 3 (figure)

Kafka's high-concurrency network design (figures)

  • Kafka replicas guarantee high availability

Partitions in Kafka have replicas (note: before 0.8 there were no replicas). When creating a topic you can specify the number of partitions and the number of replicas. A replica plays one of the following roles. Leader partition: 1) all data writes and reads go through the leader partition;

2) it maintains an In-Sync-Replica (ISR) list and removes replicas from the ISR list according to certain rules. The producer sends a message to the leader partition; after the message is written to the leader partition, it is also written to the other partitions in the ISR list. Follower partition: synchronizes data from the leader partition; a write only counts once the followers in the ISR have written the message.


  • Good architectural thinking

Kafka delivers high concurrency, high availability, and high performance. High availability: the multi-replica mechanism. High concurrency: the network architecture design, a three-tier architecture of multiple selectors -> multiple threads -> queues (NIO). High performance on writes: data is first written to the OS cache and then written to disk sequentially.

High performance on reads: sparse indexes quickly locate the data to be consumed, and the zero-copy mechanism reduces data copying and context switches between the application and the operating system.


  • Kafka production environment setup

Requirement scenario analysis: an e-commerce platform needs to send 1 billion requests per day to the Kafka cluster. Applying the 80/20 rule, a rough estimate is good enough. The 1 billion requests arrive over 24 hours, but there is generally little traffic between midnight and 8 AM, so about 80% of the requests (800 million) arrive in the other 16 hours, and 80% of those are handled in 20% of that time: 16 * 0.2 ≈ 3 hours.

That means roughly 600 million requests are processed in 3 hours. Peak QPS ≈ 600 million / (3 hours × 3600 s) ≈ 55,000 requests/s.

1 billion requests × 50 KB ≈ 46 TB, so about 46 TB of data needs to be stored every day.

46 TB × 2 (replicas) = 92 TB, and Kafka retains data for the last 3 days: 92 TB × 3 = 276 TB. Note that 50 KB here does not mean a single message is 50 KB (logs are merged, with multiple log entries combined into one message); typically a single message is only a few bytes to a few hundred bytes.
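A quick sanity check of the arithmetic above, using the values from this scenario (the article rounds a little):

    public class CapacityEstimate {
        public static void main(String[] args) {
            long requestsPerDay = 1_000_000_000L;                 // 1 billion requests/day
            long peakWindowRequests = 600_000_000L;               // ~600M handled in ~3 peak hours
            double peakQps = peakWindowRequests / (3.0 * 3600);   // ≈ 55,500 requests/s
            double dailyTb = requestsPerDay * 50.0 / 1024 / 1024 / 1024;   // 50 KB each ≈ 46 TB/day
            double retainedTb = dailyTb * 2 * 3;                  // 2 replicas, kept 3 days ≈ 276 TB
            System.out.printf("peak QPS ≈ %.0f, daily ≈ %.0f TB, retained ≈ %.0f TB%n",
                    peakQps, dailyTb, retainedTb);
        }
    }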


  • Physical machine number evaluation

(1) First decide whether we need virtual machines or physical machines. For clusters like Kafka, MySQL, and Hadoop, we use physical machines in production.

(2) At peak, about 55,000 requests per second need to be handled. One or two physical machines could absolutely withstand that, but in general we size a cluster at four times the peak. At 4×, the cluster should be able to handle about 200,000 QPS, which gives a safer margin. That needs about five physical machines, each taking 40,000 requests per second.

Scenario summary: for 1 billion requests, 55,000 QPS at peak, and 276 TB of data, 5 physical machines are needed.


  • Disk selection

1 billion requests, 55,000 QPS at peak, 276 TB of data, 5 physical machines.

(1) Should we use solid-state drives (SSDs) or ordinary mechanical disks? SSDs have better performance but are expensive; SAS disks are cheaper and somewhat slower. The strength of SSDs is good random read/write performance, which suits clusters like MySQL; their sequential write performance, however, is similar to that of SAS disks.

Kafka writes data sequentially, so ordinary mechanical (SAS) disks are sufficient.

(2) Next, evaluate how many disks each server needs. Five servers need 276 TB in total, so each server needs to store about 60 TB of data. Our servers are configured with 11 disks of 7 TB each: 11 × 7 TB = 77 TB.

77 TB × 5 servers = 385 TB, comfortably above the 276 TB we need.

Scenario summary: five physical machines (11 × 7 TB SAS disks each) are required to handle 1 billion requests.


  • Memory evaluation

To handle a billion requests: 5 physical machines, 11 × 7 TB (SAS) disks each.

Kafka reads and writes through the OS cache. In other words, if the OS cache were infinite, Kafka would effectively operate in memory and perform extremely well; but memory is finite.

(1) Give as much memory as possible to the OS cache. (2) Kafka's core code is written in Scala and the client code in Java, all running on the JVM, so we must also give some memory to the JVM. Kafka is not designed to keep large data structures in the JVM, so its JVM does not need much memory; as a rule of thumb, 10 GB is enough.

By contrast, a NameNode keeps metadata (tens of gigabytes) in its JVM, so its JVM must be given a large amount of memory, say 100 GB. Kafka is not like that.

Suppose this 1-billion-request project has 100 topics: 100 topics × 5 partitions × 2 replicas = 1000 partitions.

A .log file is a data file, 1 GB by default. Keeping the latest .log file of every one of the 1000 partitions in memory would be best for performance, but that would take 1000 × 1 GB = 1000 GB of memory. We only need to keep about 25% of the current log segment in memory: 0.25 GB × 1000 = 250 GB of memory.

250 GB / 5 servers = 50 GB of memory per server; 50 GB + 10 GB for the JVM = 60 GB per server.

A 64 GB server leaves only about 4 GB over, and the OS itself also needs memory; on the other hand Kafka's JVM does not really need a full 10 GB, so 64 GB is judged acceptable. Of course, if you can get servers with 128 GB of memory, that is best.

The estimate above assumed a topic with 5 partitions; a topic with a large data volume might have 10 partitions, which raises the requirement.

Summary: 1 billion requests, 5 physical machines, 11 × 7 TB (SAS) disks, 64 GB of RAM (128 GB is better).


  • CPU pressure evaluation

Evaluate how many CPU cores are required per server (resources are limited).

We estimate how many CPU cores we need based on how many threads our service runs, since threads rely on CPU cores to run. If we have many threads but only a few CPU cores, the machine's load will be high and performance will be poor.

Estimate how many threads a Kafka broker runs once it starts:

1 Acceptor thread; 3 Processor threads (typically tuned to 6-9); 8 request-handler threads (typically tuned to 32); plus periodic cleanup threads, data pull threads, the periodic ISR check mechanism, and so on. So a running Kafka service easily has more than 100 threads.

With 4 CPU cores, dozens of busy threads will saturate the CPU. With 8 cores, dozens of threads are manageable, but 100-200 threads are not. So the recommendation here is 16 CPU cores, and 32 CPU cores if possible.

Bottom line: a minimum of 16 CPU cores per Kafka broker, preferably 32. (2 CPUs × 8 cores = 16 cores; 4 CPUs × 8 cores = 32 cores.)

Summary: to handle 1 billion requests, you need 5 physical machines, 11 × 7 TB (SAS) disks, 64 GB of RAM (128 GB is better), and 16 CPU cores (32 is better).


  • Network needs assessment

What network card do we need? Generally either a gigabit NIC (1 Gbps) or a 10-gigabit NIC (10 Gbps).

At peak, 55,000 requests come in per second: 55,000 / 5 ≈ 10,000 requests per server. As before, 10,000 × 50 KB ≈ 488 MB of data per server per second. The data also has replicas, and replica synchronization goes over the network too: 488 × 2 = 976 MB/s. To be clear, most companies' requests are not as large as 50 KB; in our case the producer side encapsulates and merges multiple records into one request, which is why a request reaches 50 KB.

Note that a NIC's usable bandwidth never reaches its nominal limit: on a gigabit NIC we can generally use around 700 Mb/s. For this workload we should therefore use a 10-gigabit NIC; with a 10-gigabit card it is easy.


  • Cluster Planning

Planning the number of requests, the number of physical machines, the number and type of disks, memory, CPU cores, and network cards: whenever the company needs to evaluate resources and servers in the future, evaluate them along these lines.

Message size: 50 KB in our case (in other systems it might be 1 KB, 500 bytes, or 1 MB). IP / host name: 192.168.0.100 hadoop1, 192.168.0.101 hadoop2, 192.168.0.102 hadoop3.

Host planning: Kafka's cluster architecture is master/slave; the Controller manages the metadata of the entire cluster through the ZooKeeper cluster.

ZooKeeper cluster: hadoop1, hadoop2, hadoop3. In theory Kafka and the ZooKeeper services should not be installed together, but our servers are limited, so the Kafka cluster is also installed on hadoop1, hadoop2, hadoop3.


  • Kafka operation tools and commands

KafkaManager: a web-based management tool.

Scenario 1: a topic's data volume is too large and it needs more partitions.

When the topic was first created, the data volume was small, so only a few partitions were given.

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6

To increase the partition count afterwards (to 3, for example):

kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6

Broker IDs: hadoop1:0, hadoop2:1, hadoop3:2. Suppose a partition has three replicas: partition0 -> a, b, c, where a is the leader partition and b, c are follower partitions.

ISR: {a,b,c}. If a follower partition does not pull data from the leader partition for more than 10 seconds, it is removed from the ISR list.

Scenario 2: a core topic needs a higher replication factor.

To add replicas for core business data, create a JSON file (vim test.json) with the following content:

{"version": 1, "partitions": [{"topic": "test6", "partition": 0, "replicas": [0]}, {"topic": "test6", "partition": 1, "replicas": [0]}, {"topic": "test6", "partition": 2, "replicas": [0]}]}

Then execute the reassignment with the JSON file:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

Scenario 3: a topic's load is unbalanced and partitions need to be migrated manually. Create topics-to-move.json (vi topics-to-move.json):

{"topics": [{"topic": "test01"}, {"topic": "test02"}], "version": 1}   (list all the topics to move here)

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate

This generates a migration plan that distributes all partitions evenly across brokers, including newly added ones; save the plan in a file, e.g. expand-cluster-reassignment.json, then run:

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

This data migration must be done at night, during off-peak hours, because it moves data between machines and is very bandwidth-intensive. --generate: generates a candidate migration plan from the given topic list and broker list; it does not actually move any messages, it only computes the plan for the execute step. --execute: migrates the data according to the migration plan. --verify: checks whether the migration has completed.

Scenario 4: a broker has too many leader partitions.

Normally, leader partitions should be balanced across servers; here hadoop1 has 4 while hadoop2 and hadoop3 have 1 each, which is unbalanced.

Partitions are assigned automatically and then adjusted dynamically: Kafka distributes leader partitions evenly across the machines so that read and write throughput is even on every machine. But there are exceptions.

If some brokers go down, their leader partitions become concentrated on a small number of other brokers, putting high read/write pressure on those few brokers; when the broken brokers restart, they hold only follower partitions and receive almost no read/write requests, so the cluster load is unbalanced. The parameter auto.leader.rebalance.enable addresses this.

auto.leader.rebalance.enable defaults to true: every 300 seconds (leader.imbalance.check.interval.seconds) Kafka checks whether the leaders are balanced, and if the leader imbalance on a broker exceeds 10% (leader.imbalance.per.broker.percentage), the controller triggers a leader election for that broker. Configuration parameters: auto.leader.rebalance.enable, default true; leader.imbalance.per.broker.percentage, the ratio of imbalanced leaders allowed per broker, expressed as a percentage with a default of 10%; if a broker exceeds this value, the controller triggers leader balancing. leader.imbalance.check.interval.seconds defaults to 300 seconds.


  • Kafka producers send messages


How producers send messages – a basic example:
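The original screenshots are gone; as a stand-in, here is a minimal producer sketch. The broker addresses reuse the hosts planned above, and the topic name and payload are made up for illustration.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092"); // assumed ports
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Send asynchronously; the callback reports success or the exception.
                producer.send(new ProducerRecord<>("test6", "order-1001", "{\"amount\": 99}"),
                        (metadata, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();
                            } else {
                                System.out.println("sent to partition " + metadata.partition()
                                        + " offset " + metadata.offset());
                            }
                        });
            }
        }
    }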


  • How to improve throughput

How to improve throughput. Parameter 1: buffer.memory: the buffer for messages waiting to be sent; the default is 33554432 (32 MB). Parameter 2: compression.type: the default is none (no compression), but lz4 compression can be used and is fairly efficient; compression reduces the amount of data and improves throughput, at the cost of more CPU on the producer side. Parameter 3: batch.size: the batch size; if the batch is too small, frequent network requests reduce throughput.

If the batch is too large, messages wait a long time before being sent and the memory buffer is strained by too much buffered data. The default is 16384 (16 KB); in production this is often increased to improve throughput, but setting it too large adds latency.

It is generally set based on message size. Parameter 4: linger.ms: the default is 0, meaning messages are sent immediately, which is usually not what you want; a value of around 100 milliseconds means a message waits in its batch for up to 100 ms, and if the batch fills to 16 KB within that time it is sent right away.
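Putting those four parameters together, a hedged sketch of a throughput-oriented producer configuration; the concrete values are illustrative choices in the spirit of the text, not universal recommendations.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class ThroughputTunedProducer {
        public static KafkaProducer<String, String> build() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092"); // assumed hosts
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("buffer.memory", 67108864);   // 64 MB send buffer (default is 32 MB)
            props.put("compression.type", "lz4");   // smaller batches at some producer CPU cost
            props.put("batch.size", 131072);        // 128 KB batches (default is 16 KB)
            props.put("linger.ms", 100);            // wait up to 100 ms for a batch to fill
            return new KafkaProducer<>(props);
        }
    }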


  • How to handle exceptions

LeaderNotAvailableException: if a machine goes down and the leader replica on it is temporarily unavailable, writes fail until another follower replica is switched to leader, after which writing can continue; in this case the send can be retried. Likewise, restarting a Kafka broker process will always trigger leader switches and surface as LeaderNotAvailableException write errors. NotControllerException: similarly, if the broker hosting the Controller is down, there is a problem until the Controller is re-elected. NetworkException: a network exception or timeout. a) If the retries parameter is configured, the client retries automatically. b) If it still fails after several retries, the Exception is handed to us to handle: we process the message separately through a backup link, writing unsuccessful messages to Redis or to the file system, or even discarding them.
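A sketch of that "backup link" idea: catch the final exception in the send callback and divert the failed message elsewhere. The fallback store below is a hypothetical stand-in, not a specific library.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class FallbackCallback implements Callback {
        private final String payload;

        public FallbackCallback(String payload) {
            this.payload = payload;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                return; // written successfully
            }
            // By the time the callback fires, the client's automatic retries (the
            // "retries" setting) are already exhausted, so divert the message to a
            // backup link: Redis, a local file, or simply drop it, per the business.
            System.err.println("send failed (" + exception.getClass().getSimpleName() + "), using backup link");
            saveToBackupStore(payload); // hypothetical helper
        }

        private void saveToBackupStore(String payload) {
            System.err.println("saved to backup store: " + payload);
        }
    }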


  • Retry mechanism

Retry causes some problems:

Message duplication: sometimes a leader switch requires a retry, but retries can produce duplicate messages, for example when network jitter makes the producer believe a send failed and retry it. Message reordering: retries can also reorder messages, because messages behind the failed one may already have been sent. To avoid this, set max.in.flight.requests.per.connection to 1 so the producer only has one in-flight send at a time. The default retry interval is 100 ms, set via retry.backoff.ms. In practice, about 95% of exceptions can be handled by retries.
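The parameters just mentioned, collected into one configuration sketch; the retry count itself is an assumed example value, not something prescribed by the text.

    import java.util.Properties;

    public class RetryConfig {
        public static Properties retrySettings() {
            Properties props = new Properties();
            props.put("retries", 3);                                // retry transient failures (example value)
            props.put("retry.backoff.ms", 100);                     // wait 100 ms between retries
            props.put("max.in.flight.requests.per.connection", 1);  // avoid reordering on retry
            return props;
        }
    }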


  • ACK parameter details

request.required.acks=0: as long as the request has been sent, it is considered done; the producer does not care whether the write succeeded. Performance is great; if you are shipping logs and can tolerate data loss, this setting performs well. request.required.acks=1: the send is considered successful once the message is written to the leader partition.

However, data can still be lost this way. request.required.acks=-1: the message is only considered written once all replicas in the ISR list have written it. The ISR may shrink to a single replica (1 leader partition, 1 follower partition). On the Kafka server, min.insync.replicas limits how small the ISR list may be for a leader partition; for example, if it is set to 2 but the ISR contains only one replica,

then inserting data into that partition reports an error. So the no-data-loss combination is: 1) acks = -1, 2) each partition has at least 2 replicas, 3) min.insync.replicas >= 2.
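A sketch of that combination on the producer side; min.insync.replicas is a broker/topic-level setting, not a producer property, so it appears here only as a comment.

    import java.util.Properties;

    public class DurableProducerConfig {
        public static Properties noDataLossSettings() {
            Properties props = new Properties();
            props.put("acks", "-1");   // wait for all replicas in the ISR ("all" is equivalent)
            props.put("retries", 3);   // retry transient failures (example value)
            // On the broker or topic side, the matching settings are a replication
            // factor of at least 2 and min.insync.replicas=2.
            return props;
        }
    }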


  • Custom partitioner

Partitioning: 1) Without a key, messages are sent to the partitions in round-robin fashion. 2) With a key, Kafka's built-in partitioner computes a hash from the key, and that hash maps to a specific partition; the same key always produces the same hash, so messages with the same key always go to the same partition. But there are special cases where we need a custom partitioner:

import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class HotDataPartitioner implements Partitioner {
    private Random random;

    @Override
    public void configure(Map<String, ?> configs) {
        random = new Random();
    }

    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        int partitionCount = partitionInfoList.size();
        // Reserve the last partition for hot data; spread everything else randomly.
        int hotDataPartition = partitionCount - 1;
        return !key.contains("hot_data") ? random.nextInt(partitionCount - 1) : hotDataPartition;
    }

    @Override
    public void close() {
    }
}

How to use this class: configure the producer with props.put("partitioner.class", "com.zhss.HotDataPartitioner");



  • Kafka consumer group concept

Consumers with the same group.id belong to the same consumer group.

(1) Each consumer must belong to a consumer group. A topic partition can only be assigned to one consumer within a consumer group; a consumer may also end up with no partition assigned.

(2) To achieve a broadcast effect, just consume with different group IDs. For example, groupA: consumer1 consumes partition0, consumer2 consumes partition1, consumer3 gets no data; groupB: consumer3 consumes both partition0 and partition1. (3) If a consumer in the group dies, the partitions assigned to it are automatically handed to other consumers; if it comes back, some partitions are handed back to it.

Basic case demonstration:
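The original screenshots are gone; as a stand-in, a minimal consumer sketch using the same assumed hosts, a hypothetical topic, and an example group id.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092"); // assumed ports
            props.put("group.id", "test_group");   // consumers sharing this id form one consumer group
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test6"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }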


  • Offset management

Each consumer keeps, in an in-memory data structure, the consumption offset for every partition of every topic it consumes, and commits the offsets periodically. Old versions wrote offsets to ZooKeeper, but sending such high-concurrency requests to ZK is unreasonable architecture: ZK does distributed coordination and lightweight metadata storage and should not act as a data store handling high-concurrency reads and writes. New versions commit the offset to an internal Kafka topic, __consumer_offsets: the key is group.id + topic + partition number and the value is the current offset. Every so often Kafka compacts this topic, so each group.id + topic + partition key keeps only the latest value. Because __consumer_offsets can receive very high-concurrency requests, it has 50 partitions by default, so in a large Kafka cluster (say 50 machines) the pressure of offset commits is spread across 50 machines. When a consumer starts, it looks up its committed offset and resumes consuming from there.
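To make the commit path concrete, a hedged sketch of turning off auto-commit and committing offsets to __consumer_offsets manually after processing; the helper names are made up.

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ManualCommitLoop {
        // Assumes a consumer built with enable.auto.commit=false.
        static void consume(KafkaConsumer<String, String> consumer) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> process(r.value()));
                // Writes group.id + topic + partition -> latest offset into __consumer_offsets.
                consumer.commitSync();
            }
        }

        static void process(String value) {
            System.out.println(value);
        }
    }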


  • Offset monitoring tool

Add JMX_PORT=9988 near the top of the bin/kafka-run-class.sh script and restart the Kafka process. Another piece of software, KafkaOffsetMonitor, mainly monitors consumer offsets; it ships as a single jar:

java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk hadoop1:2181 --port 9004 --refresh 15.seconds --retain 2.days

(adjust the options according to the version)


  • Detecting consumer failures

heartbeat.interval.ms: the heartbeat interval between the consumer and the coordinator; heartbeats must be kept up so the coordinator knows whether a consumer has failed, and if one has, the other consumers are told (through the heartbeat responses) to rebalance. session.timeout.ms: how long Kafka waits without a heartbeat before considering a consumer dead, 10 seconds by default. max.poll.interval.ms: if the gap between two poll() calls exceeds this value, the consumer is considered too weak to keep up, is kicked out of the consumer group, and its partitions are given to others; set it according to how fast the business logic processes messages.
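Those three timeouts as a consumer configuration sketch; the values written out are the usual defaults for illustration, not tuning advice.

    import java.util.Properties;

    public class LivenessConfig {
        public static Properties livenessSettings() {
            Properties props = new Properties();
            props.put("heartbeat.interval.ms", 3000);   // heartbeat to the coordinator
            props.put("session.timeout.ms", 10000);     // declared dead after 10 s without heartbeats
            props.put("max.poll.interval.ms", 300000);  // kicked out if poll() gaps exceed 5 min
            return props;
        }
    }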


  • Core consumer parameters

fetch.max.bytes: the maximum number of bytes fetched in one request. It is recommended to set this somewhat larger (1 MB by default here).

The maximum message size the producer can send is 10 MB and the broker accepts messages up to 10 MB, so the consumer's fetch size must accommodate that. max.poll.records: the maximum number of messages returned by one poll; the default is 500. connection.max.idle.ms: if the socket connection between the consumer and a broker is idle longer than this, it is reclaimed and must be re-established for the next fetch; setting it to -1 is recommended to avoid that. enable.auto.commit: enables automatic offset commits. auto.commit.interval.ms: default 5000 ms, committing to __consumer_offsets. auto.offset.reset:
earliest: when a partition has a committed offset, consume from it; when it does not, consume from the beginning (e.g. topica -> partition0: 1000, partition1: 2000).
latest: when a partition has a committed offset, consume from it; when it does not, consume only data newly produced to the partition.
none: consume from the committed offsets only if every partition has one; if any partition lacks a committed offset, throw an exception.
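And the fetch/commit/reset parameters from this section in the same style; the values follow the text and auto.offset.reset is the one most worth choosing deliberately.

    import java.util.Properties;

    public class CoreConsumerParams {
        public static Properties settings() {
            Properties props = new Properties();
            props.put("fetch.max.bytes", 10 * 1024 * 1024);   // allow 10 MB fetches to match the producer/broker limit
            props.put("max.poll.records", 500);               // messages returned per poll
            props.put("connection.max.idle.ms", -1);          // don't reclaim idle broker connections
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", 5000);       // commit to __consumer_offsets every 5 s
            props.put("auto.offset.reset", "latest");         // or "earliest" / "none", as described above
            return props;
        }
    }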


  • Comprehensive case demonstration

Introductory case: a second-hand e-commerce platform ("Joy Delivery") awards stars to users according to their spending. The order system (producer) sends messages to the Kafka cluster; the member system (consumer) consumes messages from the Kafka cluster and processes them.

Group coordinator principle

Interview question: how do consumers rebalance? Based on the coordinator:

Each consumer group chooses a broker as its coordinator. The coordinator monitors the heartbeats of all the consumers in the group and triggers a rebalance when one of them goes down. How the coordinator is located: hash the groupId, then take it modulo the number of partitions of __consumer_offsets (50 by default, configurable via offsets.topic.num.partitions). The broker hosting that partition is the group's coordinator, and it also knows the offsets committed by every consumer in the group. For example: groupId "myconsumer_group" -> hash -> mod 50 -> 8; whichever broker hosts partition 8 of __consumer_offsets is the coordinator. Running process: (1) each consumer sends a JoinGroup request to the coordinator; (2) the coordinator selects one consumer in the group as the leader; (3) it sends the consumer group's membership to the leader; (4) the leader works out the consumption (partition assignment) plan; (5) the leader sends the plan back to the coordinator via SyncGroup; (6) the coordinator distributes the plan to every consumer, and each consumer opens a socket connection to the leader broker of its assigned partitions and starts consuming.
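A tiny sketch of that coordinator lookup; the abs() guard is mine, and Kafka's actual utility differs in detail.

    public class CoordinatorLookup {
        // Which __consumer_offsets partition (and hence which broker) coordinates this group?
        static int coordinatorPartition(String groupId, int offsetsTopicPartitions) {
            return Math.abs(groupId.hashCode()) % offsetsTopicPartitions;
        }

        public static void main(String[] args) {
            // With the default of 50 partitions, "myconsumer_group" maps to one fixed partition;
            // the broker hosting that partition's leader is the group's coordinator.
            System.out.println(coordinatorPartition("myconsumer_group", 50));
        }
    }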


  • Rebalance strategy

The consumer group uses a coordinator to implement rebalancing.

There are three rebalance strategies: range, round-robin and sticky.

Suppose the topic we consume has 12 partitions, p0 through p11, and the consumer group has three consumers. Range strategy (the default): consumer1 gets p0-3, consumer2 gets p4-7, consumer3 gets p8-11. Round-robin strategy: partitions are dealt out in turn: consumer1: 0,3,6,9; consumer2: 1,4,7,10; consumer3: 2,5,8,11. Both schemes share a problem: suppose consumer1 fails; with 12 partitions and 2 consumers, each now gets six partitions, e.g. p0-5 to consumer2 and p6-11 to consumer3, so the p6 and p7 partitions that were originally on consumer2 move to consumer3. The newer sticky strategy tries to ensure that, at rebalance time, consumers keep the partitions they already own as much as possible.

With sticky: consumer1: 0-3, consumer2: 4-7, consumer3: 8-11; if consumer3 fails, consumer1 keeps 0-3 and adds 8,9, and consumer2 keeps 4-7 and adds 10,11.
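Choosing a strategy is just a consumer setting; a hedged example selecting the sticky assignor (the class names are the ones shipped with the Java client).

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.StickyAssignor;

    public class AssignorConfig {
        public static Properties stickySettings() {
            Properties props = new Properties();
            // Alternatives: RangeAssignor (the default) and RoundRobinAssignor.
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
            return props;
        }
    }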


  • Broker management

LEO and HW: so far we have covered Kafka's core principles, how to evaluate cluster resources and set up a Kafka cluster, some simple operations and management commands, the producer (usage, core parameters), and the consumer (principles, usage, core parameters). Now for some principles inside the broker.

Core concepts: LEO and HW. LEO is related to the offset.

LEO: in Kafka, both the leader partition and the follower partitions are called replicas.

Each time a replica receives a message, it updates its LEO (log end offset), which is the latest offset + 1.

HW (high watermark): an important function of the LEO is updating the HW. When a follower's LEO catches up with the leader's LEO, the HW can advance; data before the HW is visible to consumers and those messages are in the committed state. Messages after the HW cannot be consumed.

LEO update and HW update (figures)

How does the controller manage the cluster? 1) Brokers compete to create the /controller node in ZooKeeper; the one that succeeds becomes the controller. 2) The controller service watches ZooKeeper directories: /brokers/ids/, used to sense brokers coming up and going down; /brokers/topics/, used when topics are created (the create-topic command we ran takes the ZK address as a parameter); /admin/reassign_partitions, used for partition reassignment; and so on.


Let's look at some of the places in Kafka where tasks need to be delayed. The first type of delayed task: when the producer sets acks=-1, the broker must wait until both the leader and the followers have finished writing before returning a response.

There is a timeout, which defaults to 30 seconds (request.timeout.ms). So after the data is written to the leader's disk, a delayed task with a 30-second expiry is placed in the DelayedOperationPurgatory (the delay manager).

If all the follower replicas finish writing to their local disks within 30 seconds, the task is triggered and woken up and the response is returned to the client; otherwise the delayed task expires at the 30 seconds it specified and a timeout exception is returned.

The second type of delayed task: when a follower pulls from the leader and there is no new data, the pull request is delayed, for example for 100 ms, after which empty data is returned and the follower sends another fetch. But if the leader writes new messages during the delay (within the 100 ms), the task is woken up and the pull is performed immediately.

A massive number of such delayed tasks need to be scheduled.


  • Time wheel mechanism

Why design a time wheel? Kafka's internal delayed tasks need to be inserted and cancelled in O(1), so Kafka implements its own time wheel for delayed scheduling. What is a time wheel? Essentially, just an array. tickMs: the basic tick of the wheel, e.g. 1 ms. wheelSize: the size of the wheel, e.g. 20. interval: tickMs * wheelSize, the total span of one wheel, here 20 ms. currentTime: the current time pointer. a) Because the time wheel is an array, locating a bucket by index is O(1). b) Each bucket holds a doubly linked list of tasks, so inserting and deleting a task is also O(1). Example: inserting a task to run after 8 ms or after 19 ms lands in the corresponding bucket of the first wheel. Multi-level time wheels: to insert a task that runs after 110 ms, which exceeds the first wheel's 20 ms span, the task overflows into a higher-level wheel whose tickMs is 20 ms and wheelSize is 20. For example: level-1 wheel: 1 ms * 20; level-2 wheel: 20 ms * 20; level-3 wheel: 400 ms * 20.
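A toy single-level timing wheel, just to make the O(1) bookkeeping concrete; Kafka's real implementation is hierarchical and driven by a DelayQueue, and this sketch ignores overflow to higher-level wheels.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class SimpleTimingWheel {
        private final long tickMs;
        private final int wheelSize;
        private final Deque<Runnable>[] buckets;
        private long currentTick = 0;   // advances once per tickMs

        @SuppressWarnings("unchecked")
        SimpleTimingWheel(long tickMs, int wheelSize) {
            this.tickMs = tickMs;
            this.wheelSize = wheelSize;
            this.buckets = new Deque[wheelSize];
            for (int i = 0; i < wheelSize; i++) {
                buckets[i] = new ArrayDeque<>();
            }
        }

        // O(1): index into the array and append to that bucket's task list.
        void add(Runnable task, long delayMs) {
            long ticks = delayMs / tickMs;
            if (ticks >= wheelSize) {
                throw new IllegalArgumentException("needs a higher-level wheel");
            }
            buckets[(int) ((currentTick + ticks) % wheelSize)].add(task);
        }

        // Called every tickMs by a timer thread: run everything in the bucket that just came due.
        void advance() {
            currentTick++;
            Deque<Runnable> due = buckets[(int) (currentTick % wheelSize)];
            Runnable task;
            while ((task = due.poll()) != null) {
                task.run();
            }
        }

        public static void main(String[] args) {
            SimpleTimingWheel wheel = new SimpleTimingWheel(1, 20);   // tickMs=1, wheelSize=20
            wheel.add(() -> System.out.println("ran after ~8 ticks"), 8);
            for (int i = 0; i < 20; i++) {
                wheel.advance();
            }
        }
    }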


Reference: blog.csdn.net/eraining/ar…