Hi, I’m Yunqi.

Some people say there are three great inventions: fire, the wheel, and Kafka.

To date, Apache Kafka has been hugely successful. According to Confluent, more than a third of the Fortune 500 use Kafka. In this article I'd like to walk through Kafka's core ideas: high performance, persistence, multi-replica backup, horizontal scaling, and more.

It's a long read, so feel free to bookmark it and come back later!

1. Why do we need a messaging system?

  1. Decoupling: the producer and the consumer only have to agree on the message format, not on each other's availability.

  2. Asynchronous processing. Take a flash-sale ("seckill") event on an e-commerce platform; the synchronous flow is: 1) risk control, 2) lock inventory, 3) create the order, 4) send the SMS notification, 5) update analytics data.

  3. With a message system, the flash-sale flow can be split so that the steps that are not urgent are handled later, at their own pace. The flow becomes: 1) risk control, 2) lock inventory, 3) write to the message system, 4) create the order, 5) send the SMS notification, 6) update analytics data.

  4. Traffic control (peak shaving): 1) the gateway puts each incoming request onto the message queue; 2) the backend service pulls requests from the queue, completes the rest of the flash-sale flow, and returns the result to the user. Advantage: the traffic burst is absorbed by the queue. Disadvantage: the overall flow becomes slower. A minimal sketch of the idea follows below.
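To make the peak-shaving idea concrete, here is a minimal, self-contained Java sketch (not Kafka-specific): a bounded in-memory queue stands in for the message system, the "gateway" only enqueues, and a worker drains the queue at the pace the backend can sustain. All names and numbers are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TrafficShapingDemo {
    // Bounded queue: when it is full the gateway can reject or degrade gracefully.
    private static final BlockingQueue<String> orderQueue = new ArrayBlockingQueue<>(10_000);

    public static void main(String[] args) throws InterruptedException {
        // Backend worker: pulls requests at its own speed (order creation, SMS, etc.).
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String order = orderQueue.take();   // blocks until a request is available
                    Thread.sleep(10);                   // simulate the slower downstream work
                    System.out.println("processed " + order);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();

        // "Gateway": bursts of requests are absorbed by the queue instead of hitting the backend.
        for (int i = 0; i < 100; i++) {
            boolean accepted = orderQueue.offer("order-" + i);   // non-blocking enqueue
            System.out.println("gateway accepted order-" + i + ": " + accepted);
        }
        Thread.sleep(2_000);
    }
}
```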

2. Kafka core concepts

Producer: producers write (produce) data into the Kafka cluster.

Consumer: consumers pull data from Kafka, process it, i.e. consume it.

Topic: a named category (topic) that messages are written to and read from.

By default a topic has one partition; you can give it several partitions, and the partitions are stored on different broker nodes of the cluster. A small sketch of creating such a topic programmatically follows below.
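As an illustration (not from the original article), this is roughly how the topic_A described later could be created with Kafka's Java AdminClient; the broker address hadoop1:9092 is an assumption, adjust it to your cluster.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // topic_A with 3 partitions and 2 replicas.
            NewTopic topic = new NewTopic("topic_A", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```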

3. Kafka's cluster architecture

In a Kafka cluster, each Kafka server is a broker. A topic is a logical concept; a partition is physically a directory on a broker's disk.

When you consume data you must specify a group ID. If program A and program B are given the same group ID, then both programs belong to the same consumer group.

A few special cases: if program A is consuming topicA, program B in the same consumer group will not receive the same messages, because each partition is assigned to only one consumer within a group. Likewise, once the group has consumed topicA's data it will not consume that data again; only with a new group ID can the data be consumed again from the beginning. Different consumer groups do not affect each other. The group ID is something you choose; the individual consumer names are generated automatically and are unique.

Controller: one broker in the cluster is elected, with the help of ZooKeeper, as the controller and acts as the master node of the cluster.

4. Kafka's sequential disk writes ensure high write performance

**Writing data in Kafka:** messages are only appended sequentially to disk files; there are no random writes. Rule of thumb: on an ordinary server disk (normal spindle count and RPM), sequential (append-only) writes can approach memory-like throughput. A producer's messages first land in the OS cache (page cache) and are then flushed to disk sequentially by the operating system.

5. Kafka's zero-copy mechanism ensures high read performance

Consumer data reading process:

  1. The consumer sends a request to the Kafka service

  2. The Kafka service tries to read the data from the OS cache (page cache)

  3. If it is not cached, the data is read from disk into the OS cache

  4. The OS cache copies the data into the Kafka application (user space)

  5. Kafka copies the data into the socket buffer

  6. The socket buffer's contents are sent to the consumer through the network card (NIC)

Kafka uses Linux sendfile technology – zero copy

1. The consumer sends a request to the Kafka service. 2. The Kafka service reads the data from the OS cache (loading it from disk first if necessary). 3. The OS cache sends the data directly to the NIC. 4. The data is transmitted to the consumer through the network card. The data is never copied into the Kafka application or into a user-space socket buffer, hence "zero copy".
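For illustration (not part of the original article), this is what zero copy looks like from Java: FileChannel.transferTo() maps to sendfile(2) on Linux, and Kafka's log layer does the equivalent internally when it serves fetch requests. The file name and address below are made up.

```java
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopyDemo {
    public static void main(String[] args) throws Exception {
        try (FileChannel fileChannel = new FileInputStream("00000000000000000000.log").getChannel();
             SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            long position = 0;
            long remaining = fileChannel.size();
            while (remaining > 0) {
                // transferTo() uses sendfile(2) on Linux: data flows page cache -> NIC,
                // never passing through user space.
                long sent = fileChannel.transferTo(position, remaining, socketChannel);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```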

6. Kafka logs are stored in segments

A topic in Kafka is usually partitioned. For example, create topic_A with three partitions: three directories are actually created, one on each of three servers. On server 1 (kafka1) the directory topic_A-0 is created; inside it are the files that store the data. Kafka's data consists of messages, and messages are stored in log files, the files ending in .log. In Kafka, data files are called log files. By default a partition holds n log files (segmented storage), and each log file is 1 GB by default.

Server 2 (kafka2) creates the directory topic_A-1, and server 3 (kafka3) creates the directory topic_A-2.

7. Kafka locates data with a sparse index and binary search

Every message in Kafka has an offset and sits at some physical position on disk, so a message really has two "positions": its offset and its physical location in the .log file. Kafka uses a sparse index: roughly every 4 KB written to the .log file, one index entry (offset -> physical position) is appended to the index file. To find an offset, Kafka binary-searches the index for the closest preceding entry and then scans forward in the log. A small sketch follows below.
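A toy illustration of the idea (all values invented): binary-search the sparse index for the largest indexed offset not larger than the target, then scan forward in the log from that byte position.

```java
public class SparseIndexDemo {
    // {message offset, byte position in the .log file}, roughly one entry per 4 KB written
    static long[][] index = {
            {0, 0}, {35, 4096}, {72, 8192}, {110, 12288}
    };

    // Binary search for the largest indexed offset <= target.
    static long floorPosition(long targetOffset) {
        int low = 0, high = index.length - 1, best = 0;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            if (index[mid][0] <= targetOffset) { best = mid; low = mid + 1; }
            else { high = mid - 1; }
        }
        return index[best][1];
    }

    public static void main(String[] args) {
        // To read offset 100: jump to byte 8192 (indexed offset 72) and scan forward from there.
        System.out.println(floorPosition(100));   // prints 8192
    }
}
```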

8. High-concurrency network design (understand NIO first)

The network layer is the best-designed part of Kafka; it is what gives Kafka its high concurrency and high performance. If you want to tune Kafka you have to understand its internals, and the network design in particular. A toy sketch of the design appears after the diagrams below.

Reactor network design pattern, step 1:

Reactor network design pattern, step 2:

Reactor network design pattern, step 3:

Kafka's ultra-high-concurrency network design:
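To give a feel for the three-tier design (acceptor -> processor with its own selector -> request queue -> handler threads), here is a heavily simplified, self-contained Java NIO sketch. It only shows the shape of the pattern; Kafka's real SocketServer runs several processors and a whole request-handler pool, and the port and buffer sizes below are arbitrary.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MiniReactor {
    public static void main(String[] args) throws IOException {
        BlockingQueue<byte[]> requestQueue = new ArrayBlockingQueue<>(500);
        ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();
        Selector processorSelector = Selector.open();

        // Handler tier: does the actual work (in Kafka: read/write the log, build the response).
        Thread handler = new Thread(() -> {
            try {
                while (true) {
                    byte[] request = requestQueue.take();
                    System.out.println("handled request of " + request.length + " bytes");
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        handler.setDaemon(true);
        handler.start();

        // Processor tier: one selector multiplexes many client sockets.
        Thread processor = new Thread(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(4096);
            try {
                while (true) {
                    SocketChannel conn;
                    while ((conn = newConnections.poll()) != null) {
                        conn.configureBlocking(false);
                        conn.register(processorSelector, SelectionKey.OP_READ);
                    }
                    processorSelector.select(300);
                    Iterator<SelectionKey> it = processorSelector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        SocketChannel client = (SocketChannel) key.channel();
                        buffer.clear();
                        int n = client.read(buffer);
                        if (n > 0) {
                            byte[] request = new byte[n];
                            buffer.flip();
                            buffer.get(request);
                            requestQueue.offer(request);   // hand off to the handler tier
                        } else if (n < 0) {
                            key.cancel();
                            client.close();
                        }
                    }
                }
            } catch (IOException e) { e.printStackTrace(); }
        });
        processor.setDaemon(true);
        processor.start();

        // Acceptor tier: only accepts connections and passes them to the processor.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9092));
        server.configureBlocking(false);
        Selector acceptSelector = Selector.open();
        server.register(acceptSelector, SelectionKey.OP_ACCEPT);
        while (true) {
            acceptSelector.select();
            Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
            while (it.hasNext()) {
                it.next();
                it.remove();
                SocketChannel client = server.accept();
                if (client != null) {
                    newConnections.add(client);
                    processorSelector.wakeup();   // let the processor register the new channel
                }
            }
        }
    }
}
```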

9. Kafka's replica mechanism ensures high availability

Partitions in Kafka have replicas (note: before version 0.8 there was no replica mechanism). When you create a topic you can specify both the number of partitions and the number of replicas. The replicas play the following roles. Leader partition: 1) all writes and reads go through the leader partition; 2) the leader maintains an ISR (in-sync replica) list, and replicas are removed from the ISR list according to certain rules (a follower that falls too far behind is dropped). The producer sends each message to the leader partition, and the follower partitions then synchronize the data from the leader.

10. Good architecture thinking – summary

High availability: the multi-replica mechanism. High concurrency: the network architecture design, a three-tier design of multi-selector -> multi-thread -> queue (NIO). High performance when writing data:

  1. Write data to the OS Cache first

  2. Write to disk sequentially, high performance

Read data:

  1. Based on the sparse index, quickly locate the data to consume

  2. The zero-copy mechanism reduces copying of data and reduces context switching between the application and the operating system

11. Build Kafka production environment

11.1 Requirement Scenario Analysis

An e-commerce platform sends a billion requests to the Kafka cluster every day. (Applying the 80/20 rule, a rough estimate is good enough here.) One billion requests are spread over 24 hours, but between midnight and 8 a.m. there is relatively little traffic; about 80% of the requests arrive in the other 16 hours, i.e. roughly 800 million requests in 16 hours. Of those, about 80% again fall into roughly 20% of that window, around 3 peak hours.

That is roughly 600 million requests in three hours, so a simple calculation puts the peak QPS at about 600 million / (3 x 3600 s) ≈ 55,000 requests per second.

1 billion requests x 50 KB per request ≈ 46 TB of data stored every day.

Generally we keep two replicas: 46 TB x 2 = 92 TB, and Kafka retains the last 3 days of data, so 92 TB x 3 days = 276 TB. To be clear, the 50 KB is not the size of a single raw message; our producers merge multiple log lines into one message, and an individual log line is usually only a few hundred bytes. The small sketch below repeats this arithmetic.
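For convenience, here is the same back-of-the-envelope arithmetic as a tiny Java snippet, using only the figures quoted above.

```java
public class CapacityEstimate {
    public static void main(String[] args) {
        long requestsPerDay = 1_000_000_000L;

        // Peak QPS: roughly 600 million requests land in about 3 peak hours.
        long peakRequests = 600_000_000L;
        long peakQps = peakRequests / (3 * 3600);              // ~55,000 requests/second
        System.out.println("peak QPS  ~= " + peakQps);

        // Storage: 50 KB per (merged) message, 2 replicas, 3 days of retention.
        double dailyTiB = requestsPerDay * 50.0 * 1024 / Math.pow(1024, 4);   // ~46 TiB/day
        double totalTiB = dailyTiB * 2 * 3;                                    // ~276 TiB
        System.out.printf("daily data ~= %.0f TiB, with replicas and 3-day retention ~= %.0f TiB%n",
                dailyTiB, totalTiB);
    }
}
```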

11.2 Evaluating the Number of Dedicated Servers

1) First decide whether the cluster needs virtual machines or physical machines. For clusters like Kafka, MySQL and Hadoop we use physical machines in production. 2) The peak of 55,000 requests per second could actually be handled by one or two physical machines, but when we size machines we usually plan for about four times the peak, which would put the cluster at roughly 200,000 QPS — a much safer cluster. That works out to about five physical machines, each handling roughly 40,000 requests per second.

Scenario summary: handling 1 billion requests a day, with a 55,000 QPS peak and 276 TB of data, takes about 5 physical machines.

11.3 Disk Selection

Handling 1 billion requests a day, a 55,000 QPS peak and 276 TB of data on 5 physical machines. 1) SSDs or ordinary mechanical disks? **SSD: better performance, but expensive. SAS disks: weaker in some respects, but cheap.** An SSD's strength is random read/write performance, which matters for clusters like MySQL, but its sequential write performance is similar to that of SAS disks — and Kafka writes sequentially. So ordinary mechanical (SAS) hard drives are fine.

2) Next, how many disks does each server need? 5 servers must hold 276 TB in total, about 60 TB per server. With our company's server configuration of 11 hard disks of 7 TB each, that is 11 x 7 TB = 77 TB per server.

77 TB x 5 servers = 385 TB, which comfortably covers the required 276 TB.

Scene summary:

To handle 1 billion requests, it takes 5 physical machines, 11 SAS * 7T

11.4 Memory Evaluation

To handle 1 billion requests, it takes 5 physical machines, 11 SAS * 7T

We saw that Kafka's reads and writes go through the OS cache (page cache); in other words, if the OS cache were unlimited, Kafka would effectively operate entirely out of memory, and performance would be excellent. Memory is finite, though, so: 1) give as much memory as possible to the OS cache; 2) Kafka's core is written in Scala and its client in Java, both running on the JVM, so the JVM needs some memory too — but Kafka is deliberately designed not to keep many data structures on the JVM heap, so the JVM does not need much. As a rule of thumb, about 10 GB is fine.

(Contrast this with an HDFS NameNode, whose JVM holds all the metadata — tens of gigabytes — and therefore needs a large heap, say 100 GB.)

Suppose this cluster hosts about 100 topics in total. 100 topics x 5 partitions x 2 replicas = 1,000 partitions. A partition is a directory on a physical machine containing multiple .log files, which store the data; by default each .log file is 1 GB. Keeping the latest .log file of all 1,000 partitions in memory would give the best performance, but that is 1,000 x 1 GB = 1,000 GB of memory. It is enough to keep the most recent part of each current log file — say 25% of it — in memory: 250 MB x 1,000 = 0.25 GB x 1,000 = 250 GB of memory.

250 GB / 5 servers = 50 GB per server; 50 GB (page cache) + 10 GB (JVM) = 60 GB per server.

A 64 GB server leaves only about 4 GB for the operating system itself, which is a little tight, but the Kafka JVM does not really need the full 10 GB either, so 64 GB is estimated to be workable. Of course, if you can afford 128 GB of memory per server, that is even better.

In the estimate above I used 5 partitions per topic, but a topic with a large data volume might well have 10 partitions.

Summary: To handle 1 billion requests, requires 5 physical machines, 11 (SAS) * 7T, requires 64GB of memory (128GB is better)

11.5 CPU Pressure Assessment

Assess how many CPU cores are needed per server (resources are limited)

We estimate how many CPUs are needed from the number of threads the service runs, because threads rely on CPU cores to run. If there are many threads but too few cores, the machine is overloaded and performance suffers.

So, estimate how many threads a Kafka broker has once it starts:

1 acceptor thread; 3 processor threads by default (often tuned to 6-9); 8 request-handler threads by default (often tuned to 32); plus threads for periodic cleanup, replica data pulling, periodic ISR checks, and so on. So a running Kafka broker easily has well over a hundred threads.

With 4 CPU cores, a few dozen busy threads will saturate the CPU. With 8 cores, a few dozen threads run comfortably, but 100-200 threads are too many for 8 cores. So the recommendation here is 16 CPU cores, and 32 CPU cores if possible.

Bottom line: a Kafka broker should have at least 16 CPU cores, and 32 is better. For example: 2 CPUs x 8 cores = 16 cores; 4 CPUs x 8 cores = 32 cores.

Summary: To handle 1 billion requests, requires 5 physical machines, 11 (SAS) * 7T, requires 64GB of memory (128GB better), requires 16 CPU cores (32 better)

11.6 Network Requirement Assessment

What kind of network card do we need? Generally either gigabit NICs (1 Gb/s) or 10-gigabit NICs (10 Gb/s).

At peak there are 55,000 requests per second, i.e. 55,000 / 5 ≈ about 10,000 requests per server. As computed before, 10,000 x 50 KB ≈ 488 MB, so each server receives roughly 488 MB of data per second. Replication also goes over the network, so with one extra replica that doubles to about 976 MB/s. (To repeat: in most companies a single request is nowhere near 50 KB; ours are that large only because the producing hosts merge and batch the data before sending.) Note that under normal circumstances a NIC never reaches its rated bandwidth; on a gigabit card you can realistically use about 700 Mb/s, which is far from enough here. So in this scenario we go with 10-gigabit NICs, and with 10 GbE this load is easy.

11.7 Cluster Planning

Planning covers: the number of requests, the number of physical machines, how many disks and what kind of disk, memory, CPU cores, and network cards. The point is that whenever a new requirement comes up in your company and you have to size resources and servers, you can evaluate them along the same lines.

Message size: 50 KB in our case (in other setups it might be 1 KB, 500 bytes, or 1 MB). IP / hostname plan: 192.168.0.100 hadoop1, 192.168.0.101 hadoop2, 192.168.0.102 hadoop3.

Host planning: Kafka's cluster architecture is master-slave; a controller manages the metadata of the whole cluster with the help of a ZooKeeper cluster.

  1. ZooKeeper cluster: hadoop1, hadoop2, hadoop3

  2. In theory the Kafka brokers should not be co-located with the ZooKeeper nodes, but since our servers are limited, the Kafka cluster is also installed on hadoop1, hadoop2 and hadoop3.

12. Kafka operation and maintenance

12.1 Common O&M Tools

Kafka Manager – a web-based management tool

12.2 Common O&M Commands

Scenario 1: a topic's data volume has become too large – increase the number of partitions

When the topic was first created the data volume was small, so it was given only a small number of partitions.

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6
kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6


The broker ids:

hadoop1: 0, hadoop2: 1, hadoop3: 2. Assume a partition has three replicas, partition0: a, b, c.

a: the leader partition; b, c: follower partitions

ISR: {a, b, c}. If a follower partition does not pull data from the leader partition for more than 10 seconds, it is removed from the ISR list.

Scenario 2: increase the replication factor for core topics

If core business data needs a higher replication factor, create a test.json script (vim test.json) and save the following JSON into it:

{" version ": 1," partitions ": [{" topic" : "test6", "partition" : 0, "replicas" : [0]}, {" topic ":" test6 ", "partition" : 1, "replicas" : [0]}, {" topic ":" test6 ", "partition" : 2, "replicas" : [0]}]}Copy the code

Execute the above JSON script:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute


Scenario 3: topics with unbalanced load need manual partition migration. vi topics-to-move.json

{"topics": [{"topic": "test01"}, {"topic": "test02"}], "version": 1}
# list every topic you want to rebalance in "topics"
kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate

In --broker-list you put all of your brokers, including newly added ones; this means the partitions will be spread evenly across all of those brokers, new ones included. The command generates a migration plan, which you can save in a file, e.g. expand-cluster-reassignment.json.

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify


This data migration must be done at night during off-peak hours, because it moves data between machines and consumes a lot of bandwidth. --generate: produce a migration plan from the given list of topics and brokers; it does not actually move any data, it only computes the plan for the execute step. --execute: perform the migration according to the given plan. --verify: check whether the migration has completed.

Scenario 4: a broker has too many leader partitions

Normally the leader partitions should be evenly balanced across servers, but you may end up with something like: hadoop1: 4, hadoop2: 1, hadoop3: 1.

The number of partitions is allocated automatically and adjusted dynamically, and Kafka tries to spread the leader partitions evenly across machines so that each machine gets a uniform share of the read and write throughput. There are exceptions, though: if some brokers go down, their leader partitions become concentrated on a small number of other brokers, which then take a disproportionate share of the read and write requests while the recovered brokers sit almost idle, leaving the cluster unbalanced. There is a parameter for this, auto.leader.rebalance.enable, which defaults to true: every 300 seconds (leader.imbalance.check.interval.seconds) the controller checks whether the leaders are balanced, and if the leader imbalance on a broker exceeds 10% (leader.imbalance.per.broker.percentage), a new leader election is carried out for that broker. Configuration parameters: auto.leader.rebalance.enable – default true; leader.imbalance.per.broker.percentage – the percentage of leader imbalance allowed per broker, default 10%, above which the controller triggers leader rebalancing; leader.imbalance.check.interval.seconds – default 300 seconds.

13. Kafka producer

13.1 Principles of The Producer Sending Messages

13.2 Principles of The Producer Sending Messages – Basic Example
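The original demo isn't reproduced here, so as a stand-in here is a minimal producer example; it is only a sketch, assuming brokers reachable at hadoop1/2/3:9092 and the test6 topic used elsewhere in this article.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test6", "key-" + i, "message-" + i);
                // Synchronous send for the demo; production code usually sends asynchronously.
                RecordMetadata meta = producer.send(record).get();
                System.out.println("sent to partition " + meta.partition() + ", offset " + meta.offset());
            }
        }
    }
}
```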

13.3 How can I Improve throughput

buffer.memory: the size of the buffer that holds messages waiting to be sent; the default is 33554432 (32 MB). compression.type: default none, but lz4 can be used to compress the data, which works quite well — the data volume shrinks and throughput goes up, at the cost of extra CPU on the producer. batch.size: if the batch is too small, frequent network requests reduce throughput; if it is too large, a message may sit in memory a long time before being sent and the memory buffer is strained. The default is 16384, i.e. 16 KB: a batch is sent once 16 KB has accumulated. In production this value is usually increased to improve throughput; if you set it very large there will be some latency, so size it according to your messages. linger.ms: the default is 0, which means a message must be sent immediately — usually not what you want. It is commonly set to something like 100 ms, so that messages are collected into a batch; and if a batch fills its 16 KB within those 100 ms, it is naturally sent right away. A config sketch follows below.
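Putting those knobs together, a hedged example of a throughput-oriented producer configuration might look like this (the concrete values are illustrative, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class HighThroughputProducerConfig {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Throughput knobs from the discussion above (example values, tune per workload):
        props.put("buffer.memory", 67108864);   // 64 MB send buffer (default 33554432 = 32 MB)
        props.put("compression.type", "lz4");   // default "none"
        props.put("batch.size", 131072);        // 128 KB batches (default 16384 = 16 KB)
        props.put("linger.ms", 100);            // default 0; give batches up to 100 ms to fill
        return new KafkaProducer<>(props);
    }
}
```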

13.4 How Can I Handle Exceptions

  1. LeaderNotAvailableException: the machine hosting the leader replica is down and the leader is temporarily unavailable, so the write fails; you have to wait until another follower replica is switched to leader before writes can continue, and at that point the send can be retried. Restarting a Kafka broker likewise forces leader switches, so during that window writes will fail with LeaderNotAvailableException.

  2. NotControllerException: the same idea — the broker acting as Controller is down, and there will be a problem until the Controller is re-elected.

  3. NetworkException: a network error or timeout. a) Configure the retries parameter and the producer will retry automatically. b) If it still fails after a few retries, the Exception is handed to the application; when we catch it we handle the message separately through a backup channel: messages that could not be sent are written to Redis or to the file system, or even discarded if that is acceptable. A sketch of this pattern follows below.
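As an illustration of that backup-channel idea (my own sketch, not code from the article), an asynchronous send with a callback might look like this; the fallback store is hypothetical:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendWithFallback {
    // Assumes a producer configured elsewhere (e.g. with retries > 0).
    public static void send(KafkaProducer<String, String> producer, String topic, String value) {
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
            if (exception != null) {
                // Retries are exhausted (or the error is not retriable): fall back,
                // e.g. write the message to Redis / a local file, or drop it if acceptable.
                System.err.println("send failed, writing to fallback store: " + exception.getMessage());
                // fallbackStore.save(topic, value);   // hypothetical backup channel
            }
        });
    }
}
```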

13.5 Retry Mechanism

Retries cause some problems:

  1. Message duplication. Sometimes a problem such as a leader switch is transient and a retry (the retries setting) fixes it, but the retry can cause the same message to be sent more than once.

  2. Message reordering. Retries can also put messages out of order, because messages sent after the failed one may already have gone through. Setting the max.in.flight.requests.per.connection parameter to 1 ensures the producer only has one request in flight at a time, which preserves ordering. The default interval between retries is 100 milliseconds, controlled by retry.backoff.ms. In development you will find that the retry mechanism takes care of something like 95% of the exceptions you hit.

13.6 Detailed ACK Parameters

acks=0, set on the producer: a message counts as sent as soon as the request goes out; the producer does not care whether the write succeeded. Best performance — if you are analysing logs and can afford to lose some data, this setting performs very well. acks=1: the send succeeds once the write to the leader partition has succeeded, but data can still be lost if the leader fails before the followers catch up. acks=-1 (all): the send only succeeds after every replica in the ISR list has written the message (the ISR list contains the leader and its in-sync followers, e.g. 1 leader partition + 1 follower partition). On the Kafka server side, min.insync.replicas limits how small the ISR list may get: a leader partition maintains an ISR list, and if min.insync.replicas is 2 but the ISR list contains only one replica, writing data to that partition fails. To not lose data you need all of: 1) partition replicas >= 2, 2) acks = -1, 3) min.insync.replicas >= 2. A config sketch follows below.
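A hedged sketch of a "no data loss" producer configuration, with the broker-side setting noted in a comment (addresses and values are illustrative):

```java
import java.util.Properties;

public class NoDataLossProducerConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");       // acks=-1: wait for every replica in the ISR list
        props.put("retries", 3);        // retry transient errors such as a leader switch
        // On the broker/topic side you would additionally set, e.g.:
        //   min.insync.replicas=2      (and create the topic with replication-factor >= 2)
        return props;
    }
}
```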

13.7 Customizing partitions

How a partition is chosen: 1) if no key is set, messages are sent to the partitions in round-robin fashion; 2) if a key is set, Kafka's built-in partitioner computes a hash of the key and maps it to a partition, so messages with the same key always land in the same partition. In some special cases, however, we need a custom partitioner:

import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class HotDataPartitioner implements Partitioner {

    private Random random;

    @Override
    public void configure(Map<String, ?> configs) {
        random = new Random();
    }

    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        int partitionCount = partitionInfoList.size();
        // Reserve the last partition for "hot" data; spread everything else over the rest.
        int hotDataPartition = partitionCount - 1;
        return !key.contains("hot_data")
                ? random.nextInt(partitionCount - 1)
                : hotDataPartition;
    }

    @Override
    public void close() {
    }
}

To use this class, configure the producer with: props.put("partitioner.class", "com.zhss.HotDataPartitioner");

13.8 Comprehensive case presentation

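As a small end-to-end demonstration (my own sketch, not from the original article), this is how the HotDataPartitioner from 13.7 could be wired into a producer; the brokers and the test6 topic are the ones assumed throughout this article, and com.zhss is the article's example package name.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HotDataProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Plug in the custom partitioner from section 13.7.
        props.put("partitioner.class", "com.zhss.HotDataPartitioner");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keys containing "hot_data" are routed to the dedicated last partition,
            // everything else is spread over the remaining partitions.
            producer.send(new ProducerRecord<>("test6", "hot_data_user_1", "a hot record"));
            producer.send(new ProducerRecord<>("test6", "normal_user_2", "a normal record"));
            producer.flush();
        }
    }
}
```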

14. Kafka consumers

14.1 Concept of consumer Groups

1) Every consumer must belong to a consumer group (consumer.group, i.e. the group ID). A partition of a topic is assigned to only one consumer within a consumer group; a consumer may be assigned several partitions, and it is also possible for a consumer to be assigned none at all. 2) To get broadcast behaviour, simply consume with different group IDs: for example, if consumer A uses groupA and consumer B uses groupB, each of them receives all of the topic's data. 3) If a consumer in a group dies, the partitions assigned to it are automatically handed over to the other consumers; if it comes back, some partitions are handed back to it.

14.2 Basic Case Demonstration
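The original demo isn't reproduced here, so as a stand-in here is a minimal consumer in the groupA consumer group, again assuming brokers at hadoop1/2/3:9092 and the test6 topic:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("group.id", "groupA");   // consumers sharing this id form one consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("test6"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```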

14.3 Offset Management

  1. Each consumer keeps, in a data structure in memory, the consumption offset of every partition of every topic it consumes, and commits the offset periodically. Older versions wrote the offset to ZooKeeper, but hitting ZooKeeper with that many concurrent requests is not a reasonable architecture: ZooKeeper is there to coordinate distributed systems and store lightweight metadata, and cannot handle high-concurrency reads and writes or act as a data store.

  2. Newer versions commit the offset to an internal Kafka topic: __consumer_offsets. The key of a committed record is group.id + topic + partition number, and the value is the current offset. Kafka periodically compacts this topic, so for each group.id + topic + partition only the latest offset is kept.

  3. Because __consumer_offsets may receive highly concurrent commit requests, it has 50 partitions by default (each with its own leader partition), so in a large cluster — say 50 machines — the offset-commit load can be spread over 50 machines. Offsets themselves are just messages, written sequentially to disk like any other data. A manual-commit sketch follows below.
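To make the commit mechanics visible, here is a small sketch of a manual-commit consumption loop (my own illustration); it assumes a consumer created like the one in 14.2 but with enable.auto.commit=false:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitLoop {
    // Assumes the consumer was created with enable.auto.commit=false.
    public static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singleton("test6"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("processing offset " + record.offset());   // business logic here
            }
            // Commit only after the whole batch has been processed; the offset is written
            // to the internal __consumer_offsets topic keyed by group.id + topic + partition.
            consumer.commitSync();
        }
    }
}
```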

14.4 Offset Monitoring Tool Description

  1. In the bin/kafka-run-class.sh script, add JMX_PORT=9988 near the top, then restart the Kafka process.

  2. There is also dedicated software for monitoring consumer offsets, shipped as a jar, e.g.: java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk hadoop1:2181 --port 9004 --refresh 15.seconds --retain 2.days (set --offsetStorage to kafka or to zookeeper depending on where your version stores offsets).

14.5 Detecting abnormal consumers

heartbeat.interval.ms: how often the consumer sends heartbeats to the coordinator. The coordinator relies on heartbeats to know whether a consumer is down, and if one is, the rebalance instruction is passed to the other consumers through their heartbeat responses. session.timeout.ms: how long the coordinator waits without a heartbeat before considering the consumer dead; the default is 10 seconds. max.poll.interval.ms: if the time between two poll() calls exceeds this value, the consumer is considered too slow and is kicked out of the consumer group, and its partitions are given to other consumers; set it according to how long your business processing actually takes.

14.6 Core Parameter Description

fetch.max.bytes: the maximum number of bytes a single fetch can return. It should be set together with the other size limits, and it is recommended to make it reasonably large, for example:

  1. The maximum size of a message the producer may send: -> 10 MB

  2. The maximum message size the broker will accept when storing data: -> 10 MB

  3. The consumer's fetch.max.bytes should then be at least as large: -> 10 MB. Other core parameters: max.poll.records – the maximum number of records returned by one poll(), default 500. connections.max.idle.ms – if a socket between the consumer and a broker stays idle longer than this, it is closed and has to be re-established on the next request; setting it to -1 is recommended so connections are not recycled. enable.auto.commit – whether offsets are committed automatically; auto.commit.interval.ms – how often they are committed to __consumer_offsets. auto.offset.reset – what to do when the group has no committed offset for a partition: earliest starts from the beginning of the partition (if a committed offset exists, consumption always resumes from it, e.g. topicA -> partition0: 1000, partition1: 2000); latest starts from the newly produced data (again, a committed offset takes precedence); none throws an exception as soon as any partition has no committed offset. See the config sketch after this list.
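A hedged consumer configuration sketch pulling the parameters of 14.5 and 14.6 together (the values mirror the defaults and examples discussed above; treat them as a starting point, not a recommendation):

```java
import java.util.Properties;

public class ConsumerTuningConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("group.id", "groupA");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("heartbeat.interval.ms", 3000);      // how often we tell the coordinator we are alive
        props.put("session.timeout.ms", 10000);        // no heartbeat for 10 s -> kicked out of the group
        props.put("max.poll.interval.ms", 300000);     // max allowed gap between two poll() calls
        props.put("max.poll.records", 500);            // records returned per poll (default 500)
        props.put("fetch.max.bytes", 10485760);        // 10 MB, matching the producer/broker limits above
        props.put("connections.max.idle.ms", -1);      // never tear down idle broker connections
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", 5000);
        props.put("auto.offset.reset", "earliest");    // where to start when there is no committed offset
        return props;
    }
}
```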

14.7 Comprehensive case demonstration

Example scenario: a second-hand e-commerce platform ("Happy Send") accumulates star ratings for users according to how much they spend. The order system (producer) sends messages into the Kafka cluster; the membership system (consumer) consumes the messages from the Kafka cluster and processes them.

14.8 Working Principles of group Coordinator

Question: how do consumers carry out a rebalance? – Through the group coordinator.

  1. Each consumer group selects one broker as its coordinator. The coordinator monitors the heartbeats of all consumers in the group, determines whether any of them is down, and triggers a rebalance when needed.

  2. How is that broker chosen? The group first hashes its groupId (to a number) and takes it modulo the number of partitions of __consumer_offsets, which is 50 by default and can be changed via offsets.topic.num.partitions. The broker hosting that partition is the group's coordinator, and it is also where all consumers of the group commit their offsets. For example: groupId "myconsumer_group" -> hash (a number) -> mod 50 -> 8, so the broker holding partition 8 of __consumer_offsets is this group's coordinator (see the sketch after this list).

  3. The rebalance flow: 1) each consumer sends a JoinGroup request to the coordinator; 2) the coordinator selects one consumer of the group as the leader; 3) the coordinator sends the consumer-group information to that leader; 4) the leader works out the consumption (assignment) plan; 5) the leader sends the plan back to the coordinator via SyncGroup, and the coordinator forwards it to all consumers; 6) each consumer then opens socket connections to the leader brokers of its assigned partitions and starts consuming.
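The partition-selection formula from step 2 is simple enough to show directly; this is only a simplified sketch (Kafka's own implementation guards against hashCode edge cases):

```java
public class CoordinatorLookup {
    public static void main(String[] args) {
        String groupId = "myconsumer_group";
        int offsetsTopicPartitions = 50;   // default number of __consumer_offsets partitions

        int partition = Math.abs(groupId.hashCode()) % offsetsTopicPartitions;
        // The broker hosting the leader of __consumer_offsets-<partition> is this group's coordinator.
        System.out.println("group '" + groupId + "' uses __consumer_offsets partition " + partition);
    }
}
```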

14.9 rebalance strategy

A consumer group uses a coordinator to implement the Rebalance

There are three strategies for rebalance: range, round-robin, and sticky

Say the topic we consume has 12 partitions: p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, and suppose our consumer group has three consumers.

  1. Range strategy. The range strategy assigns partitions by contiguous ranges of partition numbers, and it is the default: consumer1 gets p0-3, consumer2 gets p4-7, consumer3 gets p8-11.

  2. Round-robin strategy. Partitions are dealt out in turn: consumer1 gets 0, 3, 6, 9; consumer2 gets 1, 4, 7, 10; consumer3 gets 2, 5, 8, 11. Both of these strategies share a problem on rebalance: 12 partitions over the 2 remaining consumers means each one ends up consuming 6 partitions, as follows.

Say consumer1 dies: p0-5 are assigned to consumer2 and p6-11 to consumer3, so partitions p6 and p7, which were originally on consumer2, are moved over to consumer3.

  3. Sticky strategy. The sticky strategy tries to make sure that, after a rebalance, each consumer keeps the partitions it already owned, so that as few partitions as possible are moved; only the dead consumer's partitions are redistributed.

With consumer1: 0-3, consumer2: 4-7, consumer3: 8-11, if consumer3 dies, the sticky strategy gives consumer1: 0-3 plus 8, 9, and consumer2: 4-7 plus 10, 11.
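For reference, the assignment strategy is chosen on the consumer side; a hedged config sketch (RangeAssignor is the default, and RoundRobinAssignor and StickyAssignor ship with the Java client):

```java
import java.util.Properties;

public class StickyAssignmentConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("group.id", "groupA");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Switch from the default RangeAssignor to the sticky strategy described above.
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }
}
```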

15. Broker management

15.1 The meaning of LEO and HW

A quick recap of what we have covered so far:

  1. The core principles of Kafka

  2. How to size a cluster's resources

  3. Setting up a Kafka cluster, plus some simple operations and maintenance tasks

  4. Producer (use, core parameters)

  5. Consumer (principles, usage, core metrics)

  6. Some of the principles inside the broker

Core concepts: LEO and HW. LEO is related to offsets.

LEO: in Kafka, both the leader partition and the follower partitions are called replicas, and each replica maintains its own LEO.

Every time a replica receives a message and writes it to its log, it updates its LEO, the log end offset. The LEO is in fact the latest offset + 1.

HW (high watermark): one important purpose of the LEO is to advance the HW. When the followers' LEOs have caught up with the leader's LEO, the HW can move forward; data before the HW is considered committed and can be consumed, while messages beyond the HW cannot be consumed yet.

15.2 LEO updates

15.3 HW updates

15.4 How Can a Controller Manage a Cluster

1) Brokers compete to create the /controller node in ZooKeeper; the broker that succeeds becomes the controller. 2) The controller service then watches ZooKeeper directories such as /brokers/ids (to sense brokers coming online or going offline), /brokers/topics and /admin/reassign_partitions.

15.5 Delayed Tasks

Let's first look at where Kafka needs delayed tasks. The first kind: when a producer writes with acks=-1, the response can only be returned after both the leader and the followers have finished writing, and there is a timeout, 30 seconds by default (request.timeout.ms). So once the data has been written to the leader's disk, a delayed task with a 30-second expiry is placed into the DelayedOperationPurgatory (the delay manager). If all the followers copy the data to their local disks before the 30 seconds are up, the task is triggered and woken up and the response can be returned to the client; otherwise, when the specified 30-second expiry is reached without that happening, a timeout error is returned. The second kind: when a follower fetches messages from the leader and finds nothing new, a delayed task is created; if the delay (say 100 ms) expires, empty data is returned to the follower, which then sends another fetch request, but if the leader writes a message within the delay (in less than 100 ms), the task is woken up and the pull is served automatically.

There is a huge number of such delayed tasks to schedule.

15.6 Time wheel mechanism

  1. Why design a time wheel at all? Kafka has many delayed tasks internally, and they are not scheduled with the JDK Timer, whose heap-based task queue makes inserting and deleting a task O(log n); instead Kafka implements its own time wheel, in which inserting and deleting a delayed task is O(1).

  2. What is a time wheel? A time wheel is essentially an array. tickMs: the duration of one tick, e.g. 1 ms. wheelSize: the number of slots, e.g. 20. interval = tickMs x wheelSize: the total time span of one wheel, here 20 ms. currentTime: a pointer to the current time. a) Because the wheel is an array, looking up a slot takes O(1). b) Each slot holds a list of tasks, and inserting into or removing from such a list is also O(1). For example, a task that should run 8 ms from now goes into the slot 8 ticks ahead, and one due in 19 ms goes 19 ticks ahead. 3. What about a task due 110 ms from now, beyond the 20 ms span? Time wheels are layered, and each higher layer's tickMs is the layer below's interval: layer 1: 1 ms x 20 = 20 ms, layer 2: 20 ms x 20 = 400 ms, layer 3: 400 ms x 20 = 8,000 ms. The 110 ms task first lands in the layer-2 wheel and is re-inserted into the layer-1 wheel as its expiry approaches. A minimal single-layer sketch follows below.
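As an illustration (my own sketch, not Kafka's actual TimingWheel, which is hierarchical and driven by a DelayQueue), here is a minimal single-layer time wheel showing why insertion is O(1):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SimpleTimeWheel {
    private final long tickMs = 1;          // duration of one tick
    private final int wheelSize = 20;       // number of slots -> span = 20 ms
    private final Deque<Runnable>[] buckets;
    private long currentTick = 0;

    @SuppressWarnings("unchecked")
    public SimpleTimeWheel() {
        buckets = new Deque[wheelSize];
        for (int i = 0; i < wheelSize; i++) buckets[i] = new ArrayDeque<>();
    }

    // O(1): compute the bucket index and append the task.
    public void add(Runnable task, long delayMs) {
        if (delayMs >= tickMs * wheelSize)
            throw new IllegalArgumentException("would overflow into a higher-level wheel");
        int index = (int) ((currentTick + delayMs / tickMs) % wheelSize);
        buckets[index].add(task);
    }

    // Called once per tick (in Kafka this is driven by the expiry of a DelayQueue entry).
    public void advance() {
        currentTick++;
        Deque<Runnable> bucket = buckets[(int) (currentTick % wheelSize)];
        Runnable task;
        while ((task = bucket.poll()) != null) task.run();
    }

    public static void main(String[] args) {
        SimpleTimeWheel wheel = new SimpleTimeWheel();
        wheel.add(() -> System.out.println("ran after 8 ticks"), 8);
        for (int i = 0; i < 10; i++) wheel.advance();
    }
}
```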

Respect ~

Recommended Reading:

Fully! Common modeling methods and example demonstration in data warehouse field

Interview with senior engineer of data Warehouse

About building and optimizing data warehouse architecture and model design

Comprehensive interpretation of data center, data warehouse and data lake

The architect soul 10 q | data warehouse construction

I'm Yunqi, a big-data developer who loves technology and also writes a bit of poetry. Welcome to follow me!

Life, the sea, the waves ahead.
