Kafka cluster environment setup and message storage mechanism in detail

Cicada a smile

Kafka cluster environment

1. Environment version

Versions: Kafka 2.11 (kafka_2.11-0.11.0.0), ZooKeeper 3.4

Note: ZooKeeper 3.4 is also deployed in cluster mode.

2. Decompress and rename

```shell
tar -zxvf kafka_2.11-0.11.0.0.tgz
mv kafka_2.11-0.11.0.0 kafka2.11
```

Creating a Log Directory

```shell
[root@en-master kafka2.11]# mkdir logs
```

Note: The preceding steps must be repeated on every node in the cluster.

3. Add environment variables

```shell
vim /etc/profile

export KAFKA_HOME=/opt/kafka2.11
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
```

4. Modify the core configuration

```shell
[root@en-master /opt/kafka2.11/config]# vim server.properties

# Unique ID of this broker
broker.id=0
# Log directory
log.dirs=/opt/kafka2.11/logs
# ZooKeeper cluster
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
```

Note: broker.id must be unique for each broker in the cluster; assign each node its own number.

5. Start the Kafka cluster

```shell
# Start (run on every node)
[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties
# Stop
[root@node02 kafka2.11]# bin/kafka-server-stop.sh
# Check the process
[root@node02 kafka2.11]# jps
```

Note: The ZooKeeper cluster should already be running; the Kafka brokers in the cluster are started individually on each node.

6. Basic management commands

Create a topic

```shell
bin/kafka-topics.sh --zookeeper zk01:2181 \
--create --replication-factor 3 --partitions 1 --topic one-topic
```

Parameter description:

  • --replication-factor: number of replicas
  • --partitions: number of partitions
  • --topic: topic name

View the list of topics

```shell
bin/kafka-topics.sh --zookeeper zk01:2181 --list
```

Modify topic partitions

```shell
bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5
```

Describe a topic

```shell
bin/kafka-topics.sh --zookeeper zk01:2181 \
--describe --topic one-topic
```

Send a message

```shell
bin/kafka-console-producer.sh \
--broker-list 192.168.72.133:9092 --topic one-topic
```

Consume messages

```shell
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic
```

Delete the topic

```shell
bin/kafka-topics.sh --zookeeper zk01:2181 \
--delete --topic first
```

7. ZooKeeper's role in the cluster

One broker in a Kafka cluster is elected as the Controller. The Controller relies on ZooKeeper and is responsible for managing broker online/offline state, assigning partition replicas for all topics, and electing partition leaders.

Message interceptor case

1. Introduction to interceptors

Kafka's Producer interceptors implement custom logic around message sending. Before a message is sent, and before the send callback runs, the user gets a chance to process the message, for example modifying its content or monitoring send status. Multiple interceptors can be specified, and they are applied to each message in order.

Core methods

  • configure: called when fetching configuration information and initializing data;
  • onSend: called before the message is serialized and its partition is computed;
  • onAcknowledgement: called after the message has been acknowledged by the Broker, or when sending fails;
  • close: called when the interceptor is closed, to clean up resources;

Note: The interceptors here are for the message sending process.

2. Custom interceptors

Definition: Implement the ProducerInterceptor interface.

Interceptor 1: In the onSend method, the intercepted message is modified.

```java
@Component
public class SendStartInterceptor implements ProducerInterceptor<String, String> {

    private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");

    @Override
    public void configure(Map<String, ?> configs) {
        LOGGER.info("configs...");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Wrap the original value before it is serialized
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "onSend: {" + record.value() + "}");
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        LOGGER.info("onAcknowledgement...");
    }

    @Override
    public void close() {
        LOGGER.info("SendStart close...");
    }
}
```

Interceptor two: In the onAcknowledgement method, determine whether the message was sent successfully.

```java
@Component
public class SendOverInterceptor implements ProducerInterceptor<String, String> {

    private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");

    @Override
    public void configure(Map<String, ?> configs) {
        LOGGER.info("configs...");
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        LOGGER.info("record... {}", record.value());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            LOGGER.info("Send Fail... exe-msg: {}", exception.getMessage());
            return;
        }
        LOGGER.info("Send success...");
    }

    @Override
    public void close() {
        LOGGER.info("SendOver close...");
    }
}
```

Load interceptors: Add interceptors based on a KafkaProducer configuration Bean.

```java
@Configuration
public class KafkaConfig {

    @Bean
    public Producer<String, String> producer() {
        Properties props = new Properties();
        // Omit other configurations...
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");
        interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        return new KafkaProducer<>(props);
    }
}
```

3. Code cases

```java
@RestController
public class SendMsgWeb {

    @Resource
    private KafkaProducer<String, String> producer;

    @GetMapping("/sendMsg")
    public String sendMsg() {
        producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));
        return "success";
    }
}
```

Send a message through the custom Bean defined above, and watch the log lines printed by the interceptors.

Kafka storage analysis

Note: This section follows the source-code execution path of the producer.send call from the case above. The flow in the source is fairly clear; the core steps are as follows.

1. Message generation process

 

The Producer sends messages asynchronously. The sending flow is as follows:

  • After Producer.send() is called, the message passes through the interceptors, serialization, partitioning, and transaction checks;
  • The message is then appended to a buffer container (the RecordAccumulator);
  • When a batch in the container fills up to the configured size, the Sender thread is woken up;
  • If the batch does not fill within the configured wait time, the Sender thread is woken up anyway;
  • Once awake, the Sender thread pulls the batched data from the container and sends it to the corresponding topic partitions on the broker;
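The size and time conditions above correspond to two producer settings; a minimal configuration sketch with illustrative values (16384 is Kafka's default batch size, while linger.ms defaults to 0 and is often raised slightly):

```properties
# Producer buffering settings behind the wake-up conditions above
batch.size=16384   # bytes per partition batch; a full batch wakes the Sender
linger.ms=5        # max wait before sending a non-full batch
```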

An aside: reading the source of middleware like this not only broadens your horizons, it also makes you realize that everyday business code may really just be "moving bricks".

2. Storage mechanism

In Kafka, messages are organized by topic. Producers produce messages to topics, and consumers consume from them. Physically, each topic partition is stored as a set of message log files.

 

  • Each partition corresponds to a log, and messages are continuously appended to the end of the active log file;
  • The log files store the message data produced by producers, using a sharding-plus-index mechanism;
  • Each partition is divided into multiple segments, and each segment has a pair of files: an (.index) file and a (.log) file;
  • The .index file stores index information;
  • The .log file stores the message data;
  • The metadata in the index file points to the physical offset of each Message within the corresponding data file;
  • Each consumer in a consumer group records, in real time, the offset up to which it has consumed;
  • When message consumption fails, the consumer resumes from the last recorded offset;
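The .index-to-.log lookup described above can be sketched as a tiny self-contained simulation. The offsets and byte positions below are hypothetical, and this is not Kafka's actual binary file format; it only illustrates how a sparse index narrows the search to a starting byte position in the data file:

```java
public class SegmentIndexDemo {

    // Sparse-index lookup: given parallel arrays of indexed message offsets and their
    // byte positions in the .log file, return the byte position of the last indexed
    // entry whose offset is <= target. Scanning the .log file starts from there.
    static long lookup(long[] offsets, long[] positions, long target) {
        int lo = 0, hi = offsets.length - 1;
        long pos = positions[0];
        while (lo <= hi) {
            int mid = (lo + hi) / 2;
            if (offsets[mid] <= target) {
                pos = positions[mid];  // candidate starting position
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return pos;
    }

    public static void main(String[] args) {
        // Hypothetical sparse index: every 4th message offset is indexed
        long[] offsets = {0, 4, 8};
        long[] positions = {0, 512, 1024};
        // To find message offset 6, start scanning the .log file at byte 512
        System.out.println(lookup(offsets, positions, 6)); // prints 512
    }
}
```

Because the index is sparse, the lookup only yields a starting byte; the .log file is then scanned forward from that position to the exact message.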

3. Transaction control mechanism

 

Kafka supports transactional control over messages.

Producer transaction

To implement transactions across partitions and sessions, a globally unique TransactionID is introduced and bound to the PID the Producer obtains. After a restart, the Producer can recover its original PID through its TransactionID. Kafka manages transactions through the TransactionCoordinator component: the Producer interacts with the TransactionCoordinator to obtain the task status for its TransactionID, and the TransactionCoordinator persists transaction state to an internal Kafka topic, so that even if the whole service restarts, in-flight transaction state can be recovered.
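On the configuration side, producer transactions hinge on a stable, globally unique TransactionID; a minimal sketch of the relevant producer settings (the ID value is hypothetical):

```properties
# Globally unique TransactionID, kept stable across restarts so the PID can be recovered
transactional.id=demo-order-producer-1
# Idempotence is required for transactional producers
enable.idempotence=true
```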

Consumer transactions

For message consumption on the Consumer side, the transactional guarantee is much weaker: precise consumption cannot be guaranteed, because after a restart, messages belonging to the same transaction may already have been deleted.
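The closest consumer-side knob is the isolation level, which controls whether messages from uncommitted transactions are visible; a minimal sketch:

```properties
# Read only messages from committed transactions (the default is read_uncommitted)
isolation.level=read_committed
```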