
1. Add a Kafka dependency

Search for "kafka" on mvnrepository.com, and we…

Different versions of spring-kafka support different versions of Spring Boot; the compatibility matrix is published on the Spring for Apache Kafka project page:

IO /projects/sp…

Just go to the Maven repository to find the corresponding version: mvnrepository.com/

My Spring Boot version is 2.2.5, so the matching spring-kafka dependency looks like this:

```xml
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.5.RELEASE</version>
</dependency>
```

If the versions do not match, you will run into all kinds of problems. As a rule of thumb, the spring-kafka version is one minor version ahead of Spring Boot: Spring Boot 2.2.5 pairs with spring-kafka 2.3.5.
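If the project inherits from spring-boot-starter-parent, an alternative sketch (assuming the standard property name used by Spring Boot's dependency management) is to override the managed version through the `spring-kafka.version` property instead of hard-coding a `<version>` on the dependency:

```xml
<properties>
    <!-- override the spring-kafka version managed by Spring Boot's BOM -->
    <spring-kafka.version>2.3.5.RELEASE</spring-kafka.version>
</properties>
```

This keeps the dependency declaration itself version-free and lets the parent POM resolve it.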

2. Add a configuration file

I am using a single common module here to simulate both the producer and the consumer. My Spring Boot project uses a YAML configuration file, so I add the following:

```yaml
spring:
  kafka:
    bootstrap-servers: 192.168.184.134:9092,192.168.184.135:9092,192.168.184.136:9092
    producer:
      # number of times a message is resent after an error occurs
      retries: 0
      # key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      # key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # when a listened topic does not exist, do not fail application startup
      missing-topics-fatal: false
```

There are many more configuration options; let's get the integration working first and add them gradually.
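The `spring.kafka.*` keys above map one-to-one onto plain Kafka client properties, which Spring Boot relays to the underlying client. As an illustrative sketch (not part of the project), here are the same producer settings expressed as a raw `java.util.Properties` object, using the standard Kafka property names:

```java
import java.util.Properties;

public class ProducerProps {

    /** Build producer properties equivalent to the YAML configuration above. */
    public static Properties producerProps() {
        Properties props = new Properties();
        // comma-separated broker list, same as spring.kafka.bootstrap-servers
        props.put("bootstrap.servers",
                "192.168.184.134:9092,192.168.184.135:9092,192.168.184.136:9092");
        // number of resend attempts after an error (spring.kafka.producer.retries)
        props.put("retries", "0");
        // serializers (spring.kafka.producer.key-serializer / value-serializer)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```

Seeing the flat property names makes it easier to look the options up in the official Kafka producer configuration reference.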

3. Code implementation

3.1 Producer

```java
package com.cloud.bssp.message.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Kafka producer
 *
 * @author weirx
 * @date 2021/02/03 14:22
 */
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a kafka message
     *
     * @param topic target topic
     */
    public void send(String topic) {
        kafkaTemplate.send(topic, "hello kafka");
    }
}
```

3.2 Consumer

```java
package com.cloud.bssp.message.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer
 *
 * @author weirx
 * @date 2021/02/03 15:01
 */
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"test-kafka"})
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}
```

3.3 Test class

```java
package com.cloud.bssp.message.kafka;

import com.cloud.bssp.message.kafka.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka test controller
 *
 * @author weirx
 * @date 2021/02/03 15:03
 */
@RestController
@RequestMapping("test/kafka")
public class TestKafka {

    @Autowired
    private KafkaProducer producer;

    @RequestMapping("/send/{topic}")
    public void send(@PathVariable String topic) {
        producer.send(topic);
    }
}
```

4. Test

Here I pass the topic in as a path variable to make observation easier. Call the test endpoint:

```
http://localhost:8085/test/kafka/send/test-kafka
```

Looking at the console, the consumer outputs:

```
2021-02-03 17:12:44.654  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 0, offset = 0, CreateTime = 1612343564622, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:12:44.654  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
```

From the log above: topic test-kafka, partition 5, offset 0.

Let's look at the result in Kafka Tool: one message has been added to partition 5:

Why are there 10 partitions? In the previous article we set up a Kafka cluster whose topics are created with 10 partitions by default. Partition usage will be explained in detail in a later article.
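The spread across partitions can be pictured with a toy simulation. Since our producer sends records with a null key (only a value), the partitioner distributes them over all partitions; the exact strategy depends on the client version (older clients round-robin, newer ones use sticky batching), so the sketch below uses plain round-robin purely to illustrate how 10 partitions each end up receiving messages:

```java
import java.util.HashMap;
import java.util.Map;

public class RoundRobinSketch {

    /**
     * Distribute `messages` records over `numPartitions` partitions
     * round-robin; return how many records each partition received.
     */
    public static Map<Integer, Integer> distribute(int messages, int numPartitions) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < messages; i++) {
            int partition = i % numPartitions; // round-robin partition choice
            counts.merge(partition, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 11 messages over 10 partitions: every partition gets one record,
        // and one partition receives a second (its offset moves to 1)
        System.out.println(distribute(11, 10));
    }
}
```

This mirrors what the test run shows: each of the 10 partitions receives a message before any partition receives a second one.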

The offset is a record's position within its partition, counting from 0.
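Offsets are tracked independently per partition. A minimal sketch of that bookkeeping (an illustration, not Kafka's actual implementation), where appending a record to a partition returns the offset it was written at:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSketch {

    // next offset to assign, tracked per partition (each partition starts at 0)
    private final Map<Integer, Long> nextOffset = new HashMap<>();

    /** Append one record to a partition; return the offset it received. */
    public long append(int partition) {
        long offset = nextOffset.getOrDefault(partition, 0L);
        nextOffset.put(partition, offset + 1);
        return offset;
    }

    public static void main(String[] args) {
        OffsetSketch log = new OffsetSketch();
        System.out.println(log.append(5)); // 0 : first record in partition 5
        System.out.println(log.append(5)); // 1 : offset increments within the partition
    }
}
```

Writing to one partition never advances the offset of another, which is exactly what the logs below demonstrate.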

Let's send 10 more messages, until a message lands in partition 5 again. The results are as follows:

```
2021-02-03 17:15:17.129  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 9, leaderEpoch = 0, offset = 0, CreateTime = 1612343717117, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:15:17.129  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:20:45.456  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1612344045453, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:20:45.456  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:12.999  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 4, leaderEpoch = 0, offset = 0, CreateTime = 1612344072988, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:12.999  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:15.983  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 0, CreateTime = 1612344075980, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:15.983  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:16.799  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1612344076796, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:16.799  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:17.614  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1612344077612, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:17.614  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:18.487  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 8, leaderEpoch = 0, offset = 0, CreateTime = 1612344078484, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:18.487  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:19.527  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 7, leaderEpoch = 0, offset = 0, CreateTime = 1612344079524, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:19.527  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:20.505  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 6, leaderEpoch = 0, offset = 0, CreateTime = 1612344080500, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:20.505  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
2021-02-03 17:21:21.440  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 0, offset = 1, CreateTime = 1612344081437, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:21.440  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
```

As the output shows, the 10 additional messages are spread across the partitions. The 10th lands in partition 5 again, and its offset increases to 1.


At this point the integration of Spring Boot and Kafka is complete. In later articles we will drill down into the individual features and components.