1. Add the Kafka dependency
Search for kafka on mvnrepository.com/, we…
Each Spring Boot version is supported by a different version of spring-kafka.
IO /projects/sp…
Just go to the Maven repository to find the corresponding version: mvnrepository.com/
My Spring Boot version is 2.2.5, so the spring-kafka dependency looks like this:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.5.RELEASE</version>
</dependency>
Mismatched versions cause many problems. Typically the spring-kafka minor version is one ahead of Spring Boot's; for example, Spring Boot 2.2.5 pairs with spring-kafka 2.3.5.
2. Add a configuration file
Here I use a single shared project to simulate both the producer and the consumer. My Spring Boot project uses a YAML configuration file, so add the following:
spring:
  kafka:
    bootstrap-servers: 192.168.184.134:9092,192.168.184.135:9092,192.168.184.136:9092
    producer:
      # Number of times a message is resent after an error occurs
      retries: 0
      # Serializer for the key
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer for the value
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      # Deserializer for the key
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer for the value
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # Let the project start even when a topic the consumer listens to does not exist
      missing-topics-fatal: false
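The bootstrap-servers value is one comma-separated list of host:port pairs. As a minimal sketch of that format, the string can be split as shown below. The `BootstrapServers` class and its `parse` method are hypothetical helpers written for this illustration; they are not part of Spring or Kafka.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that illustrates the bootstrap-servers format; not a Spring or Kafka API. */
final class BootstrapServers {

    /** Splits a comma-separated list of host:port pairs into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String servers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; only the text is inspected here.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        String servers = "192.168.184.134:9092,192.168.184.135:9092,192.168.184.136:9092";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + " -> port " + address.getPort());
        }
    }
}
```

An entry without a colon, or a missing comma between two brokers, fails fast here with an exception instead of surfacing later as a connection problem.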
Many more configuration options exist, but we will get the integration working first and add further settings gradually.
3. Code implementation
3.1 Producer
package com.cloud.bssp.message.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Kafka producer
 *
 * @author weirx
 * @date 2021/02/03 14:22
 **/
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a fixed test message to the given topic.
     *
     * @param topic name of the topic to send to
     * @author weirx
     * @date 2021/2/3
     */
    public void send(String topic) {
        kafkaTemplate.send(topic, "hello kafka");
    }
}
3.2 Consumer
package com.cloud.bssp.message.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer
 *
 * @author weirx
 * @date 2021/02/03 15:01
 **/
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"test-kafka"})
    public void consumer(ConsumerRecord<?, ?> record) {
        // The record value may be null, so guard before logging.
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}
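The Optional.ofNullable guard in the consumer matters because a record's value may be null (for example, a tombstone record on a compacted topic). A minimal, Kafka-free sketch of the same guard follows. `NullSafeHandler` and `describe` are hypothetical names used only for this illustration, and the plain `Object` parameter stands in for `record.value()`.

```java
import java.util.Optional;

/** Hypothetical stand-in for the listener body; it takes the value directly instead of a ConsumerRecord. */
final class NullSafeHandler {

    /** Returns the text that would be logged, or an empty Optional when the value is null. */
    static Optional<String> describe(Object value) {
        return Optional.ofNullable(value)
                .map(message -> "message =" + message);
    }

    public static void main(String[] args) {
        // A normal value produces the log text.
        System.out.println(describe("hello kafka").orElse("(skipped: null value)"));
        // A null value is skipped instead of causing a NullPointerException later on.
        System.out.println(describe(null).orElse("(skipped: null value)"));
    }
}
```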
3.3 Test class
package com.cloud.bssp.message.kafka;

import com.cloud.bssp.message.kafka.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author weirx
 * @date 2021/02/03 15:03
 **/
@RestController
@RequestMapping("test/kafka")
public class TestKafka {

    @Autowired
    private KafkaProducer producer;

    @RequestMapping("/send/{topic}")
    public void send(@PathVariable String topic) {
        producer.send(topic);
    }
}
4. Testing
Here the topic is passed as a path parameter to make the result easier to observe. Call the following test endpoint:
http://localhost:8085/test/kafka/send/test-kafka
Looking at the console, the consumer output:
2021-02-03 17:12:44.654 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 0, offset = 0, CreateTime = 1612343564622, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:12:44.654 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
The record details are: topic test-kafka, partition 5, offset 0.
Checking the result in Kafkatool, one message has been added to partition 5:
Why are there 10 partitions here? In the previous section, we set up the Kafka cluster with 10 partitions by default. Later sections will cover partitions in detail.
The offset is a record's position within its partition, counted from 0.
Let's send 10 more messages so that one of them lands in partition 5 again. The results are as follows:
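To make the offset rule concrete, here is a minimal in-memory sketch, assuming a partition behaves like an append-only list. `PartitionLog` and `append` are hypothetical names for this illustration and are not Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of one partition: an append-only list of values. */
final class PartitionLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a value and returns its offset, i.e. its zero-based position in this partition. */
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    public static void main(String[] args) {
        PartitionLog partition5 = new PartitionLog();
        PartitionLog partition9 = new PartitionLog();
        // First record in partition 5.
        System.out.println("partition 5, offset " + partition5.append("hello kafka"));
        // Offsets are counted per partition, so partition 9 also starts at 0.
        System.out.println("partition 9, offset " + partition9.append("hello kafka"));
        // Second record in partition 5 gets the next offset.
        System.out.println("partition 5, offset " + partition5.append("hello kafka"));
    }
}
```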
2021-02-03 17:15:17.129 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 9, leaderEpoch = 0, offset = 0, CreateTime = 1612343717117, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:15:17.129 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:20:45.456 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1612344045453, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:20:45.456 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:12.999 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 4, leaderEpoch = 0, offset = 0, CreateTime = 1612344072988, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:12.999 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:15.983 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 0, CreateTime = 1612344075980, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:15.983 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:16.799 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1612344076796, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:16.799 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:17.614 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1612344077612, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:17.614 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:18.487 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 8, leaderEpoch = 0, offset = 0, CreateTime = 1612344078484, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:18.487 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:19.527 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 7, leaderEpoch = 0, offset = 0, CreateTime = 1612344079524, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:19.527 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:20.505 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 6, leaderEpoch = 0, offset = 0, CreateTime = 1612344080500, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:20.505 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:21.440 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 0, offset = 1, CreateTime = 1612344081437, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:21.440 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
As the output shows, the 10 additional messages were spread across the partitions, one per partition; the messages have no key (key = null), so the producer's partitioner chooses the partition. The 10th message landed in partition 5 again, and its offset increased by 1.
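How a partition gets chosen can be sketched as follows. This is a simplified model, not Kafka's actual partitioner: it assumes round-robin selection for records without a key and a plain `hashCode` for records with a key, whereas the real client hashes the serialized key bytes and its strategy for key-less records depends on the client version. `SimplePartitioner` and `partitionFor` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified partition chooser; it is not Kafka's implementation. */
final class SimplePartitioner {
    private final int partitionCount;
    private final AtomicInteger counter = new AtomicInteger();

    SimplePartitioner(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    /** Key-less records rotate through all partitions; keyed records always map to the same partition. */
    int partitionFor(String key) {
        if (key == null) {
            return Math.floorMod(counter.getAndIncrement(), partitionCount);
        }
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        SimplePartitioner partitioner = new SimplePartitioner(10);
        // Ten key-less sends touch each of the ten partitions exactly once.
        for (int i = 0; i < 10; i++) {
            System.out.print(partitioner.partitionFor(null) + " ");
        }
        System.out.println();
        // The same key is always routed to the same partition.
        System.out.println(partitioner.partitionFor("order-42") == partitioner.partitionFor("order-42"));
    }
}
```

In the log above the partitions appear in a different order (9, 0, 4, ...), which this model does not try to reproduce; it only illustrates how ten key-less messages can cover ten partitions once each.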
At this point, the integration of SpringBoot and Kafka is complete, and we’ll drill down into each feature and component as we go.