Introduction and installation of Kafka
Apache Kafka® is a distributed streaming platform built around a publish-subscribe model.
What scenarios does Kafka fit? It is generally used in two broad ways: as a message queue, building real-time streaming data pipelines that reliably move data between systems or applications; and building real-time streaming applications that transform or react to streams of data (that is, stream processing, transforming data internally between Kafka topics).
A common scenario is to feed microservice logs into Kafka, using Kafka's buffering to reduce system latency, and then consume them asynchronously into ELK.
Each Kafka record consists of a key, a value, and a timestamp. Kafka has four core APIs:
The Producer API allows an application to publish a stream of records to one or more Kafka topics.
The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
The Streams API allows an application to act as a stream processor, consuming input streams from one or more topics and producing an output stream to one or more output topics, effectively transforming input streams into output streams.
The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
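As a minimal sketch of the Producer API in Java (the broker address matches the single-node quickstart later in this article; the topic name and message contents are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // close() at the end of try-with-resources flushes any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record carries a key and a value; the timestamp is filled in automatically.
            producer.send(new ProducerRecord<>("quickstart-events", "my-key", "hello kafka"));
        }
    }
}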
Topics: A topic is the category to which data records are published; topics can be used to separate business systems. Topics in Kafka are always multi-subscriber: a topic can have one or many consumers subscribing to its data.
The Kafka cluster retains all published records (whether or not they have been consumed) for a configurable retention period. For example, if the retention policy is set to two days, a record can be consumed at any time within two days of publication; after that it is discarded and the disk space is freed. Kafka's performance is effectively constant with respect to data size, so retaining data for a long time is not a problem. Consumers are also very cheap: adding or removing consumers has little impact on the cluster or on other consumers.
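The retention period is configured broker-wide (for example log.retention.hours in config/server.properties) and can be overridden per topic via the retention.ms topic config. As a hedged sketch, setting a two-day retention on one topic with the Java AdminClient (broker address and topic name are assumptions):

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "quickstart-events");
            // 2 days = 2 * 24 * 60 * 60 * 1000 ms = 172800000 ms
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "172800000"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                    Map.of(topic, Collections.singleton(setRetention));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}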
The log for each topic is split into partitions distributed over the servers in the Kafka cluster; each server handles data and requests for its share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
Producers: A producer publishes data to topics of its choice and is responsible for choosing which partition within the topic each record is assigned to. This can be done in a simple round-robin fashion to balance load, or according to some semantic partition function (for example, based on a key in the record).
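A sketch of key-based partitioning, assuming a local broker and a hypothetical inventory-events topic: the default partitioner hashes the key, so records with the same key always land in the same partition.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KeyedProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int stock = 1; stock <= 2; stock++) {
                // Both records use the key "SKU1", so both go to the same partition
                // and are stored in send order.
                RecordMetadata meta = producer.send(
                        new ProducerRecord<>("inventory-events", "SKU1", "stock=" + stock)).get();
                System.out.println("key=SKU1 -> partition " + meta.partition());
            }
        }
    }
}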
Consumers: Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can live in separate processes or on separate machines. If all consumer instances are in the same consumer group, records are load-balanced over the instances. If the consumer instances are all in different consumer groups, each record is broadcast to all of them.
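A minimal consumer sketch for the quickstart topic; the group name quickstart-group is illustrative. Starting a second copy with the same group.id splits the partitions between the two instances, while a copy with a different group.id receives every record again:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerGroupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "quickstart-group");        // the consumer group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("quickstart-events"));
            while (true) { // poll forever; stop with Ctrl+C in this sketch
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}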
The advantage of the queue model in a messaging system is that it lets you divide the processing of data over multiple consumer instances, which scales your processing. Unfortunately, queues are not multi-subscriber: once one process reads a message, it is gone. Publish-subscribe lets you broadcast data to multiple processes, but there is no way to scale processing, since every message goes to every subscriber.
Compared with a traditional queue such as RabbitMQ, Kafka has a stronger ordering guarantee. A traditional queue holds records in order on the server, and if multiple consumers consume from the queue, the server hands out records in the order they are stored. However, although the server hands out records in order, they are delivered to consumers asynchronously, so they may arrive at different consumers out of order. This means that with parallel consumption, the ordering of records is lost. Messaging systems often work around this with the notion of an "exclusive consumer" that is the only process reading from the queue, but then the data cannot be processed in parallel.
For example, suppose the inventory of SKU1 changes twice: msg1 sets the inventory to 1 and msg2 sets it to 2. With multiple consumers, even though msg1 is sent before msg2, under concurrency the consumer handling msg2 may finish first while the consumer handling msg1 is still processing, so the time ordering is lost.
In Kafka, the partition is the unit of parallelism within a topic. Kafka provides both ordering guarantees and load balancing over a pool of consumers by assigning the partitions of a topic to the consumers in a consumer group, so that each partition is consumed by exactly one consumer in the group. This makes that consumer the only reader of the partition, so it consumes the data in order, while many partitions balance the load over many consumer instances. Note, however, that the number of consumer instances in a consumer group cannot exceed the number of partitions. Kafka can partition by the key in each record.
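The partition count is fixed when the topic is created. A sketch with the Java AdminClient (topic name, partition count, and replication factor are illustrative; a replication factor of 1 only suits a single-broker test setup):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions allow up to 3 consumers in one group to read in parallel.
            NewTopic topic = new NewTopic("inventory-events", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}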
Download and Install
Download address: kafka.apache.org/downloads. Binary downloads: Scala 2.13 – kafka_2.13-2.8.0.tgz (asc, sha512)
Decompress it and enter the directory:
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0
Start ZooKeeper (newer Kafka versions will no longer rely on an external ZooKeeper):
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka:
$ bin/kafka-server-start.sh config/server.properties
Startup succeeded (kafka.server.KafkaServer):
[2021-07-30 21:46:39,446] INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-07-30 21:46:39,446] INFO Kafka commitId: ebb1d6e21cc92130 (org.apache.kafka.common.utils.AppInfoParser)
[2021-07-30 21:46:39,447] INFO Kafka startTimeMs: 1627652799443 (org.apache.kafka.common.utils.AppInfoParser)
[2021-07-30 21:46:39,448] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2021-07-30 21:46:39,495] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker 172.20.25.8:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
Create a topic:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
Check the topic list:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
Start a console producer and send a hello message:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
Read the messages from the beginning with a console consumer:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
To clean up the local data afterwards:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
The cluster
Create a three-node cluster on this machine. First, copy the configuration file for each new node:
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
Edit these new files and set the following properties:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
The broker.id property is the unique, permanent name of each node in the cluster. We have to override the port and log directory because we are running all of these brokers on the same machine, and we don't want them all trying to register on the same port or overwriting each other's data.
Start two new nodes:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
Now create a new topic with a replication factor of 3:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Use the describe command to see which broker is doing what:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic: my-replicated-topic  TopicId: U842ZMiFQAyZgdiUSJ5ICA  PartitionCount: 1  ReplicationFactor: 3  Configs:
    Topic: my-replicated-topic  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
The first line gives a summary of all partitions, and each following line gives information about one partition. Since this topic has only one partition, there is only one line. "Leader" is the node responsible for all reads and writes for the given partition; each node is the leader for a randomly selected share of the partitions. "Replicas" is the list of nodes that replicate the log for this partition, regardless of whether they are the leader or even currently alive. "Isr" is the set of in-sync replicas: the subset of the replicas list that is currently alive and caught up to the leader.
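The same leader/replicas/ISR information can be read programmatically; a hedged sketch with the Java AdminClient, assuming the local cluster above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(
                    Collections.singletonList("my-replicated-topic")).all().get()
                    .get("my-replicated-topic");
            // Each partition reports its current leader, replica list, and in-sync replicas.
            desc.partitions().forEach(p -> System.out.printf(
                    "partition=%d leader=%s replicas=%s isr=%s%n",
                    p.partition(), p.leader(), p.replicas(), p.isr()));
        }
    }
}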
Run the same command on the original topic:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic quickstart-events
Topic: quickstart-events  TopicId: YmOLaKbBTxitS9AgDfs8LQ  PartitionCount: 1  ReplicationFactor: 1  Configs:
    Topic: quickstart-events  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
The original topic has no extra replicas and lives on server 0, the only server in the cluster when it was created.
If one Kafka broker fails, the service keeps working: as long as an in-sync replica remains, it is elected as the new leader for the affected partitions and continues to serve reads and writes.