Preface

· · · · · ·

One, Kafka's producer story

Suppose we have an e-commerce system. Every user who can log in is a member, and a member's value shows in how much they spend: the more they spend, the more points they accumulate. Points can be redeemed for gift bags, coupons, and so on.

It's time for a diagram again 👌. First we have an order system, and the order system produces data logs. It writes these logs to Kafka, recorded as JSON. The statement field in the figure represents the order status, which here is paid.

The member system is then our consumer: it accumulates points for the member with id 1. Of course, we also have to account for refunds, in which case the corresponding points are deducted; the statement field then becomes cancel.

When we covered producer parameters, we mentioned that each message can be given a key, or the key can be left unspecified; the key determines which partition of a topic the message is sent to. For example, suppose we have a topic called tellYourDream with two partitions, each partition having two replicas (we can ignore the followers for now, since their data is simply synchronized from the leader):

```
Topic: tellYourDream
  P0: leader partition <- follower partition
  P1: leader partition <- follower partition
```

If no key is specified, messages are sent to partitions in round-robin fashion: my first message goes to P0, the second to P1, the third back to P0, the fourth to P1, and so on.

If you do specify a key, say message1, Kafka hashes the key and takes the result modulo the number of partitions to decide where the message goes: modulo 0 means P0, modulo 1 means P1. This guarantees that messages with the same key always land in the same partition (you can also exploit this to route certain messages to a designated partition). It is similar to the shuffle in MapReduce; these big data frameworks really are interconnected.
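For reference, here is a simplified sketch of keyed partition selection, modeled on the Kafka client's default partitioner (Utils.murmur2 and Utils.toPositive are real client utilities; the real partitioner also handles messages without keys):

```java
import org.apache.kafka.common.utils.Utils;

// Simplified sketch of keyed partition selection: hash the key bytes,
// force the hash non-negative, then take it modulo the partition count.
public class KeyPartitionSketch {
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // With 2 partitions, "message1" always maps to the same partition
        System.out.println(partitionFor("message1".getBytes(), 2));
    }
}
```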

Back to the membership system: if the order message lands in P0 while the refund message lands in P1, a consumer may well consume the P1 message first. It would then deduct 1,000 points before the user's points had even been credited, and the displayed balance would be wrong. So, to guarantee that all messages from the same user go to the same partition, we specify a key (for example the user id).

Code section

Ctrl+C, Ctrl+V: since every prop.put configuration below was already explained in "Kafka producer principle and important parameter description", this time we simply copy and paste. The idea is to extract the original producer setup into a createProducer() method.

```java
import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class OrderProducer {

    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", 33554432);
        props.put("compression.type", "lz4");
        props.put("batch.size", 32768);
        props.put("linger.ms", 100);
        props.put("retries", 10);
        props.put("retry.backoff.ms", 300);
        // the old producer called this "request.required.acks"; the new client uses "acks"
        props.put("acks", "1");
        return new KafkaProducer<String, String>(props);
    }
```

Here is the code that builds a message in JSON format, also extracted into a method.

```java
    public static JSONObject createRecord() {
        JSONObject order = new JSONObject();
        order.put("userId", 12344);
        order.put("amount", 100.0);
        order.put("statement", "pay");
        return order;
    }
```

Finally, we create the producer and the message directly in main(). The key here can be either the userId or the order id.

```java
    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer = createProducer();
        JSONObject order = createRecord();
        // the userId is the key, so all of this user's messages go to one partition
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "tellYourDream", order.getString("userId"), order.toString());
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Message sent successfully");
                } else {
                    // handle the failure, e.g. hand off to a fallback store
                }
            }
        });
        Thread.sleep(10000);
        producer.close();
    }
}
```

If a message still fails after the retry mechanism is exhausted, more rigorous projects will have a fallback path, such as storing the data in MySQL or Redis, to guarantee the message is not lost.
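A hedged sketch of what such a fallback path might look like; saveToFallbackStore is a hypothetical stand-in for whatever MySQL/Redis persistence a project uses, not something from the original code:

```java
// Sketch of a fallback once the producer's retries are exhausted.
// saveToFallbackStore is hypothetical: persist the record somewhere
// durable (MySQL, Redis, ...) and replay it later.
public static void sendWithFallback(KafkaProducer<String, String> producer,
                                    ProducerRecord<String, String> record) {
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            saveToFallbackStore(record.key(), record.value());
        }
    });
}

private static void saveToFallbackStore(String key, String value) {
    // hypothetical: write to MySQL/Redis, retry from there later
}
```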

Extra: custom partitioners (optional reading)

Since Kafka's built-in mechanisms are more or less adequate for production use, I won't go into much detail here. There are also custom serializers and custom interceptors, which rarely come up in day-to-day work; if you do need them, a quick search will get you going.

For example, suppose call records for customer-service numbers should all be stored in one dedicated partition, while other records are distributed evenly across the remaining partitions. To do that, we implement the Partitioner interface and override its partition() method:

```java
package com.bonc.rdpe.kafka110.partitioner;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
 * @date 2018-06-25 14:58:14
 */
public class PhonenumPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to do
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Get the partition info for this topic
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Customer-service numbers go to the last partition
        if (key.toString().equals("10000") || key.toString().equals("11111")) {
            return numPartitions - 1;
        }
        // Other numbers are spread over the remaining partitions by prefix
        String phoneNum = key.toString();
        return phoneNum.substring(0, 3).hashCode() % (numPartitions - 1);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
```

Using the custom partitioner

```java
package com.bonc.rdpe.kafka110.producer;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @title PartitionerProducer.java
 * @description test the custom partitioner
 * @date 2018-06-25 15:10:04
 */
public class PartitionerProducer {

    private static final String[] PHONE_NUMS = new String[]{
        "10000", "10000", "11111", "13700000003", "13700000004",
        "10000", "15500000006", "11111", "15500000008",
        "17600000009", "10000", "17600000011"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        // set the custom partitioner
        props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        int count = 0;
        int length = PHONE_NUMS.length;
        while (count < 10) {
            Random rand = new Random();
            String phoneNum = PHONE_NUMS[rand.nextInt(length)];
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
            RecordMetadata metadata = producer.send(record).get();
            String result = "phonenum [" + record.value()
                    + "] has been sent to partition " + metadata.partition();
            System.out.println(result);
            Thread.sleep(500);
            count++;
        }
        producer.close();
    }
}
```

The custom partitioner's output:

Two, Kafka consumer principle analysis

1. The offset

Let's bring out our Kafka cluster again. Multiple consumers are consuming messages from the cluster.

As long as the program keeps running steadily, the whole flow is fine. But what if the program stops, whether because of a bug or because we deliberately stopped it to make changes? Where should the consumers resume consuming when they come back up?

```
ConsumerA
  tellYourDream - P0: offset 10000
  tellYourDream - P1: offset 10001
```

The offset is a lot like an array index. Why do array indices start at 0? Because of the array memory model: an index is a distance from the start address, so array[0] is the element at offset 0 (the base address itself) and array[k] is the element offset by k. Kafka's offset works the same way: it records a position, marking how far the consumer has already consumed.

Kafka itself does not maintain the consumption offset for you; the consumer maintains it. Kafka provides two parameters around offsets. One is enable.auto.commit: when set to true, the consumer commits offsets automatically, so a restart resumes from the last committed position instead of consuming everything again. The second is auto.commit.interval.ms, the interval between automatic offset commits.
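In configuration form (the same props style as the consumer code later in this article), the two settings look like this:

```java
import java.util.Properties;

// The two offset-commit settings described above
public class AutoCommitConfig {
    public static Properties offsetProps() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "true");       // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");  // auto-commit once per second
        return props;
    }
}
```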

Before version 0.8, this offset was stored in ZooKeeper. A Kafka cluster has many topics, and a system may have tens of thousands of consumers consuming them; if the offset lived in ZooKeeper, every consumer would have to keep committing its value to ZooKeeper, which ZooKeeper cannot withstand. If you don't see the problem with that, you haven't read the episode "Kafka cluster deployment practices and operations", section 3.4 on consumer information; go review it 🤣.

After version 0.8, Kafka stores this offset in an internal topic called __consumer_offsets, which has 50 partitions by default. As we know, every consumer group has a group.id. When an offset is committed, the key is group.id + topic + partition number (this guarantees that offset commits for the same partition of the same group always go to the same partition of __consumer_offsets). This sentence is a bit of a mouthful, but make sure you understand it.

The value is the current offset. At intervals Kafka compacts this internal topic: for each group.id + topic + partition key, only the latest record is kept. And because __consumer_offsets may receive highly concurrent commit requests, it defaults to 50 partitions; if your Kafka cluster is large, say 50 machines, you then have 50 machines sharing the load of offset commits, which holds up much better.

2. Coordinator

Each consumer group has one broker selected as its coordinator. The coordinator monitors the heartbeat of every consumer in the group, decides whether any consumer has gone down, and triggers a rebalance when one has. Kafka distributes consumer groups evenly across brokers for coordination. Once started, every consumer in a group communicates with the broker acting as the group's coordinator, and the coordinator assigns partitions to the consumers for consumption, as evenly as possible.

2.1 How is the coordinator selected?

First, hash the consumer group's groupId, then take the result modulo the number of partitions of __consumer_offsets (50 by default, configurable via offsets.topic.num.partitions). That determines which partition of __consumer_offsets the group's offsets are committed to. For example: groupId "membership consumer-group" -> hash value (a number) -> modulo 50 (so the result can only be 0~49), which picks out one partition of __consumer_offsets (by default each partition of this internal topic has a single replica, only the leader). The broker hosting that partition's leader is the coordinator for this consumer group, and each consumer then maintains a socket connection to that broker.
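A minimal sketch of that computation, assuming the abs-hash-modulo scheme described above; the bit-mask keeps the hash non-negative:

```java
// Sketch of coordinator lookup: which __consumer_offsets partition
// does this group's offset data (and hence its coordinator) live on?
public class CoordinatorLookupSketch {
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // the broker hosting this partition's leader is the group's coordinator
        System.out.println(offsetsPartitionFor("membership consumer-group", 50));
    }
}
```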

Put simply: find the corresponding partition of __consumer_offsets. If the modulo result is 2, the group uses partition P2 of the 50; if it is 10, it uses P10.

2.2 What does a coordinator do?

Next, the coordinator selects a leader consumer (the first consumer to register becomes the leader) and reports the topic's situation to it. The leader consumer draws up the consumption plan, then sends a SyncGroup request returning the plan to the coordinator.

Sum it up again in a short paragraph:

First there is a consumer group, and the group has its group.id; from that we can compute which broker is its coordinator. By default the coordinator picks the first consumer to register as the leader consumer and reports the state of the whole topic to it. The leader consumer then makes a consumption plan based on load balancing and returns it to the coordinator. Once the coordinator has the plan, it sends it out to all the consumers, completing the process.

Consumers send heartbeats to the coordinator; you can think of consumers as secondary nodes and the coordinator as the primary node. When a consumer has been out of contact with the coordinator for too long, the partitions assigned to it are reassigned to others. If the leader consumer fails, a new leader is elected and the steps just described are repeated.

2.3 Load balancing of partition assignment

If a consumer joins or leaves, the leader consumer has to draw up a new consumption plan.

Say the topic we consume has 12 partitions: p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, and our consumer group has three consumers.

2.3.1 Range strategy

The range strategy assigns each consumer a contiguous range of partition indexes (a runnable sketch of the computation follows the example):

```
p0~3             consumer1
p4~7             consumer2
p8~11            consumer3
```
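Here is a minimal runnable sketch of that computation; it is my own illustration of the idea, not Kafka's actual RangeAssignor class (which applies the same logic per topic over the sorted member list):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of range assignment for one topic: each consumer gets a
// contiguous chunk, and the first (numPartitions % numConsumers) consumers
// get one extra partition.
public class RangeAssignSketch {
    static List<Integer> assign(int numPartitions, int numConsumers, int consumerIndex) {
        int per = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int start = consumerIndex * per + Math.min(consumerIndex, extra);
        int count = per + (consumerIndex < extra ? 1 : 0);
        List<Integer> mine = new ArrayList<>();
        for (int p = start; p < start + count; p++) {
            mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        // 12 partitions, 3 consumers -> [0..3], [4..7], [8..11]
        for (int c = 0; c < 3; c++) {
            System.out.println("consumer" + (c + 1) + ": " + assign(12, 3, c));
        }
    }
}
```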
2.3.2 Round-robin strategy

Partitions are dealt out one at a time:

```
consumer1: 0, 3, 6, 9
consumer2: 1, 4, 7, 10
consumer3: 2, 5, 8, 11
```

But both schemes share a problem. Suppose consumer1 fails: under range, p0~5 would go to consumer2 and p6~11 to consumer3, so partitions p6 and p7, which were originally on consumer2, end up moved over to consumer3.

2.3.3 Sticky strategy

The newer sticky strategy tries to ensure that, when a rebalance happens, each consumer keeps the partitions it already owns as much as possible, so the assignment stays as stable as it can:

```
consumer1: 0~3    consumer2: 4~7    consumer3: 8~11

Suppose consumer3 fails:
consumer1: 0~3, +8, 9
consumer2: 4~7, +10, 11
```
2.3.4 Rebalance generational mechanism

Suppose you are consuming partition3 but have not yet committed its offset when a rebalance is triggered and partition3 is assigned to another consumer. Would committing the offset for that partition3 data still work? No. Every time a rebalance is triggered, the consumer group's generation is bumped by 1, and an offset committed under the old generation no longer takes effect; everyone re-consumes according to the new partition assignment.
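As a minimal sketch with hypothetical names (on the real broker a stale commit is rejected with an ILLEGAL_GENERATION error), the idea boils down to comparing generation numbers:

```java
// Hypothetical sketch of the generation check: a commit tagged with an
// older generation than the group's current one is rejected.
public class GenerationSketch {
    static boolean acceptOffsetCommit(int commitGeneration, int currentGeneration) {
        return commitGeneration == currentGeneration;
    }

    public static void main(String[] args) {
        int groupGeneration = 5; // bumped to 5 by the latest rebalance
        System.out.println(acceptOffsetCommit(4, groupGeneration)); // stale -> false
        System.out.println(acceptOffsetCommit(5, groupGeneration)); // current -> true
    }
}
```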

That's the important stuff; now it's code time again.

Three, the consumer code part

It's not exactly the same as the producer, but the structure is very similar, just a bit shorter. You already know most of these pieces, and anything unclear is easy to resolve with a search engine.

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {

    private static ExecutorService threadPool = Executors.newFixedThreadPool(20);

    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer = createConsumer();
        consumer.subscribe(Arrays.asList("order-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    JSONObject order = JSONObject.parseObject(record.value());
                    threadPool.submit(new CreditManageTask(order));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            consumer.close();
        }
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // how often to send a heartbeat to the coordinator
        props.put("heartbeat.interval.ms", 1000);
        // if the broker senses no heartbeat for 10s, the consumer is considered dead
        props.put("session.timeout.ms", 10 * 1000);
        // if more than 30s passes between two polls, the consumer is kicked out and a rebalance is triggered
        props.put("max.poll.interval.ms", 30 * 1000);
        // max bytes fetched per request; raise it for higher throughput
        props.put("fetch.max.bytes", 10485760);
        // max records returned by a single poll, 500 by default
        props.put("max.poll.records", 500);
        // never reclaim idle socket connections
        props.put("connections.max.idle.ms", -1);
        // commit offsets automatically, once per second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // with no committed offset, start consuming from the beginning
        props.put("auto.offset.reset", "earliest");
        return new KafkaConsumer<String, String>(props);
    }

    static class CreditManageTask implements Runnable {
        private JSONObject order;

        public CreditManageTask(JSONObject order) {
            this.order = order;
        }

        @Override
        public void run() {
            System.out.println("Managing points for order: " + order.toJSONString());
            // accumulate or deduct the member's points here
        }
    }
}
```

3.1 Core consumer parameters

3.1.1 heartbeat.interval.ms

The consumer must keep sending heartbeats so the coordinator knows whether it is still alive; if it goes down, its partitions are redistributed to the other consumers via a rebalance.

3.1.2 session.timeout.ms

How long Kafka waits without hearing from a consumer before declaring it dead. The default is 10 seconds.

3.1.3 max.poll.interval.ms

If the gap between two poll() calls exceeds this value, the consumer is considered too slow to keep up, gets kicked out of the consumer group, and its partitions are handed to others. Set it according to the performance of your own business processing.

3.1.4 fetch.max.bytes

The maximum number of bytes fetched in one request. It is recommended to set this a bit larger.

3.1.5 max.poll.records

The maximum number of messages returned by a single poll; the default is 500.

3.1.6 connections.max.idle.ms

If the socket connection between a consumer and a broker stays idle longer than this, the connection is reclaimed and has to be re-established for the next fetch. Setting this to -1 (never reclaim) is recommended.

3.1.7 auto.offset.reset

```
earliest: if a partition has a committed offset, consume from that offset
          (e.g. topicA -> partition0: 1000, partition1: 2000);
          with no committed offset, consume from the beginning.
latest:   if a partition has a committed offset, consume from that offset;
          with no committed offset, consume only data produced from now on.
none:     if every partition has a committed offset, consume from after those
          offsets; if any partition has no committed offset, throw an exception.
```

Note: We usually set “latest” in production

3.1.8 enable.auto.commit

Turns on automatic offset commits.

3.1.9 auto.commit.interval.ms

How often offsets are automatically committed.

Four, bonus time: topics not covered in part one

Log binary search

This could in fact also be called a sparse index; it is also a skip-list-like structure. If we open a partition directory under a topic, we see files like these:

```
00000000000000000000.index      (offset index)
00000000000000000000.log        (log file)
00000000000000000000.timeindex  (time index)
```

Each log file has two companion index files, .index and .timeindex. As Kafka writes to the log file, it also writes entries into these two index files: one indexed by offset, the other by timestamp.

By default, the log.index.interval.bytes parameter is 4KB: every time that much data is written to the log file, one entry is appended to the index files. Entries in the index files are sorted in ascending order of offset and timestamp, so Kafka can use binary search, O(log N), to locate data in the .log file.

The numbers like 0, 2039, … in the figure represent physical positions. Why is a sparse index faster than reading the log directly? Because it doesn't record every entry, only one every few records. Say we want to consume the data at offset 7. Looking at the sparse index we find an entry for offset 6; 7 is greater than 6, so we look at the next entry and find 8; 8 is greater than 7, so offset 7 must sit between 6 and 8. The entry for 6 points at physical position 9807 and the entry for 8 at 12345, so we jump straight into that range and scan. That is what speeds up locating the physical position, just like ordinary binary search.
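To make that concrete, here is a minimal sketch of a sparse-index lookup using the numbers from this example; it is an illustration only, not Kafka's actual index classes:

```java
// Minimal sketch of a sparse-index lookup. Each entry is
// {logical offset, physical position in the .log file}.
public class SparseIndexSketch {
    static long floorPosition(long[][] index, long targetOffset) {
        int lo = 0, hi = index.length - 1, ans = 0;
        while (lo <= hi) { // binary search over the sorted entries, O(log N)
            int mid = (lo + hi) >>> 1;
            if (index[mid][0] <= targetOffset) {
                ans = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return index[ans][1]; // then scan the .log forward from this position
    }

    public static void main(String[] args) {
        long[][] index = {{0, 0}, {6, 9807}, {8, 12345}};
        // offset 7 has no index entry: we land on {6, 9807} and scan forward
        System.out.println(floorPosition(index, 7)); // prints 9807
    }
}
```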

ISR mechanism

Multiple replicas alone give Kafka high availability, but do they guarantee that no data is lost? No: if the leader crashes before its data has been synchronized to the followers, then even after a follower is elected the new leader, that unsynchronized data is already gone.

ISR means In-Sync Replicas: the set of follower partitions whose data is in sync with the leader partition. Only followers in the ISR list can be elected as the new leader when the leader goes down, precisely because their data matches the leader's.

To ensure that data written to Kafka is not lost, first make sure the ISR contains at least one follower, and then require that a piece of data, after being written to the leader partition, is replicated to all the followers in the ISR before it counts as written. Then, Kafka promises, it will never be lost.
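As a sketch of the settings behind that guarantee, in the same props style as the producer code earlier (acks and retries are standard producer configs; min.insync.replicas is a broker/topic-level setting, mentioned here only in a comment):

```java
import java.util.Properties;

// Producer-side settings for a no-loss setup, sketched under the assumption
// that the broker/topic also sets min.insync.replicas=2.
public class NoLossProducerConfig {
    public static Properties noLossProps() {
        Properties props = new Properties();
        props.put("acks", "all"); // leader waits for all ISR replicas before acknowledging
        props.put("retries", Integer.MAX_VALUE); // keep retrying transient failures
        // broker/topic side: min.insync.replicas=2 means the ISR must contain the
        // leader plus at least one follower, or writes fail fast
        return props;
    }
}
```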

When is a replica kicked out of the ISR? If it has failed to keep up with the leader for more than 10 seconds, it is removed from the ISR list. Once the underlying problem is resolved (network jitter, a long full GC, and so on) and the follower catches up with the leader again, the leader checks whether the data gap is small enough and lets it rejoin.

Finally

This installment ran long, and there is a lot to digest. There is actually also the HW & LEO mechanism inside the Kafka kernel, but I'm too lazy to keep writing, hhh. We'll talk about it next time along with the source code.