Prerequisites
This post assumes you already know Spring Boot and Kafka.
1. Spring Boot
If you're not familiar with Spring Boot, check out the blog series by DD and Pure Smile.
2. Kafka
For Kafka, see the post I wrote two days ago: Kafka installation and quick start. Set up the environment by hand in your own virtual machine, or buy a server if you can afford one.
Note: be sure to install Kafka and practice with it yourself first. After that, we'll integrate the two.
Create a project
Overall project structure:
Creating a Spring Boot project in IDEA is very simple, so I won't explain it in detail here.
1. The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhisheng</groupId>
    <artifactId>kafka-learning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-learning</name>
    <description>Demo project for Spring Boot + kafka</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The main dependencies introduced here are spring-kafka, Lombok, and Gson.
2. The message entity class Message.java is as follows:
import lombok.Data;

import java.util.Date;

@Data
public class Message {
    private Long id;        // id
    private String msg;     // message content
    private Date sendTime;  // timestamp
}
3. The message sending class KafkaSender.java
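import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    // Build a message, serialize it to JSON, and send it to the "zhisheng" topic
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
        kafkaTemplate.send("zhisheng", gson.toJson(message));
    }
}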
At this point, the code for sending messages is complete.
The key code here is the kafkaTemplate.send() method. zhisheng is the Kafka topic. The topic does not need to be created in Kafka ahead of time, because it is created automatically when the message is sent (this relies on the broker allowing automatic topic creation, which is its default). gson.toJson(message) is the message content. I'll leave it at that for now. When I get the chance I'll publish a post walking through the source code (I hit some pitfalls along the way and stepped through the source several times).
4. The message receiving class KafkaReceiver.java
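import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaReceiver {

    // Called for every record that arrives on the "zhisheng" topic
    @KafkaListener(topics = {"zhisheng"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record = {}", record);
            log.info("------------------ message = {}", message);
        }
    }
}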
Receiving messages on the consumer side is easy: annotate a method with @KafkaListener and set the topics to listen to. topics is an array, so one listener can be bound to several topics. For example, @KafkaListener(topics = {"zhisheng", "tian"}) listens to two topics at the same time. Note that the topic here must match the topic used in the message sending class KafkaSender.java.
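One way to keep the sender and the listener on the same topic is to put the topic names in a shared constants class. This is only a minimal sketch: the class name Topics and its field names are hypothetical and are not part of the original project. The Spring calls appear only in comments so that the snippet compiles on its own.

```java
// Hypothetical constants holder; the class and field names are illustrative only.
final class Topics {

    // Compile-time constants, so they can also be used inside annotation values.
    static final String ZHISHENG = "zhisheng";
    static final String TIAN = "tian";

    private Topics() {
        // no instances
    }

    public static void main(String[] args) {
        // Sender side (assumed usage):   kafkaTemplate.send(Topics.ZHISHENG, payload);
        // Listener side (assumed usage): @KafkaListener(topics = {Topics.ZHISHENG, Topics.TIAN})
        System.out.println(String.join(",", ZHISHENG, TIAN));
    }
}
```

With this in place, a typo in a topic name becomes a compile error instead of a listener that silently never receives anything.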
5. The startup class KafkaApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
        KafkaSender sender = context.getBean(KafkaSender.class);

        for (int i = 0; i < 3; i++) {
            // call the message sending method
            sender.send();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
6. The configuration file application.properties
#============== kafka ===================
# Kafka broker address; multiple addresses can be given
spring.kafka.bootstrap-servers=192.168.153.135:9092

#=============== producer =======================
spring.kafka.producer.retries=0
# Batch size for batched sends, in bytes
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Set bootstrap-servers to the IP address of the machine where you installed Kafka, followed by port 9092.
For a simple integration, the other settings can be left at their defaults.
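When more than one broker is configured, the bootstrap-servers value is a comma-separated list of host:port pairs. The sketch below uses only the JDK to show what such a value looks like once it is split up. The BootstrapServers helper is hypothetical (it is not a Spring or Kafka API), and the second address is made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Spring or Kafka.
final class BootstrapServers {

    // Splits a comma-separated "host:port,host:port" value into trimmed entries.
    static List<String> parse(String value) {
        List<String> servers = new ArrayList<>();
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    public static void main(String[] args) throws IOException {
        // The second address is an assumption, added only to show the list form.
        String config =
                "spring.kafka.bootstrap-servers=192.168.153.135:9092,192.168.153.136:9092\n";

        Properties props = new Properties();
        props.load(new StringReader(config));

        // Print each configured broker address on its own line.
        for (String server : parse(props.getProperty("spring.kafka.bootstrap-servers"))) {
            System.out.println(server);
        }
    }
}
```

Spring Boot does this splitting for you. The point is only that a single property can carry several broker addresses.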
Kafka setup
Run the following from your Kafka installation directory:
Start ZooKeeper
Start the single-node Zookeeper instance using the script in the installation package:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Start the Kafka service
Start the Kafka service with kafka-server-start.sh:
bin/kafka-server-start.sh config/server.properties
After Kafka starts successfully, make sure that either the firewall on your virtual machine or server is turned off, or Kafka's port 9092 is open.
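Before starting the Spring Boot application, you can check from the development machine whether port 9092 is actually reachable. The following is a minimal sketch using only the JDK. The PortCheck class is hypothetical, and the default host is the address used in application.properties above. A successful connection only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for a quick TCP reachability check.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unknown
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the broker address from application.properties.
        String host = args.length > 0 ? args[0] : "192.168.153.135";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 2000));
    }
}
```

If this reports false while Kafka is running, the firewall or port setting described above is the first thing to check.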
Run
If the console log shows the messages being sent and received, the integration is successful!
Let’s look at the list of topics in Kafka
bin/kafka-topics.sh --list --zookeeper localhost:2181
We can see that the zhisheng topic was created automatically.
Finally
Reprint please indicate the original address is: www.54tianzhisheng.cn/2018/01/05/…