Art is long, life is long
Kafka overview
Kafka is a distributed, multi-partition, multi-replica message streaming platform that relies on ZooKeeper for coordination. It is also an open source publish-subscribe messaging system.
Basic terms for Kafka
- Message: The unit of data in Kafka is called a message, also known as a record. It can be thought of as a row in a database table.
- Batch: To improve efficiency, messages are written to Kafka in batches. A batch is a group of messages.
- Topic: Messages are categorized by topic, so a topic represents one class of messages. Topics are comparable to tables in a database.
- Partition: A topic can be divided into partitions. The partitions of a topic can be placed on different machines rather than on a single one, which is how Kafka scales. Messages within a single partition are ordered, but there is no ordering guarantee across the partitions of a topic.
- Producer: A client application that publishes messages to a topic is called a producer. Producers continuously send messages to topics.
- Consumer: A client application that subscribes to a topic is called a consumer. Consumers process the messages that producers generate.
- Consumer Group: The relationship between producers and consumers resembles that between chefs and customers in a restaurant: one chef serves many customers, just as one producer can serve many consumers. A consumer group is a group made up of one or more consumers.
- Offset: The consumer offset is metadata: a monotonically increasing integer that records the consumer's position in a partition, so that consumption can resume from that position after a rebalance.
- Broker: A single Kafka server is called a broker. The broker receives messages from producers, assigns offsets to them, and commits them to disk.
- Broker cluster: Brokers are part of a cluster that consists of one or more brokers. In each cluster, one broker acts as the cluster controller (elected automatically from the active members of the cluster).
- Replica: A copy of the messages in Kafka is called a replica. The number of replicas is configurable. Kafka defines two types of replica: the leader replica and the follower replica. The leader serves client requests, while followers passively replicate the leader.
- Rebalance: The process by which the remaining consumer instances in a consumer group automatically reassign the subscribed topic partitions among themselves when one instance fails. Rebalancing is an important part of how Kafka achieves high availability on the consumer side.
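The terms topic, partition, and offset can be made concrete with a toy in-memory model. This is only a sketch for illustration, not how Kafka is implemented: the class name ToyTopic and its methods are hypothetical, and the key-to-partition mapping uses String.hashCode() as a stand-in for the hash that Kafka's partitioner applies to the serialized key. It shows that each partition hands out its own increasing offsets and preserves append order, while no order is defined across partitions.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one topic: a fixed number of append-only partitions. */
class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** The same key always maps to the same partition (stand-in hash, an assumption). */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    /** Appends the message and returns {partition, offset}. */
    int[] append(String key, String value) {
        int p = partitionFor(key);
        List<String> log = partitions.get(p);
        log.add(value);
        return new int[] {p, log.size() - 1};
    }

    /** Reads all messages of one partition starting at the given offset. */
    List<String> read(int partition, int fromOffset) {
        List<String> log = partitions.get(partition);
        return new ArrayList<>(log.subList(fromOffset, log.size()));
    }
}

class ToyTopicDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(3);
        topic.append("order-1", "created");
        topic.append("order-2", "created");
        int[] pos = topic.append("order-1", "paid");
        // Messages with key "order-1" share a partition, so their relative order is kept.
        System.out.println("partition=" + pos[0] + " offset=" + pos[1]);
        System.out.println(topic.read(pos[0], 0));
    }
}
```

A consumer that remembers the last offset it processed can call read with that offset plus one to continue where it stopped, which is the role the consumer offset plays.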
Kafka features (Design principles)
- High throughput, low latency: Kafka can process hundreds of thousands of messages per second with latency as low as a few milliseconds.
- High scalability: Each topic contains multiple partitions, which can be distributed across different brokers.
- Durability, reliability: Kafka stores data persistently. Messages are written to disk on the brokers and replicated to prevent data loss. ZooKeeper holds only cluster metadata, not the messages themselves.
- Fault tolerance: The Kafka cluster keeps working properly when individual nodes in the cluster fail or go down.
- High concurrency: Thousands of clients can read and write simultaneously.
Usage scenarios for Kafka
- Activity tracking: Kafka can be used to track user behavior. Take shopping on Taobao as an example: the moment you open Taobao, your login information is sent to Kafka as a message. While you browse, your browsing history, search terms, and shopping interests are sent to Kafka as messages too. These messages can then be used to generate reports, make smart recommendations, infer purchase preferences, and so on.
- Message delivery: Another basic use of Kafka is message delivery. Applications send notifications to users through messages. Application components can produce messages without caring about the message format or how the message is eventually sent.
- Metrics: Kafka is also used to record operational monitoring data. This includes collecting data from various distributed applications and producing centralized feedback for operations, such as alarms and reports.
- Logging: The basic concept of Kafka comes from the commit log. For example, database updates can be sent to Kafka to record when the database was updated. Kafka provides a unified interface to consumers such as Hadoop, HBase, and Solr.
- Stream processing: Applications can process the messages in Kafka topics as continuous streams, which opens up a wide variety of applications.
- Load limiting: In Internet services, too many requests may arrive at one time. Requests can be written to Kafka first instead of going directly to the back-end application, which might otherwise crash under the load.
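The load-limiting scenario can be illustrated with a small single-threaded simulation. It is a sketch only: ToyBuffer and its methods are hypothetical names, and an in-memory queue stands in for a Kafka topic. A burst of requests is written to the buffer at once, while the back end drains at most a fixed number per tick, so it never receives more work than it can handle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for a topic that absorbs request bursts. */
class ToyBuffer {
    private final Deque<String> queue = new ArrayDeque<>();

    /** Producer side: accept the request immediately, whatever the burst size. */
    void publish(String request) {
        queue.addLast(request);
    }

    /** Consumer side: hand at most maxPerTick requests to the back end, oldest first. */
    List<String> poll(int maxPerTick) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxPerTick && !queue.isEmpty()) {
            batch.add(queue.pollFirst());
        }
        return batch;
    }

    /** Number of requests still waiting. */
    int backlog() {
        return queue.size();
    }
}

class ToyBufferDemo {
    public static void main(String[] args) {
        ToyBuffer buffer = new ToyBuffer();
        for (int i = 0; i < 10; i++) {
            buffer.publish("req-" + i); // a burst of 10 requests arrives at once
        }
        int tick = 0;
        while (buffer.backlog() > 0) {
            List<String> batch = buffer.poll(3); // the back end handles 3 per tick
            System.out.println("tick " + tick++ + ": " + batch);
        }
    }
}
```

The burst is spread over several ticks instead of hitting the back end all at once; the price is added latency for the requests at the end of the backlog.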
Kafka’s message queue
- Kafka supports consumer groups, each made up of one or more consumers. If every message is consumed by only one consumer, which is the case when all consumers belong to the same group, the model is point-to-point.
- If messages from one or more producers are consumed by several consumers at the same time, which is the case when the consumers belong to different groups, the model is publish-subscribe.
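The two models above can be sketched with a simplified partition assignment. This is an illustration under stated assumptions, not Kafka's actual assignor: ToyGroupAssignor.assign is a hypothetical helper that spreads partitions round-robin over the members of one group. Within a group each partition, and therefore each message, goes to exactly one consumer (point-to-point). A second group gets its own full assignment, so it sees every message as well (publish-subscribe). Calling assign again with only the surviving members mimics a rebalance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ToyGroupAssignor {
    /** Spreads partitions 0..partitionCount-1 round-robin over the members of one group. */
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = members.get(p % members.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One group with two consumers: the four partitions are split between them.
        System.out.println(assign(Arrays.asList("c1", "c2"), 4));
        // A second, independent group with one consumer owns all four partitions.
        System.out.println(assign(Arrays.asList("d1"), 4));
        // c2 fails: reassigning over the survivors hands its partitions to c1.
        System.out.println(assign(Arrays.asList("c1"), 4));
    }
}
```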
Kafka system architecture
As the figure above shows, a typical Kafka cluster consists of several producers (for example page views generated by the web front end, server logs, and system CPU and memory metrics), several brokers (Kafka scales horizontally: in general, the more brokers, the higher the cluster throughput), several consumer groups, and a ZooKeeper cluster. Kafka uses ZooKeeper to manage cluster configuration, elect leaders, and rebalance when a consumer group changes. Producers publish messages to brokers in push mode, and consumers subscribe to and consume messages from brokers in pull mode.
Core API
Kafka has four core APIs:
- The Producer API, which allows an application to send message records to one or more topics
- The Consumer API, which allows an application to subscribe to one or more topics and process the stream of records delivered to it
- The Streams API, which allows an application to act as a stream processor, consuming input streams from one or more topics and producing output streams to one or more topics, effectively transforming input streams into output streams
- The Connector API, which allows you to build and run reusable producers and consumers that connect Kafka topics to existing applications or data systems. For example, a connector for a relational database might capture all changes to a table
Spring Boot integration with Kafka
Local Kafka and ZooKeeper environment setup
We need to start a standalone Kafka and ZooKeeper environment locally. The Kafka distribution bundles ZooKeeper, which you can start directly.
My local environment configuration is as follows:
- Win10 system
- kafka_2.12-2.5.0
- ZooKeeper 3.4.12
- Spring Boot 2.2.10.RELEASE
ZooKeeper installation is not covered here; simply start ZooKeeper on port 2181.
To start Kafka, first modify the server.properties file in the config directory:
listeners=PLAINTEXT://127.0.0.1:9092
# Folder where logs are stored
log.dirs=/kafka-logs
Go to the Kafka root directory, type .\bin\windows\kafka-server-start.bat .\config\server.properties, and press Enter. A window showing a successful startup is displayed.
Dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.yml
server:
  port: 8999
  # Spring Boot 2.x reads the context path from server.servlet
  servlet:
    contextPath: /kafka
spring:
  application:
    name: kafka
  kafka:
    bootstrapServers: 127.0.0.1:9092
    consumer:
      groupId: myGroup
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Producers take serializers and have no group id
      keySerializer: org.apache.kafka.common.serialization.StringSerializer
      valueSerializer: org.apache.kafka.common.serialization.StringSerializer
Producer
package com.cn.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;

/* The message producer is used to send messages */
@Service
public class MessageProducer {

    private final Logger log = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String payMessage) {
        // send() is asynchronous; the callback reports the outcome
        kafkaTemplate.send("payTopic", payMessage)
                .addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                    @Override
                    public void onFailure(Throwable ex) {
                        log.error("Failed to send message: " + ex.getMessage());
                    }

                    @Override
                    public void onSuccess(SendResult<String, Object> result) {
                        // Log where the record landed: topic-partition-offset
                        log.info("Message sent successfully: " + result.getRecordMetadata().topic() + "-"
                                + result.getRecordMetadata().partition() + "-"
                                + result.getRecordMetadata().offset());
                    }
                });
    }
}
Consumer
package com.cn.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/* The message consumer is used to process messages */
@Service
public class MessageConsumer {

    private final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    // Invoked for every record that arrives on payTopic
    @KafkaListener(topics = "payTopic")
    public void onMessage(ConsumerRecord<?, ?> record) {
        logger.info("The consumer received the message: " + record.topic() + "-"
                + record.partition() + "-" + record.value());
    }
}
- To set up a listener, simply annotate the listening method with @KafkaListener and specify the topic to listen on.
- The listener receives each Kafka message as a ConsumerRecord object, whose value is the actual message.
Send a message
package com.cn.controller;

import com.cn.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendController {

    @Autowired
    private MessageProducer producer;

    // GET /sendMsg/{message} forwards the path variable to Kafka
    @GetMapping(value = "/sendMsg/{message}")
    public void sendMsg(@PathVariable("message") String msg) {
        producer.send(msg);
    }
}
test
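One way to trigger the /sendMsg/{message} endpoint is a small HTTP client. The following is a minimal sketch, assuming Java 11 or later for java.net.http, and assuming the application listens on port 8999 with context path /kafka as intended by application.yml (drop the /kafka prefix if the context path is not in effect in your setup). The class SendMsgClient and its helper sendMsgUri are hypothetical names; a browser or any other HTTP tool works just as well.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class SendMsgClient {
    /** Builds the request URI, percent-encoding the message for use as a path segment. */
    static URI sendMsgUri(String baseUrl, String message) {
        // URLEncoder produces form encoding, so turn "+" (an encoded space) into "%20".
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/sendMsg/" + encoded);
    }

    public static void main(String[] args) throws Exception {
        // Assumed base URL: port 8999 and context path /kafka.
        URI uri = sendMsgUri("http://localhost:8999/kafka", "hello kafka");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

After the request is sent, the producer's callback and the consumer's listener should each write a log line to the application console.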
The console contents are as follows: