preface

Because Kafka relies heavily on ZooKeeper, you need to set up a ZooKeeper cluster first. Since ZooKeeper is written in Java and needs to run on a JVM, a Java environment is required first. # yum install wget # yum install wget # yum install wget # Put a little bit east, a little bit west, and after a while you don’t know where you put it. Ps4: Kafka may have zooKeeper built in, but it feels like it can skip zooKeeper, too. I haven’t tried)

This article is published by the Java Architects Association, which updates technical articles daily

Configure the JDK

Oracle does not allow you to download JDK packages directly from its official website through wget. So what you download directly from wget is only a 5K web file, not the required JDK package. (Monopoly is capricious). (Please check whether the JDK comes with java-version, mine does not.)

1. Download from the official website

Here is the official download address of JDK8:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html
Copy the code

2. Upload and decompress

The XFTP file is uploaded to the specified location on the server: /usr/local

Decompress the compressed file:

tar -zxvf jdk-8u221-linux-x64.tar.gz
Copy the code

Rename the unzipped folder:

The mv jdk1.8.0 _221 jdk1.8Copy the code

3. Configure environment variables

Vim /etc/profile # Java environment export JAVA_HOME=/usr/local/jdk1.8 export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar export PATH=$PATH:${JAVA_HOME}/binCopy the code

The interface after the operation is as follows:

Run the command to make the environment take effect

source /etc/profile
Copy the code

2. Set up the ZooKeeper cluster

1. Download ZooKeeper

Create a ZooKeeper directory and download it from this directory:

mkdir /usr/local/zookeeper
Copy the code

In this step, if the connection is rejected, I can try several times. It was my second request that succeeded.

Wget HTTP: / / http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gzCopy the code

Wait for the download to complete and unzip:

The tar - ZXVF zookeeper - 3.4.6. Tar. GzCopy the code

Rename it to Zookeeper1

Mv Zookeeper-3.4.6 Zookeeper1 cp -r Zookeeper1 zooKeeper2 cp -r zookeeper1 Zookeeper3Copy the code

Create data/logs folder

Create it in the zooKeeper1 directory

Create a new myID file in the data directory. Content is 1

3. Modify the zoo. CFG file

cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg
Copy the code

After the above two steps, there is a zoo. CFG file, now modify the content to:

dataDir=/usr/local/zookeeper/zookeeper1/data dataLogDir=/usr/local/zookeeper/zookeeper1/logs Server. 1 = 192.168.233.11:2888-3888 for server 2 = 192.168.233.11:2889:3889 server. 3 = 192.168.233.11:2890-3890Copy the code

4. Build ZooKeeper2

First, copy the name.

cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2
Copy the code

Then modify specific configurations:

vim zookeeper2/conf/zoo.cfg
Copy the code

Change 1 to 2 in the following three places

vim zookeeper2/data/myid
Copy the code

Also change the value in myid to 2

5. Build Zookeeper3

As above, copy and rename

cp -r zookeeper1 zookeeper3
Copy the code

vim zookeeper3/conf/zoo.cfg
Copy the code

Changed to 3

vim zookeeper3/data/myid
Copy the code

Changed to 3

6. Test the ZooKeeper cluster

cd /usr/local/zookeeper/zookeeper1/bin/
Copy the code

Because the startup required a lot of code, here is a simple startup script:

vim start
Copy the code

The contents of start are as follows

cd /usr/local/zookeeper/zookeeper1/bin/ ./zkServer.sh start .. /conf/zoo.cfg cd /usr/local/zookeeper/zookeeper2/bin/ ./zkServer.sh start .. /conf/zoo.cfg cd /usr/local/zookeeper/zookeeper3/bin/ ./zkServer.sh start .. /conf/zoo.cfgCopy the code

Here is the connection script:

vim login
Copy the code

Login:

./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183
Copy the code

Once the script is written, start:

