Preface

This series is about distributed messaging middleware.

Basic concepts (what a distributed system is, what middleware is, what message-oriented middleware can do, what distributed message-oriented middleware looks like, and so on) were all covered in the previous article, "Distributed Message-Oriented Middleware (1): Getting Started with RabbitMQ in High-Availability Practice!", so they are not repeated here. Interested friends can go take a look.

This is the second article in the series, and it covers Kafka.

Kafka is widely used by BAT, ByteDance, Meituan, Netflix, Airbnb, Twitter, and others; its importance is self-evident.

Compared with the other three major message-oriented middleware systems, Kafka has two advantages:

  • High performance: on the order of a million messages per second;
  • Distributed, highly available, and horizontally scalable.

Today we'll take a closer look at how Kafka works. Due to space constraints this certainly cannot cover everything, so I can only pick a few of the more important points to analyze with you. Interview questions are not analyzed in this article; I have separately organized a Kafka study-notes PDF and a set of classic high-frequency interview questions with analysis, which friends who need them can grab:

  • Full-edition Kafka study notes
  • Kafka high-frequency interview questions

All right, enough words; just sit tight and let's go!

Kafka cluster construction and use

Kafka overview

There is a Chinese official site where you can read a detailed introduction.

Address: kafka.apachecn.org/intro.html

1. Software download

1.1 Kafka download

Address: kafka.apache.org/downloads

1.2 ZooKeeper download

(1) Kafka relies on ZooKeeper for coordination. Kafka actually ships with a built-in ZooKeeper, but an independent ZooKeeper is generally recommended, to make later upgrades and shared use easier.

(2) Download address:

zookeeper.apache.org/

1.3 Download notes

The files are small: ZooKeeper is a bit over 9 MB, Kafka a bit over 50 MB.

2. Kafka standalone deployment and cluster deployment

This example uses three virtual machines with the following IP addresses:

192.168.85.158
192.168.85.168
192.168.85.178


2.1 Single-node Deployment

(1) Upload the installation packages to the /root/tools directory. There is no need to create a new user; everything runs under the root account.

(2) Decompress

[root@ruanjianlaowang158 tools]# tar -zxvf kafka_2.12-2.4.1.tgz
[root@ruanjianlaowang158 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz

(3) Configure ZooKeeper and start it

# Go to the ZooKeeper directory and create a data directory
[root@ruanjianlaowang158 ~]# cd /root/tools/apache-zookeeper-3.5.7-bin
[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# mkdir data
# Edit the configuration file
[root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
[root@ruanjianlaowang158 conf]# cp zoo_sample.cfg zoo.cfg
[root@ruanjianlaowang158 conf]# vi zoo.cfg
# Change dataDir=/tmp/zookeeper to:
dataDir=/root/tools/apache-zookeeper-3.5.7-bin/data
# Start ZooKeeper
[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
[root@ruanjianlaowang158 bin]# ./zkServer.sh start

(4) Configure Kafka and start it

# Go to the Kafka directory and create an empty data folder
[root@ruanjianlaowang158 ~]# cd /root/tools/kafka_2.12-2.4.1
[root@ruanjianlaowang158 kafka_2.12-2.4.1]# mkdir data
# Edit the configuration file
[root@ruanjianlaowang158 kafka_2.12-2.4.1]# cd /root/tools/kafka_2.12-2.4.1/config
[root@ruanjianlaowang158 config]# vi server.properties
# Change log.dirs=/tmp/kafka-logs to:
log.dirs=/root/tools/kafka_2.12-2.4.1/data
# Change #listeners=PLAINTEXT://your.host.name:9092 to:
listeners=PLAINTEXT://192.168.85.158:9092
# Change zookeeper.connect=localhost:2181 to:
zookeeper.connect=192.168.85.158:2181
# Start Kafka
[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./kafka-server-start.sh ../config/server.properties &

After startup, single-node verification is skipped here; verification is done directly on the cluster.

2.2 Cluster Deployment

(1) Cluster mode: repeat the decompression steps from single-node mode above on servers 192.168.85.168 and 192.168.85.178, then apply the following configuration.

(2) ZooKeeper configuration: zoo.cfg

The configuration is the same on all three servers:

[root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
[root@ruanjianlaowang158 conf]# vi zoo.cfg
# Leave everything else unchanged and append the following three lines;
# this part is identical on all three servers:
server.1=192.168.85.158:2888:3888
server.2=192.168.85.168:2888:3888
server.3=192.168.85.178:2888:3888
# On the 158 server run:
echo "1" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
# On the 168 server run:
echo "2" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
# On the 178 server run:
echo "3" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid

(3) Kafka cluster configuration

[root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config
[root@ruanjianlaowang158 config]# vi server.properties
# broker.id: set to 1 on the 158 server, 2 on the 168 server, 3 on the 178 server
broker.id=1
# The same on all three servers:
zookeeper.connect=192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181

Common Kafka broker configuration items (with default or example values):

  • broker.id (default: 0): unique identifier of the broker.
  • listeners (example: PLAINTEXT://192.168.85.158:9092): listener address; PLAINTEXT means plaintext transmission.
  • log.dirs (example: /root/tools/kafka_2.12-2.4.1/data): where Kafka stores its data; multiple paths can be given, separated by ",".
  • message.max.bytes (default: 1000012, about 1 MB): limit on the size of a single message, in bytes.
  • num.partitions (default: 1): default number of partitions for a new topic.
  • log.flush.interval.messages (default: Long.MaxValue): maximum number of messages accumulated before data is flushed to disk and becomes available to consumers.
  • log.flush.interval.ms (default: Long.MaxValue): maximum time before data is flushed to disk.
  • log.flush.scheduler.interval.ms (default: Long.MaxValue): interval at which the flush check runs.
  • log.retention.hours (example: 24): log retention time, in hours.
  • zookeeper.connect (example: 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181): addresses of the ZooKeeper servers; multiple servers are separated by commas.

(4) Start the cluster

The startup procedure is the same as on a single node:

# Start ZooKeeper
[root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
[root@ruanjianlaowang158 bin]# ./zkServer.sh start
# Start Kafka
[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./kafka-server-start.sh ../config/server.properties &

(5) Note

If Kafka fails to start with the error: Configured broker.id 2 doesn't match stored broker.id 0 in meta.properties
Solution: the data directory (for example on server 158) contains a file named meta.properties; the broker.id stored in that file must also be changed so that it matches the broker.id in server.properties.

(6) Create a topic for the later Spring Boot project test.

[root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
[root@ruanjianlaowang158 bin]# ./kafka-topics.sh --create --zookeeper 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181 --replication-factor 3 --partitions <n> --topic aaaa
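
The same topic can also be created programmatically. Below is a minimal sketch using Kafka's Java AdminClient, assuming the cluster addresses from the deployment above; the partition count of 5 is purely illustrative, since the value is omitted from the command above.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Any one or more of the brokers can serve as the bootstrap address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 5 partitions is an illustrative choice; replication factor 3
            // matches the command-line example above
            NewTopic topic = new NewTopic("aaaa", 5, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("topic created");
        }
    }
}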

3. Integration with a Spring Boot project

3.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.itany</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Description:

There are mainly two dependencies (GAVs): spring-boot-starter-web, which enables the web service, and spring-kafka, the core package for Spring Boot's Kafka integration.

3.2 application.yml

spring:
  kafka:
    bootstrap-servers: 192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.3 Producer

@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate template;

    // Send a message to the given topic (the topic created above is aaaa)
    @RequestMapping("/sendMsg")
    public String sendMsg(String topic, String message) {
        template.send(topic, message);
        return "success";
    }
}

3.4 Consumer

@Component
public class KafkaConsumer {

    // Listen on the aaaa topic
    @KafkaListener(topics = {"aaaa"})
    public void listen(ConsumerRecord record) {
        System.out.println(record.topic() + ":" + record.value());
    }
}

3.5 Verifying Results

(1) Enter the following in the browser:

http://localhost:8080/sendMsg?topic=aaaa&message=bbbb


(2) The message is printed on the IDEA console.


Kafka replicas

1. What is the replica mechanism?

It usually means that a distributed system keeps identical copies of the same data on multiple interconnected machines.

2. Benefits of the replica mechanism:

2.1 Data redundancy

The system can continue to operate even when some of its components fail, which increases overall availability and data durability.

2.2 High scalability

Horizontal scaling is supported: read performance and throughput can be improved by adding more machines.

2.3 Improved data locality

Latency is reduced by placing data close to users' geographical locations.

3. Kafka replicas

(1) A replica is essentially a log file to which messages can only be appended.

(2) All replicas of the same partition hold the same message sequence.

(3) Replicas are stored on different brokers, so the partition stays available even when some brokers go down. (A Kafka cluster has several topics; each topic is divided into several partitions, and each partition is configured with several replicas.)

The figure below shows the replica distribution on a Kafka cluster with three brokers.

4. How does Kafka ensure that all replicas of a partition hold the same message sequence?

Through a leader-based replica mechanism.

Its working principle is shown in the figure below:

(1) Kafka replicas come in two types: Leader Replica and Follower Replica. When a partition is created, one replica is elected as the leader replica, and the remaining replicas automatically become follower replicas.

(2) In Kafka, follower replicas do not serve requests. A follower replica does not process client requests; its only task is to pull messages from the leader replica. All read and write requests must go to the broker hosting the leader replica, which handles them. (So for now Kafka only enjoys the first benefit of replication, namely data redundancy for high availability and durability.)

(3) Kafka relies on the watch functionality provided by ZooKeeper to detect in real time that the broker hosting the leader replica has failed, and it immediately starts a new round of leader election, selecting one of the followers as the new leader. After the old leader replica restarts, it can only rejoin the cluster as a follower replica.

5. Under what conditions is a Kafka follower replica considered in sync with the leader?

Kafka introduces In-Sync Replicas, the ISR replica set. Replicas in the ISR are all considered in sync with the leader, whereas a follower replica not in the ISR is considered out of sync with the leader.

6. Kafka In-Sync Replicas (ISR)

(1) The ISR is not merely a set of follower replicas; it always includes the leader replica. In some cases the ISR contains only one replica, the leader.

(2) Whether a follower replica is in sync with the leader is controlled by the broker-side parameter replica.lag.time.max.ms (the maximum interval by which a follower replica may lag behind the leader replica), which defaults to 10 seconds. As long as a follower replica does not continuously lag behind the leader by more than 10 seconds, Kafka considers it in sync with the leader, even if at that moment the follower holds noticeably fewer messages than the leader.

(3) ISR is a dynamically adjusted set rather than a statically invariant one.

If a follower replica continuously pulls data from the leader more slowly than the leader's message write rate, then once replica.lag.time.max.ms has elapsed, the follower is considered out of sync with the leader and may no longer remain in the ISR. Kafka then automatically shrinks the ISR set and "kicks" that replica out.

If the replica later slowly catches up with the leader, it can be added back into the ISR.

(4) If the ISR set is empty, the leader replica has also died and the partition becomes unavailable; producers can no longer send any messages to it. (As long as the ISR is not empty, a new leader can be elected from the ISR set when the leader replica fails.)
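
To make the leader/ISR relationship concrete, here is a minimal sketch that prints each partition's leader and current ISR with the Java AdminClient, assuming the cluster and the aaaa topic from the deployment section:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class IsrDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singleton("aaaa"))
                    .all().get().get("aaaa");
            for (TopicPartitionInfo p : desc.partitions()) {
                // The ISR always contains the leader; a follower drops out of it
                // when it lags behind for more than replica.lag.time.max.ms
                System.out.printf("partition %d leader=%s isr=%s%n",
                        p.partition(), p.leader(), p.isr());
            }
        }
    }
}

Stopping one of the three brokers and rerunning this shows the ISR shrinking; restarting the broker shows it being added back.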

7. When the broker hosting Kafka's leader replica goes down, how is a new leader elected?

(1) If the ISR is not empty, the new leader is elected from the ISR.

(2) If the ISR is empty, Kafka can elect a leader from surviving replicas that are not in the ISR. This process is called Unclean Leader Election, and the broker-side parameter unclean.leader.election.enable controls whether it is allowed.

Enabling unclean leader election may cause data loss, but the upside is that the partition keeps a leader replica and never stops serving, which improves availability. Conversely, disabling unclean leader election preserves data consistency and avoids message loss, at the cost of availability.

A distributed system can usually satisfy only two of Consistency, Availability, and Partition tolerance at the same time (the CAP theorem). Clearly, with this parameter Kafka lets you choose between C and A.

It is strongly recommended not to enable unclean leader election; after all, there are other ways to improve availability, and sacrificing data consistency for this gain is not worth it.
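
Besides setting unclean.leader.election.enable=false in server.properties, the parameter can also be overridden per topic. Below is a minimal sketch of the per-topic override with the Java AdminClient (the broker address and the aaaa topic are reused from the deployment section; the incremental-alter API requires Kafka 2.3 or later):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UncleanElectionDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.85.158:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "aaaa");
            // Explicitly forbid unclean leader election for this topic,
            // trading availability for consistency as discussed above
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("unclean.leader.election.enable", "false"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singleton(op)))
                    .all().get();
        }
    }
}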

Ps1: The election of the leader replica can also be understood as the election of the partition leader.

Ps2: The election of the broker leader (the controller) is different from the election of the partition leader.

Kafka's controller election is implemented by creating an ephemeral /controller node on ZooKeeper, to which the current broker's information is written:

{"version": 1, "brokerid": 1, "timestamp": "1512018424988"}

Thanks to ZooKeeper's strong consistency, only one client can succeed in creating the node. The broker that creates it successfully becomes the leader, that is, the controller of the cluster, responsible for cluster-wide administrative tasks large and small.

When the controller loses its connection to ZooKeeper, the ephemeral node is deleted. The other brokers watch this node, receive an event notification when it is deleted, and initiate a new round of leader election.
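
The "whoever creates the node first wins" pattern is easy to reproduce with the plain ZooKeeper Java client. Below is a minimal sketch of the pattern (the node path and payload mirror the /controller example above); run it against a test ZooKeeper rather than one serving a live Kafka cluster, where /controller is already taken by a real broker:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElectionDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("192.168.85.158:2181", 30000, event -> { });
        byte[] data = "{\"version\":1,\"brokerid\":1,\"timestamp\":\"1512018424988\"}"
                .getBytes("UTF-8");
        try {
            // EPHEMERAL: the node disappears when this session ends,
            // which is exactly what triggers re-election among the other brokers
            zk.create("/controller", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
            System.out.println("won the election: this client is the controller");
        } catch (KeeperException.NodeExistsException e) {
            System.out.println("lost: another client is already the controller");
        }
        // Closing the session (zk.close()) deletes the ephemeral node,
        // the same effect as the controller losing its ZooKeeper connection.
    }
}

Running two copies of this against the same ZooKeeper shows only one of them winning; kill the winner, and once its session lapses the freed node lets the other take over on retry.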

One more quick question for you: if follower replicas were allowed to serve read requests, how would you avoid or mitigate the data inconsistency caused by followers lagging behind the leader replica?

3. Real-time log statistics process

1. Project process

While putting this solution together, the project team debated it at length. Opinions varied: some felt Storm should do the real-time processing directly and the Kafka link should be dropped; others suggested consuming through the Kafka API and dropping Storm. In the end the group unanimously decided on this solution, for the following reasons:

  • Business modularization
  • Functional componentization

In our view, Kafka should play a single role in the whole pipeline: in this project it serves as the middleware link. The figure below illustrates this reasoning:

The whole project flow is shown in the figure above. This division keeps each business concern modular and each function clear.

  • Data Collection

This layer is responsible for collecting, in real time, the log data reported from each node; we chose Apache Flume NG to implement it.

  • Data Access

Since the speed of data collection does not necessarily match the speed of data processing, a middleware layer is added here to buffer between them, implemented with Apache Kafka (cluster deployment is covered above). In addition, part of the data flows into HDFS, providing a data source for the offline statistics service.

  • Stream Computing

After collecting the data, we need to process it in real time; we chose Apache Storm. (A blog post on Storm cluster deployment, which is relatively simple, will be added later.)

  • Data Output

After processing the data with Storm, we need to persist the results. Because the response-speed requirements are high, Redis+MySQL is used for persistence here. The overall architecture diagram of the project is as follows:

2. Flume

Flume is a distributed, highly available system for collecting, aggregating, and moving massive amounts of log data. It supports customizable data senders in the log system (such as Kafka and HDFS) to facilitate data collection. Flume provides many log-source collection types, including Console, RPC, Text, Tail, Syslog, and Exec. In our log system, spooldir is used to collect log files; the configuration is as follows:

producer.sources.s.type = spooldir
producer.sources.s.spoolDir = /home/hadoop/dir/logdfs

Flume's data senders (sinks) also come in many types, including Console, Text, HDFS, RPC, and so on. Here our system uses Kafka middleware to receive the data; the configuration is as follows:

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test

3. Kafka

Kafka is a distributed publish/subscribe messaging system that provides high throughput. Its features are as follows:

  • Message persistence is provided through on-disk data structures that remain stable over long periods, even when the stored data reaches the TB+ level.
  • High throughput: even on ordinary machine hardware, Kafka can support 100,000 (10W) messages per second.
  • Messages can be partitioned across a Kafka cluster and consumed in parallel by a consumer cluster.

Kafka's goal is to provide a publish/subscribe solution that can handle all the activity-stream data of a consumer-scale website; page views, searches, and other user actions are the key inputs. Because of the throughput requirements, such data is usually handled through log processing and log aggregation. For log data that needs to feed both offline systems like Hadoop and real-time processing, a solution like Kafka is a good fit.

Kafka cluster setup, deployment, and use have already been covered above; friends who missed it can scroll back up, so it is not repeated here.

4. Storm

Twitter open-sourced Storm, a distributed, fault-tolerant real-time computation system that has since been contributed to the Apache Foundation. Download:

http://storm.apache.org/downloads.html

Storm’s main features are as follows:

  • Simple programming model. Just as MapReduce reduces the complexity of parallel batch processing, Storm reduces the complexity of real-time processing.
  • Multiple programming languages. You can use various programming languages on top of Storm. Clojure, Java, Ruby, and Python are supported by default; to add support for another language, you only need to implement a simple Storm communication protocol.
  • Fault tolerance. Storm manages worker-process and node failures.
  • Horizontal scaling. Computations run in parallel across multiple threads, processes, and servers.
  • Reliable message processing. Storm guarantees that every message is fully processed at least once; when a task fails, it takes care of retrying the message from the message source.
  • Fast. The system is designed so that messages are processed quickly; Storm uses ZeroMQ (ØMQ) as its underlying message queue.
  • Local mode. Storm has a local mode that fully simulates a Storm cluster during processing, which allows fast development and unit testing.

A Storm cluster consists of one master node and multiple worker nodes. The master node runs a daemon called Nimbus, responsible for code distribution, task assignment, and fault detection. Each worker node runs a daemon called Supervisor, which listens for work and starts and terminates worker processes.

Both Nimbus and Supervisor fail fast and are stateless, which makes them robust. Coordination between the two is handled by Apache ZooKeeper.

Storm's terminology includes Stream, Spout, Bolt, Task, Worker, Stream Grouping, and Topology.

  • Stream is the data being processed.
  • Spout is the data source.
  • Bolt processes data.
  • A Task is a thread that runs in Spout or Bolt.
  • The Worker is the process that runs these threads.
  • Stream Grouping specifies how a Bolt receives its input data: randomly (Shuffle), by field values (Fields), broadcast (All), always to the same Task (Global), ignored (None), or by custom logic (Direct).
  • Topology is a network of Spout and Bolt nodes connected by Stream Groupings. These terms are described in more detail on the Storm Concepts page; a minimal sketch putting them together follows below.
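
To tie these terms together, below is a minimal local-mode topology sketch against the Storm 1.x Java API. The class and field names (LogTopologyDemo, LineSpout, PrintBolt, "line") are made up for illustration: the Spout is the source of the Stream, the Bolt processes it, shuffleGrouping wires them together, and the parallelism hints determine the number of Tasks.

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class LogTopologyDemo {

    // Spout: the source of the Stream; here it fakes one log line per second
    public static class LineSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext ctx, SpoutOutputCollector collector) {
            this.collector = collector;
        }
        public void nextTuple() {
            Utils.sleep(1000);
            collector.emit(new Values("user=1 action=click"));
        }
        public void declareOutputFields(OutputFieldsDeclarer d) {
            d.declare(new Fields("line"));
        }
    }

    // Bolt: processes the data; here it just prints each tuple
    public static class PrintBolt extends BaseRichBolt {
        public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) { }
        public void execute(Tuple input) {
            System.out.println("got: " + input.getStringByField("line"));
        }
        public void declareOutputFields(OutputFieldsDeclarer d) { }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("line-spout", new LineSpout(), 1);
        // Shuffle grouping: tuples are distributed randomly over the Bolt's Tasks
        builder.setBolt("print-bolt", new PrintBolt(), 2).shuffleGrouping("line-spout");
        // Local mode: simulates a Storm cluster inside this JVM for development
        new LocalCluster().submitTopology("demo", new Config(), builder.createTopology());
    }
}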

As for Storm cluster setup and deployment, it will be covered in the next post, and the link will be attached here once it is published, so I won't go into it further now.

5. Summary

One big difference between Kafka, as high-throughput message-oriented middleware, and traditional message-oriented middleware is that its messages are actually stored as log files, by default in the /tmp/kafka-logs folder. Although a 7-day cleanup mechanism exists by default, when disk capacity is insufficient and the data volume is large, data can no longer be written. How to adjust some of Kafka's default parameters is therefore critical. Here are some common configuration parameters for your reference:

Segment policy attributes

  • log.roll.{hours,ms} (default: 168, i.e. 7 days): log rolling period; when it elapses, a new segment is forced.
  • log.segment.bytes (default: 1G; -1 means unlimited): maximum size of each segment; when it is reached, a new segment is forced.
  • log.retention.check.interval.ms (default: 60000): interval for checking log segment files.

Log flush policy

Kafka's log records first land in a cache and are then periodically written to the log files in batches, according to policy, to improve throughput.

  • log.flush.interval.messages (default: 10000): flush data to the log file once this many messages have accumulated.
  • log.flush.interval.ms (default: null): force a flush once this much time has passed.
  • log.flush.scheduler.interval.ms (default: a very large value, Long.MaxValue): how often to check whether a flush is needed.

Log retention and cleanup policy

  • log.cleanup.policy (default: delete): log cleanup policy; only delete and compact are available.
  • log.retention.hours (default: 168, i.e. 7 days): log retention time; hours, minutes, or ms variants can be used.
  • log.retention.bytes (default: -1): maximum size of log data retained before deletion.
  • log.segment.delete.delay.ms (default: 60000): how long a log file is retained before it is actually deleted.
  • log.cleanup.interval.mins (default: 10): how often the cleanup step runs.
  • log.retention.check.interval.ms (default: 300000): how often to check whether any log segment meets the deletion criteria (used in newer versions).

A special note on when logs are really deleted. When the deletion criteria are met, a log segment is "deleted", but this only marks the file as deleted; the file simply can no longer be indexed.

The file itself, however, still exists; only after log.segment.delete.delay.ms has elapsed is it actually removed from the file system.


The article ends here, shorter than I expected, because some topics inevitably run long once written out fully and space does not allow it. Students who want a more thorough grasp of Kafka can get the full version of my Kafka study notes from me; students preparing for interviews recently can take a look at the Kafka high-frequency interview questions.

Later I will also write analyses of the other two middleware systems; give me a follow so you get notified first.

And, uh, can I get a thumbs up, guys?!

end