Prerequisites

This article assumes you already know the basics of SpringBoot and Kafka.

1. SpringBoot

If you're not familiar with SpringBoot, check out the blog series by DD and Pure Smile.

2. Kafka

For Kafka, see the post I wrote two days ago, "Kafka installation and quick start". Set up an environment by hand on your own virtual machine, or buy a server if you can afford one.

Note: Be sure to install Kafka and practice with it yourself first; the rest of this article integrates the two.

Create a project

Overall project structure:


Create a SpringBoot project with IDEA. This is straightforward, so it is not covered in detail here.

1. The pom file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhisheng</groupId>
    <artifactId>kafka-learning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-learning</name>
    <description>Demo project for Spring Boot + kafka</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

The main additions are the spring-kafka, Lombok, and Gson dependencies.

2. The message entity class Message.java is as follows:

import java.util.Date;

import lombok.Data;

@Data
public class Message {
    private Long id;        // message id
    private String msg;     // message content
    private Date sendTime;  // time the message was sent
}
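For readers who have not used Lombok, the following is a rough plain-Java sketch of what @Data saves you from writing for this class. The class name PlainMessage is hypothetical (chosen only to avoid clashing with Message), and the code Lombok actually generates differs in detail.

```java
import java.util.Date;
import java.util.Objects;

// Hypothetical hand-written counterpart of the Lombok-annotated Message class.
class PlainMessage {
    private Long id;
    private String msg;
    private Date sendTime;

    // Getters and setters that @Data would otherwise generate.
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }
    public Date getSendTime() { return sendTime; }
    public void setSendTime(Date sendTime) { this.sendTime = sendTime; }

    // Field-by-field equality.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainMessage)) return false;
        PlainMessage other = (PlainMessage) o;
        return Objects.equals(id, other.id)
                && Objects.equals(msg, other.msg)
                && Objects.equals(sendTime, other.sendTime);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, msg, sendTime);
    }

    // Readable string form listing every field.
    @Override
    public String toString() {
        return "PlainMessage(id=" + id + ", msg=" + msg + ", sendTime=" + sendTime + ")";
    }
}
```

With Lombok on the classpath, the single @Data annotation replaces all of this boilerplate, which is why the entity class above stays so short.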

3. Message sending class KafkaSender.java

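import java.util.Date;
import java.util.UUID;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    // Build a message, serialize it to JSON, and send it to the "zhisheng" topic
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
        kafkaTemplate.send("zhisheng", gson.toJson(message));
    }
}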

That completes the code for sending messages.

The key call here is kafkaTemplate.send(). "zhisheng" is the Kafka topic; it does not need to be created in Kafka beforehand, because with the broker's default settings the topic is created automatically when the first message is sent. gson.toJson(message) is the message content. That is all for now; I hope to publish a source-code walkthrough in a later post (I hit a few pitfalls along the way and stepped through the source several times).

4. Message receiving class KafkaReceiver.java

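import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaReceiver {

    // Invoked for every record that arrives on the "zhisheng" topic
    @KafkaListener(topics = {"zhisheng"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}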

Receiving messages on the consumer side is simple: annotate a method with @KafkaListener and set the topics to listen to. topics is an array, so one listener can be bound to several topics; for example, @KafkaListener(topics = {"zhisheng", "tian"}) listens to two topics at once. Note that the topic name here must match the one used in the message sending class KafkaSender.java.

5. Startup class KafkaApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);

        KafkaSender sender = context.getBean(KafkaSender.class);

        for (int i = 0; i < 3; i++) {
            // Call the message sending method
            sender.send();

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
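The send loop in main prints the stack trace and carries on if the thread is interrupted while sleeping. As a hedged alternative, the sketch below moves the loop into a small helper that restores the interrupt flag and stops early. RepeatedSender and sendRepeatedly are hypothetical names that are not part of the original project, and the Runnable stands in for sender::send. Unlike the original loop, this version does not pause after the final send.

```java
// Hypothetical helper, not part of the original project.
class RepeatedSender {

    /**
     * Runs the action the given number of times, pausing between runs.
     * Stops early if the thread is interrupted while pausing.
     *
     * @return the number of runs that completed
     */
    static int sendRepeatedly(Runnable action, int times, long pauseMillis) {
        int completed = 0;
        for (int i = 0; i < times; i++) {
            action.run();
            completed++;
            if (i == times - 1) {
                break; // no pause needed after the last run
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Restore the flag so callers can still see the interruption.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return completed;
    }
}
```

In main, the loop would then shrink to a single call such as RepeatedSender.sendRepeatedly(sender::send, 3, 3000).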

6. Configuration file application.properties

#============== kafka ===================
# Kafka broker address; several can be listed, separated by commas
spring.kafka.bootstrap-servers=192.168.153.135:9092

#=============== producer =======================
spring.kafka.producer.retries=0
# Batch size (in bytes) for messages sent together
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# Serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# Deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Set spring.kafka.bootstrap-servers to the IP address of the machine where you installed Kafka, followed by port 9092.

For a simple integration, the other values can be left as shown.
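The bootstrap-servers value is a comma-separated list of host:port pairs. Spring Boot binds this property itself, so the following JDK-only sketch is only an illustration of the expected format. The class name BootstrapServers is hypothetical, and any second broker address you pass in is up to your own environment.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that only illustrates the host:port list format.
class BootstrapServers {

    /** Returns one {host, port} pair per broker listed in the properties text. */
    static List<String[]> parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("spring.kafka.bootstrap-servers", "");

        List<String[]> servers = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate empty values and trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            servers.add(new String[] {trimmed.substring(0, colon), trimmed.substring(colon + 1)});
        }
        return servers;
    }
}
```

Passing the single-broker line from the file above yields one pair, with host 192.168.153.135 and port 9092.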

Kafka setup

From your Kafka installation directory:

Start ZooKeeper

Start a single-node ZooKeeper instance using the script in the installation package:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Start the Kafka service

Start the Kafka service with kafka-server-start.sh:

bin/kafka-server-start.sh config/server.properties

Once Kafka has started successfully, be sure to either turn off the firewall on your virtual machine or server, or open Kafka's port 9092.
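If the application cannot connect, it may help to check from the development machine that the port is reachable at all. Below is a minimal JDK-only sketch. PortCheck and isReachable are hypothetical names, and a successful TCP connection only shows that the port is open, not that the Kafka broker behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for a quick connectivity check.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or host unknown: treat all as unreachable.
            return false;
        }
    }
}
```

For the setup in this article the call would be PortCheck.isReachable("192.168.153.135", 9092, 2000); a result of false suggests the firewall is still blocking the port or the broker is not running.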

Run

Start KafkaApplication. If the console log shows both the sent messages and the received records, the integration is successful!

Let's look at the list of topics in Kafka:

bin/kafka-topics.sh --list --zookeeper localhost:2181

We can see that the zhisheng topic has been created automatically.

Author: zhisheng


Links:
SpringBoot Kafka integrated use


Disclaimer: This article is by zhisheng, a contracted blogger of Jiile Technology. Copyright belongs to the author; when reposting, please credit the author and the source. Thank you!