Kafka cluster setup and message storage mechanism details
Cicada a smile
Kafka cluster environment
1. Environment version
Versions: Kafka 2.11 (0.11.0.0), ZooKeeper 3.4
Note: ZooKeeper 3.4 is also deployed in cluster mode.
2. Decompress and rename
tar -zxvf kafka_2.11-0.11.0.0.tgz
mv kafka_2.11-0.11.0.0 kafka2.11
Creating a Log Directory
[root@en-master kafka2.11]# mkdir logs
Note: the preceding operations must be repeated on the other servers in the cluster.
3. Add environment variables
vim /etc/profile
export KAFKA_HOME=/opt/kafka2.11
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
4. Modify the core configuration
[root@en-master /opt/kafka2.11/config]# vim server.properties
log.dirs=/opt/kafka2.11/logs
# ZooKeeper cluster
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
Note: broker.id uniquely identifies each broker and must be different on every node in the cluster.
5. Start the Kafka cluster
# start
[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties
# stop
[root@node02 kafka2.11]# bin/kafka-server-stop.sh
# process check
[root@node02 kafka2.11]# jps
Note: the ZooKeeper cluster service must already be running; each Kafka broker in the cluster is started separately.
6. Basic management commands
Create a topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--create --replication-factor 3 --partitions 1 --topic one-topic
Parameter description:
- --replication-factor: the number of replicas
- --partitions: the number of partitions
- --topic: the topic name
View the list of topics
bin/kafka-topics.sh --zookeeper zk01:2181 --list
Modify topic partitions (the partition count can only be increased)
bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5
Describe the topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--describe --topic one-topic
Send a message
bin/kafka-console-producer.sh \
--broker-list 192.168.72.133:9092 --topic one-topic
Consume messages
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic
Delete the topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--delete --topic one-topic
7. Role of the ZooKeeper cluster
One broker in a Kafka cluster is elected as the Controller. The Controller relies on ZooKeeper to manage broker registration and removal, assign partition replicas for all topics, and run leader elections.
Message interceptor example
1. Introduction to interceptors
Kafka's Producer interceptors implement custom control logic around message sending. Before a message is sent, and before the send callback runs, the user has a chance to customize the message, for example by modifying it or monitoring its send status. Multiple interceptors can be specified, and they are applied to each message in order.
Core methods
- configure: called to fetch configuration information and initialize data;
- onSend: called before the message is serialized and its partition is computed;
- onAcknowledgement: called after the message has been acknowledged by the broker, or when sending fails;
- close: called when the interceptor is closed, to perform resource cleanup;
Note: The interceptors here are for the message sending process.
2. Custom interception
Definition: Implement the ProducerInterceptor interface.
Interceptor 1: In the onSend method, the intercepted message is modified.
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
public class SendStartInterceptor implements ProducerInterceptor<String, String> {
    private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");

    @Override
    public void configure(Map<String, ?> configs) {
        LOGGER.info("configs...");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Wrap the original value before it is serialized
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "onSend: {" + record.value() + "}");
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        LOGGER.info("onAcknowledgement...");
    }

    @Override
    public void close() {
        LOGGER.info("SendStart close...");
    }
}
Interceptor two: In the onAcknowledgement method, determine whether the message was sent successfully.
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
public class SendOverInterceptor implements ProducerInterceptor<String, String> {
    private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");

    @Override
    public void configure(Map<String, ?> configs) {
        LOGGER.info("configs...");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        LOGGER.info("record... {}", record.value());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            LOGGER.info("Send Fail... exe-msg: {}", exception.getMessage());
        } else {
            LOGGER.info("Send success...");
        }
    }

    @Override
    public void close() {
        LOGGER.info("SendOver close...");
    }
}
Load interceptors: Add interceptors based on a KafkaProducer configuration Bean.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

@Configuration
public class KafkaConfig {

    @Bean
    public Producer<String, String> producer() {
        Properties props = new Properties();
        // Omit other configurations...
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");
        interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        return new KafkaProducer<>(props);
    }
}
3. Code cases
@RestController
public class SendMsgWeb {

    @Resource
    private KafkaProducer<String, String> producer;

    @GetMapping("/sendMsg")
    public String sendMsg() {
        producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));
        return "success";
    }
}
Send a message through the custom producer bean defined above and watch the log output printed by the interceptors.
Kafka storage analysis
Note: this walkthrough is based on tracing the producer.send method from the example above through the source code. The flow in the source is fairly clear; the core steps are as follows.
1. Message generation process
The Producer sends messages asynchronously. The sending process is as follows:
- After producer.send is called, the message passes through the interceptors, serialization, and transaction checks;
- once that is done, the message is appended to a buffer (the record accumulator);
- when a batch in the buffer fills up to the configured size (batch.size), the Sender thread is woken up;
- if the batch does not fill within the configured linger time, the Sender thread is woken up anyway;
- once awake, the Sender thread pulls batches from the buffer and sends them to the topic's partitions;
Aside: reading the source code of middleware like this not only broadens your thinking, it also makes you realize that everyday coding really can feel like just moving bricks.
2. Storage mechanism
In Kafka, messages are organized by topic: producers produce messages to topics, and each topic partition is physically stored as message log files.
- Each partition corresponds to a log file, and messages are continuously appended to the end of the log file.
- The log file stores message data produced by producer, which adopts sharding and indexing mechanisms.
- A partition is divided into multiple segments, and each segment corresponds to two files: an index file (.index) and a data file (.log);
- the .index file stores index entries;
- the .log file stores the message data;
- the metadata in the index file points to the physical offset of the message in the corresponding data file;
- Each consumer in a consumer group records in real time the offset up to which it has consumed;
- if message consumption fails, it resumes from the last recorded offset;
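The segment-plus-sparse-index lookup described above can be illustrated with a small sketch. This is hypothetical code: the class SegmentLookup and its methods are simplifications for illustration, and Kafka's real implementation (memory-mapped index files, binary search over fixed-size entries) is far more involved.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of a Kafka-style segment + sparse index lookup (illustrative only).
// Segments are keyed by their base offset; within a segment, a sparse
// .index maps relative offsets to byte positions in the .log file.
class SegmentLookup {
    // baseOffset -> sparse index (relativeOffset -> byte position in .log)
    private final TreeMap<Long, TreeMap<Long, Long>> segments = new TreeMap<>();

    void addIndexEntry(long baseOffset, long relOffset, long position) {
        segments.computeIfAbsent(baseOffset, k -> new TreeMap<>())
                .put(relOffset, position);
    }

    // Find the byte position to start scanning from for a target offset:
    // 1) pick the segment whose base offset is <= target (floor lookup);
    // 2) in its sparse index, pick the greatest entry <= the relative offset.
    long startPosition(long targetOffset) {
        Map.Entry<Long, TreeMap<Long, Long>> seg = segments.floorEntry(targetOffset);
        if (seg == null) return 0L;
        Map.Entry<Long, Long> entry = seg.getValue().floorEntry(targetOffset - seg.getKey());
        return entry == null ? 0L : entry.getValue();
    }
}
```

Because the index is sparse, the lookup only gives a starting byte position; the reader then scans forward in the .log file until it reaches the exact target offset.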
3. Transaction control mechanism
Kafka supports transactional control of messages.
Producer transactions
To achieve transactions that span partitions and sessions, a globally unique TransactionID is introduced and bound to the PID obtained by the Producer. After a restart, the Producer can recover its original PID through the ongoing TransactionID. Kafka manages transactions with the TransactionCoordinator component: the Producer interacts with the TransactionCoordinator to obtain the task status for its TransactionID, and the TransactionCoordinator writes transaction state to an internal Kafka topic, so that even if the whole service restarts, in-flight transaction state can be recovered.
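The TransactionID-to-PID binding and the persisted state transitions can be sketched as follows. This is illustrative only: MiniTxnCoordinator and its state names are hypothetical simplifications of Kafka's internal TransactionCoordinator, and the in-memory maps stand in for state persisted to the internal transaction topic.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch (not Kafka's code): a coordinator that maps a transactional id
// to a PID and records transaction state transitions, so that the same
// id maps back to the same PID after a producer restart.
class MiniTxnCoordinator {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

    private final Map<String, Long> pidByTxnId = new HashMap<>();
    private final Map<String, State> stateByTxnId = new HashMap<>();
    private long nextPid = 0;

    // The same TransactionID always maps back to the same PID.
    long initPid(String txnId) {
        return pidByTxnId.computeIfAbsent(txnId, k -> nextPid++);
    }

    void begin(String txnId)   { stateByTxnId.put(txnId, State.ONGOING); }
    void prepare(String txnId) { stateByTxnId.put(txnId, State.PREPARE_COMMIT); }
    void commit(String txnId)  { stateByTxnId.put(txnId, State.COMPLETE_COMMIT); }

    State state(String txnId) {
        return stateByTxnId.getOrDefault(txnId, State.EMPTY);
    }
}
```

The key property is in initPid: a producer that restarts with the same transactional id is handed its previous PID, which is what lets the broker fence zombie producers and resume unfinished transactions.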
Consumer transactions
For consumer-side consumption, the transactional guarantee is weaker: precise consumption cannot be guaranteed, because messages belonging to the same transaction may already have been deleted by the time a consumer restarts.
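That said, a consumer can at least avoid reading messages from still-uncommitted or aborted producer transactions via the standard isolation.level consumer setting (read_uncommitted is the default in this Kafka version):

```properties
# consumer config: only return messages from committed transactions
isolation.level=read_committed
```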