Kafka is a distributed, partitioned, replicated, publish/subscribe messaging system. It is used primarily in the big data space, but also in general distributed systems. RocketMQ, another popular message queue, was developed in Java following Kafka's design principles.
Kafka is suitable for both offline and online message consumption; messages are persisted on disk.
Kafka organizes messages by Topic. Producers send (push) messages to topics, and consumers pull messages from the topics they have subscribed to.
Basic concepts
The basic concepts behind message queues matter a great deal: once you understand them well, the principles of message queues and the common problems around them become much clearer.
- Broker: a single Kafka server is a broker. A broker's main job is to receive messages sent by producers, assign offsets to them, and save the wrapped data to disk. In addition, a broker receives requests from consumers and from other brokers, processes them according to the request type, and returns a response. Multiple brokers can form a cluster to provide service together. One broker in each cluster is elected as the Controller, the command center of the Kafka cluster: it manages the state of partitions, manages the state of each partition's replicas, and listens for data changes in ZooKeeper. The Controller itself follows a master/slave pattern: all brokers monitor the state of the current Controller leader, and when it fails, a new Controller is elected.
- Messages: the message is the most basic data unit in Kafka. A message consists of a string of bytes, mainly a key and a value, both of which are byte arrays. The key's main function is to route the message to a particular partition according to some policy, which ensures that all messages with the same key are written to the same partition (see the partitioner sketch after this list).
- Topic: a Topic is a logical concept for storing messages; it can be thought of as a collection of messages. Each Topic can have multiple producers pushing messages into it and multiple consumers pulling messages from it.
- Partitions: each Topic can be divided into multiple partitions (every Topic has at least one), which are distributed across different brokers to scale Kafka horizontally and increase its parallel processing capability. Different partitions under the same Topic contain different messages. When a message is appended to a partition, it is assigned an offset, the message's unique number within that partition. Kafka uses offsets to guarantee message ordering within a partition; ordering is not guaranteed across partitions.
(Figure: partitions concept map)
- Log: a partition logically corresponds to a Log. When a producer writes messages to a partition, they are actually written to a Log. The Log is a logical concept that corresponds to a folder on disk. A Log consists of multiple segments, and each segment corresponds to one log file and one index file (see the on-disk layout sketch after this list).
- Replicas: Kafka keeps redundant copies of messages. Each partition can have multiple replicas, each containing the same messages (though not guaranteed to be identical at any given moment). Replicas are classified as Leader or Follower; if a partition has only one replica, that replica is the Leader and there are no Followers. Kafka has a synchronization mechanism between replicas: within each replica set, one replica is elected Leader, and Kafka uses different election strategies in different scenarios. All read and write requests for a partition are handled by its Leader replica; the Follower replicas simply pull data from the Leader and append it to their own Logs.
(Figure: partition replicas)
- Producers: producers create messages and push them to a Topic's partitions according to certain rules.
- Consumers: consumers pull messages from topics and consume them. Each consumer maintains its consumption position within a partition (the offset value). In Kafka, multiple consumers can form a Consumer Group, and a consumer can belong to only one Consumer Group. A Consumer Group guarantees that each partition of a subscribed Topic is assigned to exactly one consumer within the group, so if broadcast consumption of messages is required, it can be achieved by placing the consumers in different Consumer Groups (see the broadcast sketch after this list). By dynamically adding consumers to a Consumer Group, which triggers a Rebalance, consumption can be scaled horizontally.
- ISR set: the ISR (In-Sync Replicas) set is the subset of a partition's replicas that are currently alive and whose message logs are close to the Leader's. A replica in the ISR set must maintain its connection to ZooKeeper, and the offset of its last message must not lag behind the offset of the Leader's last message by more than a configured threshold. The Leader replica of each partition maintains that partition's ISR set. As described above, the Leader replica handles message writes, and the Follower replicas then pull the written messages from the Leader; during this second step, a Follower may temporarily hold fewer messages than the Leader, but as long as the gap stays below the threshold, the replica remains part of the ISR set.
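To make the key-to-partition routing mentioned under Messages concrete, here is a minimal sketch of the idea. Note this is a simplified illustration, not Kafka's actual implementation: the real default partitioner hashes the serialized key bytes with murmur2, while the version below stands in with String.hashCode().
import java.util.stream.IntStream;

// Simplified illustration of key-based routing: messages with the same key
// always map to the same partition, which is what preserves per-key ordering.
public class PartitionRoutingSketch {
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is a valid partition index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // "order-42" lands on the same partition every time it is sent.
        System.out.println(partitionFor("order-42", numPartitions));
        System.out.println(partitionFor("order-42", numPartitions));
        // A different key may land on a different partition.
        System.out.println(partitionFor("order-7", numPartitions));
    }
}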
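And here is roughly what a partition's Log looks like on disk, tying together the Partitions and Log concepts above. The layout is illustrative; segment files are named after the base offset of their first message, zero-padded to 20 digits:
demo-0/                              folder for partition 0 of topic "demo"
    00000000000000000000.index      index file of the first segment
    00000000000000000000.log       log file of the first segment (base offset 0)
    00000000000000368769.index    index file of the second segment
    00000000000000368769.log     second segment, starting at offset 368769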
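Finally, a minimal sketch of the broadcast pattern mentioned under Consumers: the same topic consumed by two consumers in different Consumer Groups, so each group independently receives every message. The group names group-a and group-b are arbitrary examples.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Broadcast consumption sketch: consumers in *different* Consumer Groups
// each get a full copy of the topic's message stream.
public class BroadcastSketch {
    static KafkaConsumer<String, String> newConsumer(String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("group.id", groupId); // a different group.id means an independent copy of the stream
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
        consumer.subscribe(Arrays.asList("demo"));
        return consumer;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> a = newConsumer("group-a");
        KafkaConsumer<String, String> b = newConsumer("group-b");
        // Poll each consumer (in practice, from its own thread, since
        // KafkaConsumer is not thread-safe); both will see all messages.
    }
}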
Basic usage
Starting Kafka
Here is how to install and start a single-instance Kafka:
- Download kafka_2.11-1.0.0.tgz from the official website and unzip it.
- Start Kafka's ZooKeeper service:
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
- Start the Kafka server:
./bin/windows/kafka-server-start.bat ./config/server.properties
The following uses the Kafka command line. The commands shown are the Windows .bat scripts; on Linux, use the corresponding .sh scripts under bin/.
- Create a topic named demo with one partition and a replication factor of 1:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
- List all topics:
kafka-topics.bat --list --zookeeper localhost:2181
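To verify the partition, replica, and ISR layout described in the concepts above, describe the topic; the output lists, per partition, the leader broker, the replica set, and the current ISR set:
kafka-topics.bat --describe --zookeeper localhost:2181 --topic demo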
- To send messages to a specified topic, first open the producer console:
kafka-console-producer.bat --broker-list localhost:9092 --topic demo
Then type a message at the prompt, e.g. Hello World!
- Consume messages from the beginning of the topic:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic demo --from-beginning
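If everything is running, the consumer terminal prints the Hello World! message entered at the producer terminal, along with any other messages previously sent to the demo topic.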
Using Kafka from the Java client API:
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @name: ProducerDemo
 * @description: pushes messages to the Kafka server
 * @author: BeautifulSoup
 */
public class ProducerDemo {
    public static void main(String[] args) {
        // Create the Kafka configuration
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        // Define the client ID
        properties.put("client.id", "DemoProducer");
        // The message key is an Integer and the value is a String
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);
        String topic = "demo";
        int messageNo = 1;
        while (true) {
            // Define the value of the message
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new Callback() {
                // metadata holds the metadata of the sent message; if an exception
                // occurred during sending, metadata is null instead
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    long elapsedTime = System.currentTimeMillis() - startTime;
                    if (null != metadata) {
                        System.out.println("message sent to partition " + metadata.partition()
                                + " in " + elapsedTime + " ms");
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
            messageNo++;
        }
    }
}

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @name: ConsumerDemo
 * @description: Kafka client that pulls messages
 * @author: BeautifulSoup
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        // Specify the Consumer Group id
        properties.put("group.id", "BeautifulSoup");
        // Commit offsets automatically
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties);
        try {
            // Subscribe to the topics
            consumer.subscribe(Arrays.asList("demo", "test"));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(100);
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println(record.offset() + ", " + record.key() + ", " + record.value());
                }
            }
        } finally {
            // Close the consumer
            consumer.close();
        }
    }
}
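To compile and run these two classes, the Kafka client library must be on the classpath; with Maven, that is the org.apache.kafka:kafka-clients artifact, in a version matching your broker (1.0.0 for the download above).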