sh start
sh login
Copy the code

The following figure shows that the cluster is started successfully:

Zookeeper is now closed. Since ZooKeeper occupies the input window, you can right-click the xshell TAB and create an SSH channel. Then kafka can continue in a new window!

Build a Kafka cluster

1. Download Kafka

Create kafka directory first:

mkdir /usr/local/kafka
Copy the code

Then download it in that directory

CD/usr/local/kafka/wget HTTP: / / https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgzCopy the code

After successful download, unzip:

The tar - ZXVF kafka_2. 11-1.1.0. TGZCopy the code

2. Modify cluster configurations

First go to conf directory:

CD/usr/local/kafka/kafka_2. 11-1.1.0 / configCopy the code

Server.properties modified:

Broker. Id = 0 the dirs = / TMP/kafka - logs listeners = PLAINTEXT: / / 192.168.233.11:9092Copy the code

Make two copies of server.properties

cp server.properties server2.properties
cp server.properties server3.properties
Copy the code

Modify server2. The properties

vim server2.properties
Copy the code

The main contents are as follows:

Broker. Id = 1 the dirs = / TMP/kafka - logs1 listeners = PLAINTEXT: / / 192.168.233.11:9093Copy the code

As shown above, modify server3.properties to:

Broker. Id = 2 the dirs = / TMP/kafka - logs2 listeners = PLAINTEXT: / / 192.168.233.11:9094Copy the code

3. Start Kafka

Again, write a script in the bin directory:

cd .. /bin/ vim startCopy the code

The script content is as follows:

./kafka-server-start.sh .. /config/server.properties & ./kafka-server-start.sh .. /config/server2.properties & ./kafka-server-start.sh .. /config/server3.properties &Copy the code

Using the JPS command, you can view that three KafKas are started.

Create a Topic

CD /usr/local/kafka_2.11-1.1.0 bin/kafka-topics. Sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topicCopy the code

Kafka prints several logs

You can query this topic by command in the zooKeeper startup.

ls /brokers/topics
Copy the code

Check the Kafka status

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Copy the code

You can see that there are three nodes here: 1, 2, 0

The Leader is 1, and since there is only one partition, it is 0. Replicas: the primary and secondary backup is 1,2,0, and ISR (in-sync) : the surviving information is also 1,2,0

5. Start producers

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
Copy the code

Because can not press delete, can not press the left and right keys to adjust, so the statement some confusion ah. Em…

6. Start consumers

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
Copy the code

As you can see, once you start the consumer, you automatically start spending.

Another one was made at the producer.

Consumer automatic capture success!

4. Integrate SpringBoot

Kafka compatibility directory:

If not, springBoot will throw an exception!! If kafka-clients is 1.1.0, spring-kafka is 2.2.2, and kafka-clients is 2.2.2, then kafka-clients is 1.1.0 and spring-kafka is 2.2.2.

Back to business, two hours of work, finally done, want to cry… The problem is basically a jar version mismatch. I will modify the above steps accordingly, and strive for you to follow this tutorial again!!

1. Pom files

<? The XML version = "1.0" encoding = "utf-8"? > < project XMLNS = "http://maven.apache.org/POM/4.0.0" XMLNS: xsi = "http://www.w3.org/2001/XMLSchema-instance" Xsi: schemaLocation = "http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > < modelVersion > 4.0.0 < / modelVersion > < the parent > < groupId > org. Springframework. Boot < / groupId > The < artifactId > spring - the boot - starter - parent < / artifactId > < version > 2.1.1. RELEASE < / version > < relativePath / > <! -- lookup parent from repository --> </parent> <groupId>com.gzky</groupId> <artifactId>study</artifactId> <version>0.0.1-SNAPSHOT</version> <name>study</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion>  </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> < artifactId > spring - the boot - starter - redis < / artifactId > < version > 1.3.8. RELEASE < / version > < / dependency > < the dependency > <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <! -- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> < the groupId > org. Springframework. Kafka < / groupId > < artifactId > spring - kafka < / artifactId > < version > 2.2.0. RELEASE < / version > </dependency> <! -- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>Copy the code

