Integrating Kafka into a Spring Boot project
1. Project dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- yaml -->
<dependency>
    <groupId>org.yaml</groupId>
    <artifactId>snakeyaml</artifactId>
</dependency>
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
2. Configure the YML file
bootstrap-servers specifies the addresses of the Kafka brokers to connect to. For a cluster, list multiple addresses separated by commas.
spring:
  kafka:
    # IP addresses and ports of the Kafka servers
    bootstrap-servers: 192.168.1.108:9092,192.168.1.109:9092,192.168.1.110:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: Default_consumer_group # consumer group ID
      #enable-auto-commit: true
      #auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
  port: 8500
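The comma-separated host:port format of bootstrap-servers is easy to mistype, for example with a dot where a comma belongs. The following is a minimal sketch of a pre-flight check in plain Java. The class and method names are hypothetical and are not part of Spring or Kafka, which do their own parsing; the sketch also assumes IPv4 addresses or host names and does not handle IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapServersCheck {

    /** Splits a comma-separated "host:port" list and rejects malformed entries. */
    static List<String> parse(String bootstrapServers) {
        List<String> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String address = entry.trim();
            int colon = address.lastIndexOf(':');
            // There must be a non-empty host before the colon and a port after it.
            if (colon <= 0 || colon == address.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + address);
            }
            String host = address.substring(0, colon);
            // A second colon usually means two addresses were joined without a comma.
            if (host.contains(":")) {
                throw new IllegalArgumentException("More than one ':' in: " + address);
            }
            int port;
            try {
                port = Integer.parseInt(address.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in: " + address, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: " + address);
            }
            result.add(address);
        }
        return result;
    }

    public static void main(String[] args) {
        // The three broker addresses used in the YML file of this article.
        System.out.println(parse("192.168.1.108:9092,192.168.1.109:9092,192.168.1.110:9092"));
    }
}
```

A check like this only validates the syntax of the list; it does not verify that a broker is actually reachable at each address.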
3. Create a test endpoint that produces a message
kafka0 is the Kafka topic name; it must match the topics value in the consumer demo.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author lizhe
 * @date 2021/02/13
 */
@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @RequestMapping("message/send")
    public String send(String msg) {
        // Use the Kafka template to send the message to the kafka0 topic
        kafkaTemplate.send("kafka0", msg);
        return "success";
    }
}
4. Create a consumer demo that receives the messages
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Listens for messages from Kafka on the server.
 *
 * @author lizhe
 * @date 2021/02/13
 */
@Component
public class ConsumerDemo {

    /**
     * Defines a consumer that receives the messages the controller sends to the "kafka0" topic.
     *
     * @param record the message itself; a {@code ConsumerRecord<?, ?>} exposes details of the
     *               received message, such as its topic, offset, and value
     */
    @KafkaListener(topics = "kafka0")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
    }
}
5. Call the endpoint to send a message
Pass the message text in the msg request parameter.
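As a minimal sketch of this step, the request can also be sent with Java's built-in HTTP client (Java 11 or later). The port 8500 and the path message/send come from the YML file and the controller in this article; the host localhost and the class and method names are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SendMessageClient {

    // Assumption: the application runs on the local machine; 8500 is server.port from the YML file.
    static final String BASE_URL = "http://localhost:8500";

    /** Builds the URI of the send endpoint with msg as a URL-encoded query parameter. */
    static URI buildSendUri(String msg) {
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "/message/send?msg=" + encoded);
    }

    public static void main(String[] args) throws InterruptedException {
        // Use the first command-line argument as the message, or "hellp" by default.
        URI uri = buildSendUri(args.length > 0 ? args[0] : "hellp");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        try {
            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
            // The controller returns the string "success" after calling kafkaTemplate.send.
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Reached when the Spring Boot application is not running or not reachable.
            System.out.println("Could not reach " + uri + ": " + e);
        }
    }
}
```

Any HTTP tool works equally well here; the only requirement is that the msg parameter reaches the message/send endpoint.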
6. View console messages
hellp is the message that was received:
topic is kafka0, offset is 3, value is hellp
7. Receive the messages with the Kafka console consumer on Linux
[root@master ~]# kafka-console-consumer.sh --bootstrap-server 192.168.1.108:9092 --topic kafka0 --from-beginning
Call the endpoint again and the console consumer receives the message hellp.