www.aboutyun.com/thread-1288…

orchome.com/kafka/index

Questions to guide your reading



1. What is unique about Kafka's design?

2. How does Kafka create topics, send messages, and consume messages?

3. How do you write a Kafka program?

4. What are the three transaction definitions (delivery guarantees) for data transfer?

5. What are the two conditions Kafka uses to decide whether a node is alive?

6. Does the producer send data directly to the broker that is the partition leader?

7. Can a Kafka consumer consume messages from a specific partition?

8. Does Kafka deliver messages with a pull model or a push model?

9. What are the two available Producer APIs?

10. What is the format of the messages Kafka stores on disk?









1. Basic Concepts



Introduction



Kafka is a distributed, partitioned, replicated messaging system. It provides the functionality of an ordinary messaging system, but with a unique design of its own.



What does this unique design look like?



Let's start with some basic messaging-system terminology:

Kafka organizes messages by topic.

A program that publishes messages to a Kafka topic is called a producer.

An application that subscribes to topics and consumes their messages is called a consumer.

Kafka runs as a cluster made up of one or more servers, each of which is called a broker.

Producers send messages over the network to the Kafka cluster, and the cluster serves them to consumers, as shown in the following figure:



 



Clients communicate with the servers over TCP. Kafka provides a Java client, and clients exist for many other languages as well.





Topics and Logs



Let's start with the core abstraction Kafka provides: the topic.

A topic can be thought of as a category for a group of messages. For each topic, Kafka maintains a partitioned log, as shown below:

 



Each partition is an ordered sequence of immutable messages that is continually appended to. Each message in a partition is assigned a sequential id number called the offset, which uniquely identifies the message within the partition.

The Kafka cluster retains all published messages for a configurable period of time, whether or not they have been consumed. For example, if the retention policy is set to two days, a message can be consumed at any time during the two days after it is published; after that it is discarded to free up space. Kafka's performance is effectively constant with respect to data size, so retaining a large amount of data is not a problem.



In fact, the only piece of metadata each consumer needs to maintain is its position in the log, called the offset. The offset is controlled by the consumer: normally it advances as the consumer reads messages, but the consumer can read messages in any order it likes; for example, it can reset the offset to an older value to re-read earlier messages.



This combination of features makes Kafka consumers very cheap: they can come and go without affecting the cluster or other consumers. For example, you can tail the contents of a topic from the command line without changing anything for the consumers that are already reading it.



Partitioning the log serves two purposes. First, it keeps each individual log small enough to be stored on a single server. Second, each partition can be published to and consumed from independently, which makes concurrent operations on a topic possible.



Distribution



Each partition is replicated across a configurable number of servers in the Kafka cluster, so the servers holding replicas can share the work of serving data and requests. Replication is what makes Kafka fault tolerant.

Each partition has one server that acts as the "leader" and zero or more servers that act as "followers". The leader handles all reads and writes for the partition, and the followers replicate the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server in the cluster acts as the leader for some of its partitions and as a follower for others, which keeps the load well balanced.



Producers



A producer publishes messages to the topic of its choice and decides which partition each message is written to. The partition is usually chosen by a simple random load-balancing mechanism, but it can also be chosen by a specific partitioning function; the latter is the more common approach.
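To make this concrete, here is a minimal sketch (not part of the original article) of publishing keyed messages with the Kafka 0.8 producer API that is used later in this article, so that all messages sharing a key land in the same partition. The broker address, topic name, and key values are placeholders for your own environment.

```java
package com.sohu.kafkademon;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedProducerSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");   // placeholder broker list
        props.put("request.required.acks", "1");                // wait for the leader to acknowledge each write

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Messages that share a key (here, a user id) are hashed to the same partition,
        // so a consumer of that partition sees them in the order they were produced.
        producer.send(new KeyedMessage<String, String>("test", "user-42", "login"));
        producer.send(new KeyedMessage<String, String>("test", "user-42", "view-page"));
        producer.send(new KeyedMessage<String, String>("test", "user-42", "logout"));

        producer.close();
    }
}
```

With the default partitioner this amounts to hash(key) % numPartitions, as described in the client API section later in this article.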





Consumers



Messaging traditionally has two models: queuing and publish-subscribe. In the queuing model, a pool of consumers reads from the server and each message is delivered to only one of them. In the publish-subscribe model, messages are broadcast to all consumers. Kafka generalizes both with the consumer group abstraction: consumers join a group that subscribes to a topic, and each message published to the topic is delivered to one member of each subscribing group. Consumers in the same group can run in different applications or on different machines.

If all consumers belong to the same group, this behaves like a traditional queue, with the load balanced across the consumers. If every consumer belongs to its own group, this behaves like publish-subscribe, and every message is delivered to every consumer. More commonly, a topic has a small number of consumer groups, each of which is a logical "subscriber" made up of several consumers for fault tolerance and scalability. This is essentially publish-subscribe semantics in which the subscriber is a group of consumers rather than a single process.



 



A Kafka cluster of two servers hosting four partitions (P0-P3), with two consumer groups: group A has two consumers and group B has four.



Compared with traditional messaging systems, Kafka provides stronger ordering guarantees.

A traditional queue keeps messages in order on the server, and when several consumers read from it, the server hands out messages in storage order. However, although the messages are handed out in order, delivery to the consumers is asynchronous, so by the time the messages arrive they may no longer be in their original order; concurrent consumption therefore destroys ordering. To avoid this, such systems often resort to an "exclusive consumer" that is the only reader of a queue, which of course gives up concurrency.



Kafka does better. Through partitioning, Kafka provides both ordering guarantees and load balancing when many consumers run concurrently. Each partition is assigned to exactly one consumer within a consumer group, so that consumer is the only reader of the partition and consumes its messages in order. Because there are many partitions, load is still balanced across many consumer instances. Note that the number of consumers in a group cannot exceed the number of partitions; in other words, the number of partitions bounds the degree of concurrent consumption.



Kafka only guarantees the ordering of messages within a partition, not across the partitions of a topic, which is sufficient for most applications. If total ordering over all messages in a topic is required, the topic must have only one partition, which in turn means at most one consumer per consuming group.






2. Environment Setup





Step 1: Download Kafka



Download the latest release and un-tar it:

```
> tar -xzf kafka_2.9.2-0.8.1.1.tgz
> cd kafka_2.9.2-0.8.1.1
```

Step 2: Start the server

Kafka uses ZooKeeper, so start ZooKeeper first. The script below starts a single-node ZooKeeper instance; the trailing ampersand runs it in the background so the console stays free.

```
> bin/zookeeper-server-start.sh config/zookeeper.properties &
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
```

Now start Kafka:

```
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
```

Step 3: Create a topic

Create a topic called "test" with a single partition and a single replica:

```
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

Use the list command to see the topic that was just created:

```
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
```

Besides creating topics manually, you can also configure the broker to create topics automatically.

Step 4: Send messages

Kafka ships with a simple command-line producer that reads messages from a file or from standard input and sends them to the server. By default, each line is sent as a separate message. Run the producer and type a few messages into the console; they are sent to the server:

```
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
```

Press Ctrl+C to stop sending.

Step 5: Start a consumer

Kafka also has a command-line consumer that reads messages and prints them to standard output:

```
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
```

If you run the consumer in one terminal and the producer in another, you can type messages in one terminal and watch them appear in the other. Both commands take optional parameters; run them without any arguments to see the help text.

Step 6: Set up a multi-broker cluster

Now start a cluster of three brokers, all on the same machine. First make a copy of the configuration file for each new node:

```
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
```

Add the following parameters to the newly copied files:

```
config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1
```


```
config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2
```

broker.id uniquely identifies each node in the cluster. Because these brokers run on the same machine, each one needs a different port and log directory so they do not overwrite each other's data. ZooKeeper and the first node are already running, so we only need to start the two new nodes:

```
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
```

Create a topic with a replication factor of three:

```
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
```

Now that we have a cluster, how do we find out which broker is doing what? Run the "describe topics" command:

```
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
```

```
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
```

The output reads as follows: the first line summarizes all partitions, and each following line describes one partition. Since this topic has a single partition, there is only one such line. "Leader" is the node responsible for all reads and writes for the partition. "Replicas" lists every node that holds a copy of the partition, whether or not it is currently in service. "Isr" is the subset of replicas that are currently in sync. In our example, node 1 is the leader. Now send a few messages to the topic:

```
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
```

```
...
my test message 1
my test message 2
^C
```

Consume the messages:

```
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
```

```
...
my test message 1
my test message 2
^C
```

Now test fault tolerance. Broker 1 is acting as the leader, so kill it:

```
> ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564
```

Another node is elected leader, and node 1 no longer appears in the in-sync replica (Isr) list:

```
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0
```

Even though the leader that originally accepted the writes is down, the earlier messages can still be consumed:

```
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
```

Kafka's fault tolerance holds up well.

3. Setting Up a Kafka Development Environment

We now have a Kafka server and can use the command-line tools to create topics and send and receive messages. Next, let's set up a development environment for Kafka.

One way to do this is to add the jars from the Kafka installation package to the project's classpath. Here, however, we use a more popular approach: managing the jar dependencies with Maven. After creating a Maven project, add the following dependency to pom.xml:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.0</version>
</dependency>
```


After adding the dependency, you will find that two jar dependencies cannot be resolved. Once you have obtained and unpacked those jars, you have two options: use mvn install to install them into the local repository, or copy the unpacked jars directly into the com folder of the local repository. For example, if my local repository is D:\mvn, the directory structure looks like this:



 





Configuration



The first piece is an interface that acts as a configuration file, holding the various Kafka connection parameters:

```java
package com.sohu.kafkademon;

public interface KafkaProperties
{
    final static String zkConnect = "10.22.10.139:2181";
    final static String groupId = "group1";
    final static String topic = "topic1";
    final static String kafkaServerURL = "10.22.10.139";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String topic2 = "topic2";
    final static String topic3 = "topic3";
    final static String clientId = "SimpleConsumerDemoClient";
}
```


Producer

```java
package com.sohu.kafkademon;

import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @author leicui [email protected]
 */
public class KafkaProducer extends Thread
{
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic)
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.22.10.139:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true)
        {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```


Consumer

```java
package com.sohu.kafkademon;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @author leicui [email protected]
 */
public class KafkaConsumer extends Thread
{
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive: " + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```

Simple send and receive

Run the following program for a simple round of sending and receiving messages:

```java
package com.sohu.kafkademon;

/**
 * @author leicui [email protected]
 */
public class KafkaConsumerProducerDemo
{
    public static void main(String[] args)
    {
        KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
        producerThread.start();
        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}
```

High-level consumer

The KafkaConsumer class shown above is already built on the high-level consumer API (a ConsumerConnector created via createMessageStreams), so it doubles as the high-level consumer example.

4. Data Persistence

Don't fear the file system!

Kafka relies heavily on the file system to store and cache messages. The conventional wisdom is that "disks are slow", which makes people doubt that an architecture built on the file system can deliver competitive performance. In reality, disk speed depends entirely on how the disk is used; a well-designed disk access pattern can be as fast as memory. The linear write speed of a six-disk 7200rpm SATA RAID-5 array is about 600MB/s, while its random write speed is only about 100KB/s, a difference of roughly 6000x. Modern operating systems use read-ahead and write-behind techniques: they prefetch data in large blocks on reads and group many small logical writes into one large physical write. Studies of this behavior have found that linear disk access is often much faster than random memory access.

To improve performance, modern operating systems also use main memory aggressively as a disk cache: all free memory can serve as page cache, with only a small penalty when the cache has to be reclaimed and reassigned. All disk reads and writes go through this cache, and it cannot easily be bypassed unless direct I/O is used. So even if a process keeps its own in-process copy of the data, the operating system keeps another copy in the page cache, effectively storing everything twice.

In addition, Kafka runs on the JVM, and two facts about the JVM are well known:

  • Java objects have a large memory overhead, often doubling (or more) the space needed to store the data.
  • Garbage collection becomes increasingly expensive as the amount of data on the heap grows.

Based on this analysis, caching data in JVM memory means paying twice: the data is stored once in the process and once in the page cache, and the JVM object overhead can double it again. Furthermore, an in-process cache has to be rebuilt whenever the process restarts (loading 10GB can take on the order of 10 minutes), or else the application starts completely cold, whereas the operating system's page cache survives a process restart. Relying on the page cache also greatly simplifies the logic for keeping the cache and the file system consistent. On a 32GB machine this approach can use roughly 28-30GB of memory as cache without any GC penalty. Therefore, rather than maintaining an in-memory cache and flushing it to disk, Kafka writes data directly to a persistent log on the file system and lets the page cache do the caching.

Constant-time operations

In most messaging systems, per-consumer persistence is backed by a B-tree or another random-access data structure. B-trees are versatile, but they come at a cost: operations are O(log N), which is usually treated as constant time but is not for disk operations. A disk seek takes about 10ms, and a disk can perform only one seek at a time, so concurrency is limited. Even though storage systems optimize heavily with caching, observed B-tree performance tends to degrade as data grows: as the data doubles, the speed roughly halves. Intuitively, a messaging system that is mostly used like a log can persist data simply by appending to a file on writes and scanning the file on reads. The advantage is that both reads and writes are O(1), and reads do not block writes or other operations. The performance benefit is obvious, because performance is completely decoupled from data size.

Since storage can now draw on cheap, essentially unlimited disk space (compared with memory) without a performance penalty, Kafka can offer features most messaging systems lack. For example, instead of deleting messages as soon as they are consumed, Kafka can retain them for a period of time (say, a week), which gives consumers a great deal of flexibility, as later sections discuss.

5. Transaction Definitions for Data Transfer

Having discussed how producers and consumers work, let's now look at the guarantees Kafka provides for data transfer. Transactional guarantees for data transfer are usually classified into three levels:

  • At most once: a message is transmitted at most once (possibly not at all) and is never re-sent.
  • At least once: a message is transmitted at least once, but may be transmitted more than once.
  • Exactly once: each message is transmitted once and only once, which is usually what is wanted.

Most messaging systems claim to provide "exactly once" delivery, but a close reading of their documentation shows the claim can be misleading: it often says nothing about what happens when a consumer or producer fails, when several consumers work in parallel, or when data written to disk is lost. Kafka's guarantees are more precise. Kafka has a notion of a message being "committed" when it is published: once a message is committed, it will not be lost as long as at least one broker holding a replica of the partition it was written to remains active (what it means for a replica to be active is discussed in a later section; for now, assume brokers do not fail). If a producer hits a network error while publishing, it cannot tell whether the error happened before or after the message was committed. This situation is unusual but must be accounted for; current versions of Kafka do not resolve it, and future versions are expected to address it.

Not every use case needs guarantees as strong as "exactly once", so Kafka lets the producer choose the level it wants: it can require notification that the message has been committed, send completely asynchronously without waiting for any acknowledgement, or wait only until the leader (but not the followers) has the message.

Now consider the consumer side. All replicas have identical logs with identical offsets, and the consumer maintains the offset of the messages it has consumed. If a consumer crashes and another consumer takes over its partitions, the new consumer must resume processing from an appropriate offset. The following options are available:

  • The consumer can read a message, save its offset, and then process the message. If it crashes after saving the offset but before processing, the consumer that takes over resumes from the saved offset, and some messages are never processed (at most once).
  • The consumer can read a message, process it, and then save the offset. If it crashes after processing but before saving the offset, the consumer that takes over will re-process some messages (at least once). A sketch of this pattern follows this list.
  • "Exactly once" can be achieved with a two-phase commit that coordinates saving the offset and processing the message, but there is a simpler way: store the offset together with the result of processing the message. For example, when messages are processed by a Hadoop ETL job, the processed result and the offset can be stored in HDFS together, so the message and its offset are handled as a unit.

6. Performance Optimization

Kafka puts a great deal of effort into efficiency. One of its primary use cases is handling web activity logs, where throughput is very high: every page view can produce several writes. On the read side, assuming each message is consumed only once, the read volume is also large, so Kafka tries to make consumption as cheap as possible.

Disk performance was discussed earlier. Beyond using linear reads and writes, two things hurt this kind of workload: too many small I/O operations and too much byte copying. Small I/O happens both between client and server and in the server's own persistence operations. To avoid it, Kafka introduces the concept of a "message set", which groups messages together and handles them as a unit. Handling message sets instead of individual messages improves performance: the producer sends a message set to the server as one chunk rather than one message at a time, the server appends the whole message set to the log in a single operation, and the consumer can also fetch a whole message set in one request, all of which cuts down on small I/O operations.

The other optimization targets byte copying, which is negligible under low load but significant under high load. To avoid it, Kafka uses a standard binary message format that is shared by producers, brokers, and consumers, so data can move between them without modification.

Zero copy

The message log a broker maintains is just a directory of files, written in the same message-set format shared by producers and consumers. Keeping the wire format and the on-disk format identical enables an important optimization: sending messages over the network straight from the page cache. Modern Unix-like operating systems provide a highly optimized system call for sending data from the page cache to a socket; on Linux it is sendfile. To appreciate what sendfile buys, first look at the usual path for sending data from a file to a socket:

  • The operating system reads the data from the file into the page cache in the kernel.
  • The application copies the data from the page cache into its own user-space buffer.
  • The application writes the data into a socket buffer in the kernel.
  • The operating system copies the data from the socket buffer to the NIC buffer, from which it is sent out over the network.
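Kafka avoids these copies by using sendfile, as explained just after this sketch. In Java the same capability is exposed as FileChannel.transferTo; the following sketch (illustrative only, not Kafka's actual code; the file path and port are made up) shows the idea of moving a file's bytes from the page cache to a socket without passing through user space.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopySendSketch
{
    public static void main(String[] args) throws IOException
    {
        try (FileInputStream in = new FileInputStream("/tmp/some-large-file");
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {

            FileChannel file = in.getChannel();
            long position = 0;
            long remaining = file.size();

            // transferTo hands the copy to the OS (sendfile on Linux): the data moves
            // from the page cache to the socket without an intermediate user-space buffer.
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```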

This path is clearly inefficient: four copies and two system calls. sendfile avoids the redundant copies by sending the data directly from the page cache to the NIC buffer, which greatly improves performance. With multiple consumers, the data is copied into the page cache once and reused, instead of being copied again for every consumer, so messages can be sent at close to network-bandwidth speed. At the disk level you see almost no reads, because the data is served straight from the page cache to the network. The use of sendfile and zero-copy in Java is described in detail elsewhere.

Data compression

Much of the time the bottleneck is neither CPU nor disk but network bandwidth, especially for applications that move large volumes of data between data centers. Users could of course compress their messages themselves without any support from Kafka, but compressing messages one by one gives a poor compression ratio; it is much better to compress batches of messages together. Kafka therefore supports end-to-end compression: thanks to the "message set" concept, messages from the client can be compressed together, sent to the server in compressed form, written to the log still compressed, and delivered to consumers compressed. The messages are only decompressed when the consumer actually uses them, which is why this is called end-to-end compression. Kafka supports the GZIP and Snappy compression codecs.

7. Producers and Consumers

Producer message sending

The producer sends data directly to the broker that is the leader of the target partition, without any intermediate routing tier. To make this possible, every Kafka node can tell the producer which nodes are alive and where the leader of each partition of the target topic is, so the producer can send each message straight to its destination.

The client controls which partition a message goes to. This can be done randomly (simple load balancing) or with a partitioning function. Kafka lets users supply a partitioning function and a partition key: the key is hashed to choose a partition, and the function can be overridden to implement custom logic. For example, if the key is a user id, all messages from the same user end up in the same partition, and a consumer can then deliberately consume that partition. (See the keyed-producer sketch in the Producers subsection of the basic concepts above.)

Asynchronous sending

Batching is an effective way to raise sending throughput. The Kafka producer's asynchronous mode buffers messages in memory and then sends them to the broker in batches in a single request. The policy is configurable, for example "send once a certain number of messages has accumulated" or "send after a fixed interval" (say 100 messages, or every 5 seconds). This greatly reduces the number of I/O operations on the server. Because the buffering happens on the producer side, the buffered messages are lost if the producer crashes. The asynchronous mode in Kafka 0.8.1 does not yet support callbacks, so send errors cannot be handled; Kafka 0.9 may add such callbacks.

Kafka consumer

To consume messages, the consumer sends a "fetch" request to the broker for a specific partition.
The request specifies the offset in the log from which to consume, and the consumer receives the messages starting at that position. The consumer therefore has full control over the offset and can even rewind it to re-consume earlier messages, which turns out to be very useful.

Push or pull?

An early design question for Kafka was whether consumers should pull messages from brokers or brokers should push messages to consumers. Here Kafka follows the traditional design shared by most messaging systems: producers push messages to the broker, and consumers pull messages from the broker. Some systems, such as Scribe and Apache Flume, use a push model that pushes data downstream to consumers. Push has both advantages and disadvantages: the broker controls the transfer rate, which makes it hard to serve consumers with different consumption rates. The goal is to let consumers consume as fast as they are able, but in a push model, when the broker pushes messages much faster than the consumer can process them, the consumer is overwhelmed. In the end Kafka chose the traditional pull model.

Another benefit of pull is that the consumer can decide whether to fetch data from the broker in batches. A push model must decide, without knowing the downstream consumer's capacity and strategy, whether to push each message immediately or buffer and push in batches; if it pushes at a low rate to avoid overwhelming the consumer, it wastes the opportunity to send more messages per transfer. With pull, consumers can choose these trade-offs based on their own capacity. A drawback of pull is that when the broker has no messages available, consumers may poll in a tight loop until new messages arrive. To avoid this, Kafka provides parameters that let a fetch request block until new data arrives (or until enough messages have accumulated to be sent as a batch).

Consumption state tracking

Keeping track of which messages have been consumed is also important. Most messaging systems keep this state on the broker: the broker either marks a message as consumed immediately after handing it out, or waits for an acknowledgement from the consumer before marking it. This also allows messages to be deleted right after consumption to save space. But is anything wrong with this? If a message is marked as consumed as soon as it is sent out, then when the consumer fails to process it (for example, the program crashes), the message is lost. To solve this, many systems add an extra step: a message is first marked only as sent, and is marked as consumed only after the consumer confirms successful processing. That solves message loss but creates new problems. First, if the consumer processes a message successfully but the acknowledgement never reaches the broker, the message is consumed twice. Second, the broker must now keep state for every single message, locking it, updating its state, and releasing the lock each time; and it must deal with messages that were sent but never acknowledged and therefore stay locked, not to mention the sheer volume of state to maintain.

Kafka takes a different approach. A topic is divided into partitions, and each partition is consumed by only one consumer (within a group) at a time.
As a result, the consumption position for each partition is just a single integer: the offset of the next message to consume. This makes the consumption state per partition very small and easy to track. It also brings a side benefit: the consumer can reset the offset to an older value and re-consume old messages. That may sound strange for a traditional messaging system, but it is very useful; who says a message may only be consumed once? For instance, if the consumer finds a bug in its parsing code, it can re-consume the affected messages once the bug is fixed.

Offline processing of messages

Durable, long-retention storage also lets consumers batch-load data at regular intervals into offline systems such as Hadoop or a data warehouse. In the Hadoop case, the load can be parallelized by splitting it into one task per broker, per topic, or per partition. Hadoop's task management can restart a failed task without any risk of duplicated data: the task simply reloads from the offset where the previous load stopped.

8. Replication (Master-Slave Synchronization)

Kafka lets each topic partition have several replicas; the number of replicas is configurable per topic. Kafka automatically keeps the replicas of each partition in sync, so data remains available when a node goes down. Replication is optional: you can configure a replication factor of one, which means there is only a single copy of the data.

Replicas are created at the granularity of topic partitions; each partition has one leader and zero or more followers. All reads and writes are handled by the leader. There are normally many more partitions than brokers, and the leaders of the partitions are spread evenly across the brokers. The followers' logs are identical to the leader's log, with the same messages in the same order. A follower pulls messages from the leader and appends them to its own log file, just like an ordinary consumer would.

Many distributed systems need to handle failed nodes automatically, and therefore need a clear definition of what it means for a node to be alive. Kafka considers a node alive under two conditions:

  • The node must maintain its session with ZooKeeper, which checks each node's connection via heartbeats.
  • If the node is a follower, it must replicate the leader's writes promptly, without falling too far behind.

A node that satisfies these two conditions is said to be "in sync", which is more precise than the vague "alive" or "failed". The leader keeps track of the set of in-sync nodes and removes any node that dies, gets stuck, or falls too far behind. How far behind counts as "too far" is controlled by replica.lag.max.messages, and what counts as stuck is controlled by replica.lag.time.max.ms.

A message is considered "committed" only when every in-sync replica has appended it to its log, and only committed messages are given to consumers, so consumers need not worry that a message will be lost if the leader goes down. Producers can choose whether to wait for a message to be committed, which is controlled by the request.required.acks setting. Kafka guarantees that a committed message is not lost as long as at least one in-sync replica remains alive.

Leader election

At the heart of Kafka are the log files, and keeping log files synchronized across a cluster is the most fundamental task of a distributed data system. If the leader never went down, followers would not be needed at all; once the leader does go down, a new leader must be chosen from the followers. But the followers themselves may be lagging or may have crashed, so a sufficiently up-to-date follower must be chosen: the guarantee to preserve is that if a message was committed and the leader then fails, the newly elected leader must still have that message. Most distributed systems use a majority-vote rule to elect a new leader, dynamically picking the most suitable candidate from the state of all replicas; Kafka does not use this approach.

Instead, Kafka dynamically maintains the set of in-sync replicas (ISR). Every member of this set is fully caught up with the leader: each of them must append a message to its log before the message is announced to the outside world as committed. Therefore any member of the ISR can be chosen as the leader at any time. The ISR is maintained in ZooKeeper. If the ISR contains f + 1 nodes, messages are not lost and service continues normally when f nodes go down. ISR membership is dynamic: a node that falls out can rejoin once it is back in the "in sync" state. This style of leader election is very fast and fits Kafka's use case well.

A darker thought: what if all the nodes go down? Kafka's guarantee that committed data is not lost rests on at least one node surviving; once every node is down, the guarantee no longer holds. In practice, when all replicas are down you must react promptly. There are two options:

  • Wait for a node in the ISR to recover and make it the leader.
  • Make whichever node recovers first (not necessarily a member of the ISR) the leader.

This is a trade-off between availability and consistency. If you wait for a node in the ISR to recover, the cluster may stay down forever if those nodes never come back or their data is gone. If you promote the first node that recovers even though it was not in the ISR, its data becomes the source of truth, even though it may differ from the real data because some messages were never replicated to it. Kafka currently chooses the second policy; a future release is expected to make the choice configurable so it can be tuned per scenario. This dilemma is not specific to Kafka; it applies to almost every distributed data system.

Replica management

The discussion above covered a single topic partition, but a Kafka cluster actually manages hundreds or thousands of them. Kafka tries to spread partitions evenly across the nodes of the cluster rather than concentrating them on a few nodes, and it likewise balances leadership so that each node is the leader of a proportional share of partitions. Optimizing the leader election process itself also matters, because it determines how long the system is unavailable after a failure. Kafka elects one node as the "controller"; when a node goes down, the controller chooses new leaders for all the partitions that were led by the failed node. This allows Kafka to manage leadership changes for many partitions in bulk and efficiently. If the controller itself goes down, one of the surviving nodes becomes the new controller.

9. Client APIs

Kafka Producer APIs

There are two producer APIs: kafka.producer.SyncProducer and kafka.producer.async.AsyncProducer. Both implement the same interface:

```java
class Producer {
    /* Send the message to the specified partition */
    public void send(kafka.javaapi.producer.ProducerData<K, V> producerData);

    /* Send a batch of messages */
    public void send(java.util.List<kafka.javaapi.producer.ProducerData<K, V>> producerData);

    /* Close the producer */
    public void close();
}
```

The Producer API provides the following functionality:

  • Multiple messages can be buffered in a local queue and sent to the broker asynchronously in batches by setting producer.type=async. The buffering behavior is controlled by parameters such as queue.time and batch.size. A background thread (kafka.producer.async.ProducerSendThread) drains the queue and hands the messages to kafka.producer.EventHandler, which sends them to the broker. The handler can be customized, and callback handlers can be registered at different stages of the producer pipeline, for example for logging or monitoring; simply implement the kafka.producer.async.CallbackHandler interface and set it in the callback.handler configuration property.

  • You can write your own Encoder to serialize messages by implementing the interface below. The default encoder is kafka.serializer.DefaultEncoder.

        interface Encoder<T> {
            public Message toMessage(T data);
        }

  • ZooKeeper-based automatic broker discovery is available through the zk.connect parameter. If ZooKeeper is not used, the broker.list parameter can specify a static list of brokers; messages are then sent to a randomly chosen broker, and delivery fails if that broker is down.

  • Messages are partitioned by a partition function of class kafka.producer.Partitioner:

        interface Partitioner<T> {
            int partition(T key, int numPartitions);
        }

    The partition function takes two arguments, the key and the number of available partitions, and returns a partition id chosen from the partition list. The default strategy is hash(key) % numPartitions; if the key is null, a partition is chosen at random. A custom partition function can be configured with the partitioner.class parameter. A sketch of such a partitioner follows this list.
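As referenced above, here is a minimal sketch of a partition function matching the Partitioner interface as it is presented in this article. The class name and the choice of String keys are illustrative only; check the exact interface of your Kafka client version before using it.

```java
// The interface as presented in this article (simplified; real client versions may differ).
interface Partitioner<T> {
    int partition(T key, int numPartitions);
}

// Routes all messages with the same key to the same partition, mirroring the
// default strategy hash(key) % numPartitions described above.
public class UserIdPartitioner implements Partitioner<String>
{
    @Override
    public int partition(String key, int numPartitions)
    {
        if (key == null) {
            return 0;   // keyless messages all go to partition 0 in this sketch
        }
        return (key.hashCode() & 0x7fffffff) % numPartitions;   // non-negative hash mod partition count
    }
}
```

Such a class would then be registered through the partitioner.class parameter mentioned above.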

Kafka Consumer APIs

The Consumer API has two levels. The low-level API maintains a connection to a single specified broker and closes the connection after use; it is stateless, so an offset is supplied with every read. The high-level API hides the details of connecting to the brokers and of the cluster layout, so you can talk to the cluster without worrying about its architecture; it can also maintain the consumption state for you, and it lets you subscribe to sets of topics specified by criteria such as whitelists, blacklists, or regular expressions.

Low-level API

```java
class SimpleConsumer {
    /* Send a fetch request to a broker and get back a set of messages */
    public ByteBufferMessageSet fetch(FetchRequest request);

    /* Send a list of fetch requests to a broker and get back a response set */
    public MultiFetchResponse multifetch(List<FetchRequest> fetches);

    /**
     * Get a list of valid offsets (up to maxNumOffsets) before the given time.
     * The result is a list of offsets, in descending order.
     * @param time: time in milliseconds;
     *              use OffsetRequest$.MODULE$.LatestTime() for the latest offset,
     *              use OffsetRequest$.MODULE$.EarliestTime() for the earliest offset.
     */
    public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
```

The low-level API is the foundation on which the high-level API is implemented, and it is also used directly when there is a special need to manage consumption state, for example by offline consumers such as the Hadoop consumer.
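The listing above reflects an older client. In the 0.8 line used elsewhere in this article, the low-level consumer is kafka.javaapi.consumer.SimpleConsumer and a fetch is built with FetchRequestBuilder. A rough usage sketch follows; the host, topic, and sizes are placeholders, and the exact signatures may differ between 0.8.x releases.

```java
import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleConsumerSketch
{
    public static void main(String[] args)
    {
        // host, port, socket timeout, buffer size, client id
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "SimpleConsumerSketch");

        // Fetch up to 100000 bytes from partition 0 of topic "test", starting at offset 0.
        FetchRequest request = new FetchRequestBuilder()
                .clientId("SimpleConsumerSketch")
                .addFetch("test", 0, 0L, 100000)
                .build();
        FetchResponse response = consumer.fetch(request);

        for (MessageAndOffset messageAndOffset : response.messageSet("test", 0)) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(messageAndOffset.offset() + ": " + new String(bytes));
        }

        consumer.close();
    }
}
```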

High-level API

```java
/* Create a connection */
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
    /**
     * Returns a list of streams per topic. Each stream is an iterator over
     * MessageAndMetadata, from which the message and its metadata (currently
     * the topic) can be obtained.
     * Input: a map of <topic, #streams>
     * Output: a map of <topic, list of message streams>
     */
    public Map<String, List<KafkaStream>> createMessageStreams(Map<String, Int> topicCountMap);

    /**
     * You can also obtain a list of streams containing messages that match a
     * TopicFilter. A TopicFilter encapsulates a whitelist or blacklist as a
     * regular expression.
     */
    public List<KafkaStream> createMessageStreamsByFilter(
        TopicFilter topicFilter, int numStreams);

    /* Commit the offsets consumed so far */
    public commitOffsets()

    /* Shut down the connector */
    public shutdown()
}
```

This API is built around iterators, implemented by KafkaStream. Each stream represents a sequence of messages flowing in from one or more partitions on one or more brokers. Each stream is consumed by a single thread, so clients specify the number of streams they want when the streams are created. A stream combines several partitions (possibly across brokers), but every message from a given partition goes to only one stream. Each call to createMessageStreams registers the consumer for the topic, which triggers a rebalance between consumers and brokers, so the API encourages creating all the streams for a topic in a single call to minimize rebalancing. The createMessageStreamsByFilter method additionally registers watchers so that new topics matching the filter are picked up.

10. Messages and Logs

A message consists of a fixed-length header and a variable-length byte-array payload. The header contains a version number and a CRC32 checksum:

```java
/**
 * A message with N bytes has the following format:
 *
 * If the version number ("magic" byte) is 0:
 *
 * 1. A 1-byte "magic" marker
 * 2. A 4-byte CRC32 checksum
 * 3. N - 5 bytes of payload
 *
 * If the version number is 1:
 *
 * 1. A 1-byte "magic" marker
 * 2. A 1-byte attributes field carrying extra information, such as whether the
 *    payload is compressed and with which codec
 * 3. A 4-byte CRC32 checksum
 * 4. N - 6 bytes of payload
 */
```

Log

A topic named "my_topic" with two partitions is stored as two log directories, my_topic_0 and my_topic_1, each containing the data files for that partition. Each data file is a sequence of log entries: a log entry is a 4-byte integer N giving the length of the message, followed by the N bytes of the message. Each message is identified by a 64-bit integer offset, which marks its position in the stream of messages ever sent to that partition. Each log file is named after the offset of the first message it contains, so the first file is 00000000000.kafka, and the names of two consecutive files differ by roughly S, where S is the maximum log file size set in the configuration.

The message format is maintained by a single interface, so messages can flow between producers, brokers, and consumers without re-encoding. The format of a message stored on disk is as follows:

  • Message length: 4 bytes (value: 1 + 4 + n)
  • Version ("magic"): 1 byte
  • CRC checksum: 4 bytes
  • Payload: n bytes
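The arithmetic in the layout above (length = 1 + 4 + n) can be made concrete with a small sketch. This is illustrative only and is not Kafka's actual serialization code; in particular, exactly which bytes the real CRC covers may differ.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class MessageFramingSketch
{
    static ByteBuffer frame(byte[] payload)
    {
        CRC32 crc = new CRC32();
        crc.update(payload);

        ByteBuffer buf = ByteBuffer.allocate(4 + 1 + 4 + payload.length);
        buf.putInt(1 + 4 + payload.length);   // message length: magic + crc + payload
        buf.put((byte) 0);                    // magic byte (version 0 layout)
        buf.putInt((int) crc.getValue());     // CRC32 checksum of the payload
        buf.put(payload);                     // the message itself
        buf.flip();
        return buf;
    }

    public static void main(String[] args)
    {
        ByteBuffer framed = frame("hello".getBytes());
        System.out.println("framed size = " + framed.remaining());   // 4 + 1 + 4 + 5 = 14 bytes
    }
}
```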




 





Write operations

Messages are continually appended to the end of the last log file, and a new file is rolled when the file reaches a configured size. Writes are governed by two parameters: one forces a flush to disk after a given number of messages has accumulated, and the other forces a flush after a given time interval. This bounds the amount of data at risk: in a system crash, at most that many messages, or that interval's worth of messages, can be lost.



A read operation

A read request supplies two parameters: a 64-bit offset and a maximum read size of S bytes. S is normally larger than any single message, but if an unusually large message exceeds it, the read is retried, doubling the read size each time, until a complete message has been read. A maximum message size can be configured so that the server rejects over-sized messages, which also gives the client an upper bound on how much it ever needs to read to obtain a complete message and keeps it from retrying indefinitely.

To serve a read, the broker first locates the segment file that holds the requested data, then converts the partition-wide offset into a position within that file, and finally reads the data from that position. The lookup is a binary search over the offset ranges that Kafka keeps in memory for each segment file; a small sketch of this lookup follows.
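As referenced above, here is a minimal sketch (illustrative only) of that lookup: the base offsets taken from the segment file names are kept sorted in memory and binary-searched to find the segment that contains a requested offset.

```java
import java.util.Arrays;

public class SegmentLookupSketch
{
    /** Returns the base offset of the segment that contains targetOffset. */
    static long findSegment(long[] sortedBaseOffsets, long targetOffset)
    {
        int idx = Arrays.binarySearch(sortedBaseOffsets, targetOffset);
        if (idx >= 0) {
            return sortedBaseOffsets[idx];          // targetOffset is the first message of a segment
        }
        int insertionPoint = -idx - 1;              // index of the first base offset greater than targetOffset
        return sortedBaseOffsets[insertionPoint - 1];
    }

    public static void main(String[] args)
    {
        // Base offsets as they would appear in segment file names, e.g. 00000000000.kafka, ...
        long[] baseOffsets = {0L, 1000L, 2000L};
        System.out.println(findSegment(baseOffsets, 1500L));   // prints 1000
    }
}
```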



Here is the format of the result sent to the consumer:

```
MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes

MultiMessageSetSend (multiFetch result)

total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n
```

Deletion

The log manager allows pluggable delete policies. The current policy deletes log files whose modification time is more than N days old (delete by time); an alternative policy keeps only the most recent N GB of data (delete by size). To avoid blocking reads while deleting, the implementation uses a copy-on-write technique: while deletion proceeds, the binary search used by reads runs against a static snapshot of the segment list, much like Java's CopyOnWriteArrayList.

Reliability guarantees

The log has a configurable parameter M: once M messages have accumulated without being flushed, they are forcibly flushed to disk. On startup, a log recovery thread scans the newest log segment and verifies that every message entry is valid. An entry is valid if the sum of its size and its offset is less than the length of the file, and the CRC32 of the payload matches the CRC32 stored in the entry. If an invalid entry is found at some offset, the content between that offset and the next valid offset is truncated.

Two kinds of corruption must be handled: blocks that were never written because of a crash, and blank blocks that were mistakenly written. The second case arises because the operating system keeps an inode for each file (an inode is a data structure used by many Unix-like file systems to describe a file-system object: a file, directory, device file, socket, pipe, and so on, including its size) but does not guarantee that the inode update and the data write happen in the order they were issued; if the inode's size is updated and the crash happens before the data is written, the file ends up with a blank block. The CRC check detects such blocks and removes them; blocks that were simply never written because of the crash are, of course, lost.