Preface
Because Kafka relies heavily on ZooKeeper, you need to set up a ZooKeeper cluster first. And since ZooKeeper is written in Java and runs on a JVM, a Java environment is required before anything else. If your server does not have wget yet, install it first: yum install wget. (A tip: keep your installs in one consistent place; put a little bit east and a little bit west and after a while you won't know where anything is.) PS: Kafka ships with a built-in ZooKeeper, so it may be possible to skip a standalone ZooKeeper altogether, but I haven't tried it.
1. Configure the JDK
Oracle does not allow you to download JDK packages from its official website with wget: what wget fetches is only a ~5 KB web page, not the JDK archive you need. (Monopolies can be capricious.) Before installing, check whether a JDK is already present with java -version; mine was not.
1. Download from the official website
Here is the official download address of JDK8:
https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html
2. Upload and decompress
Upload the archive via XFTP to the target location on the server: /usr/local
Decompress the compressed file:
tar -zxvf jdk-8u221-linux-x64.tar.gz
Rename the unzipped folder:
mv jdk1.8.0_221 jdk1.8
3. Configure environment variables
 
vim /etc/profile

# Java environment
export JAVA_HOME=/usr/local/jdk1.8
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
Run the command to make the environment take effect
source /etc/profile
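To confirm the environment variables took effect, run a quick check (version details vary with your exact build):

java -version
# expected output, roughly:
# java version "1.8.0_221"
# Java(TM) SE Runtime Environment (build 1.8.0_221-...)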
2. Set up the ZooKeeper cluster
1. Download ZooKeeper
Create a ZooKeeper directory and download it from this directory:
mkdir /usr/local/zookeeper
If the connection is refused at this step, just retry a few times; my second attempt succeeded.
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
Wait for the download to complete and unzip:
tar -zxvf zookeeper-3.4.6.tar.gz
Rename it to zookeeper1 (we will copy it to zookeeper2 and zookeeper3 in later steps):
mv zookeeper-3.4.6 zookeeper1
2. Create the data and logs folders
Create both inside the zookeeper1 directory. Then create a new myid file in the data directory, with content 1; see the sketch below.
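A minimal sketch of these commands, assuming the directory layout above:

cd /usr/local/zookeeper/zookeeper1
mkdir data logs
# the value in myid must match this node's server.X line in zoo.cfg
echo 1 > data/myid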
3. Modify the zoo.cfg file
cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg
After the two commands above, a zoo.cfg file exists. Modify its contents to the following (in each server.X entry, the first port is used by followers to connect to the leader and the second is used for leader election):
dataDir=/usr/local/zookeeper/zookeeper1/data
dataLogDir=/usr/local/zookeeper/zookeeper1/logs
clientPort=2181
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890
4. Build ZooKeeper2
First, copy the whole directory:
cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2
Then modify specific configurations:
vim zookeeper2/conf/zoo.cfg
Change the 1 to 2 in three places: the dataDir path, the dataLogDir path, and the clientPort (2181 becomes 2182).
vim zookeeper2/data/myid
Also change the value in myid to 2
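If you prefer not to edit by hand, here is a sketch of the same changes using sed (assumes GNU sed; run from /usr/local/zookeeper and double-check the result):

# rewrite the two paths and the client port in one pass, then set the myid
sed -i 's/zookeeper1/zookeeper2/g; s/clientPort=2181/clientPort=2182/' zookeeper2/conf/zoo.cfg
echo 2 > zookeeper2/data/myid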
5. Build Zookeeper3
As above, copy and rename
cp -r zookeeper1 zookeeper3
vim zookeeper3/conf/zoo.cfg
Change the same three places to 3 (clientPort becomes 2183).
vim zookeeper3/data/myid
Also change the value in myid to 3.
6. Test the ZooKeeper cluster
cd /usr/local/zookeeper/zookeeper1/bin/
Because starting each node by hand takes a lot of typing, here is a simple startup script:
vim start
The contents of start are as follows
cd /usr/local/zookeeper/zookeeper1/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg
Here is the connection script:
vim login
The contents of login are as follows:
./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183
Once both scripts are written, run them:
sh start
sh login
If the zkCli shell connects and shows a prompt, the cluster has started successfully.
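You can also check each node's role without zkCli; a sketch using zkServer.sh status (assuming the same config-path convention as the start script):

cd /usr/local/zookeeper/zookeeper1/bin/ && ./zkServer.sh status ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/ && ./zkServer.sh status ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/ && ./zkServer.sh status ../conf/zoo.cfg
# one node should report "Mode: leader", the other two "Mode: follower"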
That wraps up ZooKeeper. Since the zkCli session occupies the input window, you can right-click the Xshell tab and open a new SSH channel; the Kafka setup continues in the new window!
3. Build a Kafka cluster
1. Download Kafka
Create kafka directory first:
mkdir /usr/local/kafka
Then download it in that directory
cd /usr/local/kafka/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
After successful download, unzip:
tar -zxvf kafka_2.11-1.1.0.tgz
2. Modify cluster configurations
First go to the config directory:
cd /usr/local/kafka/kafka_2.11-1.1.0/config
Modify server.properties as follows:
broker.id=0
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092
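One more setting worth checking in the same file is zookeeper.connect. The sample server.properties ships with localhost:2181, which happens to match zookeeper1 here; if your ZooKeeper runs elsewhere, point it at your ensemble, for example (illustrative addresses):

zookeeper.connect=192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183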
Make two copies of server.properties
cp server.properties server2.properties
cp server.properties server3.properties
Modify server2.properties:
vim server2.properties
The main contents are as follows:
broker.id=1
log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093
As shown above, modify server3.properties to:
broker.id=2
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094
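To recap, the three brokers differ only in these values (all on one host):

# server.properties  : broker.id=0, log.dirs=/tmp/kafka-logs,  port 9092
# server2.properties : broker.id=1, log.dirs=/tmp/kafka-logs1, port 9093
# server3.properties : broker.id=2, log.dirs=/tmp/kafka-logs2, port 9094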
3. Start Kafka
Again, write a script in the bin directory:
cd ../bin/
vim start
The script content is as follows:
./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
Run the script with sh start, then use the jps command to verify that three Kafka processes are running.
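A rough sketch of what jps should report (PIDs will differ):

jps
# expect three Kafka entries (the brokers)
# and three QuorumPeerMain entries (the ZooKeeper nodes)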
4. Create a topic
cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Kafka prints a few log lines on creation. You can then query the topic from the zkCli window opened earlier:
ls /brokers/topics
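The expected output is roughly the following (the exact list depends on what else exists in the cluster):

[my-replicated-topic]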
Check the topic's status:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
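For reference, the describe output looks something like this (your leader assignment may differ):

Topic:my-replicated-topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: my-replicated-topic  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0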
You can see three nodes listed: 1, 2, 0. The leader is broker 1; since there is only one partition, the partition number is 0. Replicas shows which brokers hold copies: 1,2,0. ISR (in-sync replicas) shows which of those are alive and caught up: also 1,2,0.
5. Start a producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
Type a few test messages. In this console you cannot press Delete or use the arrow keys to edit, so my input came out a bit jumbled. Em…
6. Start a consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
As you can see, once the consumer starts it automatically begins consuming the existing messages. Send another message from the producer, and the consumer picks it up automatically!
4. Integrate Spring Boot
First, a word on Kafka version compatibility: the spring-kafka version must be compatible with your Kafka broker and client versions, or Spring Boot will throw exceptions at startup!! Since the broker installed above is Kafka 1.1.0, I pair it with Spring Boot 2.1.1.RELEASE and spring-kafka 2.2.0.RELEASE, and let Maven resolve the matching kafka-clients version.
Back to business: after two hours of fighting with it, it finally works; I nearly cried… The problems were basically all jar version mismatches. I have adjusted the steps above accordingly, so you should be able to follow this tutorial straight through!!
1. The pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.gzky</groupId>
    <artifactId>study</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>study</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The key points in the pom are the following two versions:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
2. application.yml
spring:
  redis:
    cluster:
      # Key time-to-live in seconds; expired keys are deleted automatically
      expire-seconds: 120
      # Command timeout in ms; commands exceeding it raise an error
      command-timeout: 5000
      nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006
  kafka:
    # Kafka broker addresses; multiple brokers may be listed
    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
    producer:
      # Number of retries on send failure
      retries: 0
      # Batch size in bytes
      batch-size: 16384
      buffer-memory: 33554432
      # Serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id
      group-id: test-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      # Deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:
  port: 8085
  servlet:
    #context-path: /redis
    context-path: /kafka
If you want to learn how to configure Redis cluster, you can refer to: “Redis cluster redis-cluster construction and integration of Springboot”
3. Producer
package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Kafka producer
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KfkaProducer {

    private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a message to the testTopic topic
     */
    public void send(String str) {
        logger.info("Producing message: " + str);
        kafkaTemplate.send("testTopic", str);
    }
}
4. Consumer
package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer listener
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KafkaConsumerListener {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "testTopic")
    public void onMessage(String str) {
        // insert(str); // hook for persisting the message to a database
        logger.info("Consumed message: " + str);
        System.out.println("Consumed message: " + str);
    }
}
5. External interface
package com.gzky.study.controller;

import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Kafka external interface
 *
 * @author biws
 * @date 2019/12/17
 **/
@RestController
public class KafkaController {

    @Autowired
    KfkaProducer kfkaProducer;

    /**
     * Produce a message to testTopic
     *
     * @param str message content
     * @return true once the message has been handed to the producer
     */
    @RequestMapping(value = "/sendKafkaWithTestTopic", method = RequestMethod.GET)
    @ResponseBody
    public boolean sendTopic(@RequestParam String str) {
        kfkaProducer.send(str);
        return true;
    }
}
6. Postman test
Before testing, make sure the cluster under the Kafka root directory is running cleanly; I recommend restarting it. Stop the brokers first:
cd /usr/local/kafka/kafka_2.11-1.1.0/bin
./kafka-server-stop.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server2.properties &
./kafka-server-stop.sh ../config/server3.properties &
After all Kafka processes have been killed, restart them:
./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &
After Kafka starts successfully, start the consumer listening port:
cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic
All the test messages I had typed in at random earlier get picked up immediately!
Start the Spring Boot service, then produce a message with Postman (an example request follows below):
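With the port and context path configured above, the request looks something like this (hypothetical message text):

curl "http://192.168.233.11:8085/kafka/sendKafkaWithTestTopic?str=hello-kafka"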
Then enjoy the result: the server-side console consumer picks the message up, and the project's @KafkaListener logs it as well. Success!