The poM files focus on the following two versions.

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> The < version > 2.1.1. RELEASE < / version > < relativePath / > <! -- lookup parent from repository --> </parent> <dependency> <groupId>org.springframework.kafka</groupId> < artifactId > spring - kafka < / artifactId > < version > 2.2.0. RELEASE < / version > < / dependency >Copy the code

2, application. Yml

Spring: redis: cluster: # Set the lifetime of the key. When the key expires, it will be deleted automatically. Expire-seconds: 120 # Set the execution time of the command. If this time is exceeded, an error will be reported. Command -timeout: 5000 # command-timeout: 5000 # command-timeout: 5000 nodes: 192.168.233.11:9001192168 233.11:9002192168 233.11:9003192168, 233.11, 9004192168, 233.11, 9005192168, 233.11, 9006 Kafka: # specified kafka agent address, can be multiple bootstrap - the servers: 192.168.233.11:9092192168 233.11:9093192168 233.11:9094 producer: Retries: 0 number of messages to be sent in batches Batch-size: 16384 Buffer-memory: 33554432 org.apache.kafka.common.serialization.StringSerializer value-serializer: Org.apache.kafka.com mon. Serialization. StringSerializer consumer: # specify a default group, a consumer group id - the id: test-group auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 100 # specified message key and the message body way of decoding key - serializer: org.apache.kafka.com mon. Serialization. StringSerializer value - serializer: org.apache.kafka.common.serialization.StringSerializer server: port: 8085 servlet: #context-path: /redis context-path: /kafkaCopy the code

If you want to learn how to configure Redis cluster, you can refer to: “Redis cluster redis-cluster construction and integration of Springboot”

3. Producers

package com.gzky.study.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** * @author biws * @date 2019/12/17 **/ @Component Public class KfkaProducer {private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; Public void send(String STR) {logger.info(" String: "+ STR); kafkaTemplate.send("testTopic", str); }}Copy the code

4. Consumers

package com.gzky.study.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka consumer listener ** @author biws * @date 2019/12/17 **/ @Component public class KafkaConsumerListener {private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class); @KafkaListener(topics = "testTopic") public void onMessage(String str){ //insert(str); // Insert database code logger.info(" listen to: "+ STR); System.out.println(" listen to: "+ STR); }}Copy the code

5. External interfaces

package com.gzky.study.controller; import com.gzky.study.utils.KfkaProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * kafka external interface ** @author biws * @date 2019/12/17 **/ @restController public class KafkaController {@autowired KfkaProducer kfkaProducer; @param STR @return @requestMapping (value = "/sendKafkaWithTestTopic",method = requestmethod.get) @ResponseBody public boolean sendTopic(@RequestParam String str){ kfkaProducer.send(str); return true; }}Copy the code

6. The Postman Test

Kafka root = kafka root = kafka root = kafka root

It is recommended to restart the kafka command:

CD/usr/local/kafka/kafka_2. 11-1.1.0. / bin/kafka - server - stop. Sh.. /config/server.properties & ./kafka-server-stop.sh .. /config/server2.properties & ./kafka-server-stop.sh .. /config/server3.properties &Copy the code

Restart kafka after all kafka files have been killed.

./kafka-server-start.sh .. /config/server.properties & ./kafka-server-start.sh .. /config/server2.properties & ./kafka-server-start.sh .. /config/server3.properties &Copy the code

After Kafka starts successfully, start the consumer listening port:

CD /usr/local/kafka_2.11-1.1.0 bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopicCopy the code

Once I disorderly input test information all monitored over!

Start the SpringBoot service

Then produce the message with Postman:

Then enjoy the results, server side listening success.

Project listener also successful!