References:

  • Use Kafka message queues in SpringBoot
  • Creating Topics gracefully

P.S.: The reference materials are quite clear; what follows adds a few supplementary notes.

Step1: add dependencies

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>


<!-- Annotation processor that generates metadata for @ConfigurationProperties classes -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

Add the required annotations to the startup class

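import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@EnableConfigurationProperties
@SpringBootApplication
public class SpringbootHelloApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootHelloApplication.class, args);
    }
}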

Step2: configure Kafka

Configure the following information in the application.properties file

spring.application.name=springboot_hello
server.port=9001
# kafka configuration start
# server configuration
spring.kafka.bootstrap-servers=192.168.31.11:9093
# producer configuration
# Serialize the message value as JSON before sending
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Number of acknowledgements the producer requires the leader to have received before it considers a request complete. This controls the durability of the records that are sent. The value can be as follows:
# acks = 0: the producer does not wait for any acknowledgement from the server; the record is immediately added to the socket buffer and treated as sent. There is no guarantee that the server has received the record, the retries setting has no effect (the client generally does not learn of any failure), and the offset returned for each record is always -1.
# acks = 1: the leader writes the record to its local log and responds without waiting for acknowledgement from all followers. If the leader fails right after acknowledging the record but before the followers have replicated it, the record is lost.
# acks = all: the leader waits for the full set of in-sync replicas to acknowledge the record, which guarantees that the record is not lost as long as at least one in-sync replica remains alive. This is the strongest guarantee and is equivalent to acks = -1.
# Allowed values: all, -1, 0, 1
spring.kafka.producer.acks=1
# consumer configuration
# Start from the earliest available message when the group has no committed offset (or the offset is out of range)
spring.kafka.consumer.auto-offset-reset=earliest
# Commit offsets manually
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
# Topic, partition, and replica configuration
kafka.topics[0].name=topic1
kafka.topics[0].num-partitions=3
# Number of replicas per partition, including the leader replica. If set to 1, there is only the leader replica and no follower replicas
kafka.topics[0].replication-factor=1

kafka.topics[1].name=topic2
kafka.topics[1].num-partitions=1
kafka.topics[1].replication-factor=1


kafka.topics[2].name=topic3
kafka.topics[2].num-partitions=2
kafka.topics[2].replication-factor=1
# kafka configuration end
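For orientation, here is a rough sketch of the plain Kafka client settings that the spring.kafka.* entries above correspond to. The class KafkaClientConfigSketch and its two methods are made up for this example; only the map keys (bootstrap.servers, value.serializer, acks, auto.offset.reset, enable.auto.commit) are the Kafka client's own configuration names. Spring Boot builds the real configuration itself, so the project does not need this code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the project: lists the raw Kafka client
// keys that the spring.kafka.* properties in application.properties map to.
class KafkaClientConfigSketch {

    // spring.kafka.bootstrap-servers, spring.kafka.producer.value-serializer,
    // spring.kafka.producer.acks
    static Map<String, Object> producerConfig() {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", "192.168.31.11:9093");
        config.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        config.put("acks", "1");
        return config;
    }

    // spring.kafka.bootstrap-servers, spring.kafka.consumer.auto-offset-reset,
    // spring.kafka.consumer.enable-auto-commit
    static Map<String, Object> consumerConfig() {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", "192.168.31.11:9093");
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", false);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig());
        System.out.println(consumerConfig());
    }
}
```

Note that spring.kafka.listener.ack-mode has no counterpart here: it configures Spring's listener container rather than the Kafka client.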

Create the topic properties class and the Kafka configuration class

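// TopicConfig.java
import java.util.List;
import lombok.Data;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "kafka")
public class TopicConfig {
    private List<Topic> topics;

    /** Static nested class; effectively a top-level class that is simply declared inside TopicConfig. */
    @Data
    public static class Topic {
        String name;
        Integer numPartitions = 3;
        Short replicationFactor = 1;

        public NewTopic toNewTopic() {
            return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
        }
    }
}

// KafkaConfig.java
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.web.context.support.GenericWebApplicationContext;

@Slf4j
@Configuration
public class KafkaConfig {

    private final TopicConfig configurations;
    private final GenericWebApplicationContext context;

    public KafkaConfig(TopicConfig configurations, GenericWebApplicationContext genericContext) {
        this.configurations = configurations;
        this.context = genericContext;
    }

    /** JSON message converter. */
    @Bean
    public RecordMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Manually registers one NewTopic bean per configured topic. The method runs
     * after this bean has been created because it is annotated with @PostConstruct.
     */
    @PostConstruct
    public void init() {
        configurations.getTopics().forEach(item ->
                context.registerBean(item.name, NewTopic.class, item::toNewTopic));
    }
}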
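To make the binding step less magical, here is a much simplified plain-Java sketch of how indexed keys such as kafka.topics[0].name end up as a list of topic objects. TopicBindingSketch, its nested Topic class, and bind are names invented for this illustration; Spring's real binder is far more general and is what TopicConfig actually relies on. The defaults (3 partitions, replication factor 1) mirror the ones in TopicConfig.Topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified stand-in for @ConfigurationProperties(prefix = "kafka") binding.
// Not Spring code; for illustration only.
class TopicBindingSketch {

    static class Topic {
        String name;
        int numPartitions = 3;       // same default as TopicConfig.Topic
        short replicationFactor = 1; // same default as TopicConfig.Topic
    }

    private static final Pattern KEY = Pattern.compile(
            "kafka\\.topics\\[(\\d+)\\]\\.(name|num-partitions|replication-factor)");

    static List<Topic> bind(Map<String, String> properties) {
        // TreeMap keeps the topics ordered by their index in the property key
        Map<Integer, Topic> byIndex = new TreeMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            Matcher matcher = KEY.matcher(entry.getKey());
            if (!matcher.matches()) {
                continue; // not a topic property, e.g. server.port
            }
            int index = Integer.parseInt(matcher.group(1));
            Topic topic = byIndex.computeIfAbsent(index, i -> new Topic());
            String value = entry.getValue();
            switch (matcher.group(2)) {
                case "name":
                    topic.name = value;
                    break;
                case "num-partitions":
                    topic.numPartitions = Integer.parseInt(value);
                    break;
                default: // replication-factor
                    topic.replicationFactor = Short.parseShort(value);
                    break;
            }
        }
        return new ArrayList<>(byIndex.values());
    }

    public static void main(String[] args) {
        Map<String, String> props = new TreeMap<>();
        props.put("kafka.topics[0].name", "topic1");
        props.put("kafka.topics[0].num-partitions", "3");
        props.put("kafka.topics[1].name", "topic2");
        props.put("kafka.topics[1].num-partitions", "1");
        for (Topic topic : bind(props)) {
            System.out.println(topic.name + " partitions=" + topic.numPartitions
                    + " replicas=" + topic.replicationFactor);
        }
    }
}
```

In the real project each bound Topic is then turned into a NewTopic bean by KafkaConfig.init.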

Step3: the message entity

A custom entity class used as the message payload.

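import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data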
@NoArgsConstructor
@AllArgsConstructor
public class Book implements Serializable {
    private Long id;
    private String name;
}

Step4: The producer

It may help to read the code in the references first and then come back to this version. Spring provides KafkaTemplate for sending messages.

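import com.alibaba.fastjson.JSONObject; // presumably fastjson; it needs its own dependency
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Slf4j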
@Service
public class ProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public ProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message synchronously.
     *
     * @param topic target topic
     * @param obj   message payload
     */
    @SneakyThrows
    public SendResult<String, Object> sendMessageSync(String topic, Object obj) {
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, obj);
        return this.sendMessageSync(producerRecord);
    }

    /**
     * Sends a message synchronously.
     *
     * @param producerRecord record to send
     */
    @SneakyThrows
    public SendResult<String, Object> sendMessageSync(ProducerRecord<String, Object> producerRecord) {
        String topic = producerRecord.topic();
        Object obj = producerRecord.value();
        log.info("-- >> Producer sends message: topic --{}, message --{}", topic, JSONObject.toJSONString(obj));
        // Send the record itself so that any key, partition, or headers set on it are kept;
        // get() blocks until the send completes or fails
        SendResult<String, Object> sendResult = kafkaTemplate.send(producerRecord).get();
        RecordMetadata metadata = sendResult.getRecordMetadata();
        log.info("-- >> Producer receives send result: topic --{}, message --{}, partition --{}, offset --{}",
                topic, JSONObject.toJSONString(obj), metadata.partition(), metadata.offset());
        return sendResult;
    }

    /**
     * Sends a message asynchronously and logs the outcome.
     *
     * @param topic target topic
     * @param obj   message payload
     */
    public void sendMessageAsync(String topic, Object obj) {
        ListenableFutureCallback<SendResult<String, Object>> callback =
                new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("-- >> Producer sent message: exception occurred", throwable);
            }

            @Override
            public void onSuccess(SendResult<String, Object> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                log.info("-- >> Producer receives send result: topic --{}, message --{}, partition --{}, offset --{}",
                        topic, JSONObject.toJSONString(obj), metadata.partition(), metadata.offset());
            }
        };
        this.sendMessageAsync(topic, obj, callback);
    }

    /**
     * Sends a message asynchronously and reports the outcome to the given callback.
     *
     * @param topic          target topic
     * @param obj            message payload
     * @param futureCallback invoked when the send succeeds or fails
     */
    public void sendMessageAsync(String topic, Object obj,
                                 ListenableFutureCallback<SendResult<String, Object>> futureCallback) {
        log.info("-- >> Producer sends message: topic --{}, message --{}", topic, JSONObject.toJSONString(obj));
        // send() returns immediately; the callback runs once the result is known
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);
        future.addCallback(futureCallback);
    }
}
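The difference between the two send styles is easier to see without Kafka in the way. The sketch below uses only the JDK: fakeSend is a made-up stand-in for KafkaTemplate.send that returns a CompletableFuture carrying a pretend offset, and SendStylesSketch, sendSync, and sendAsync are illustrative names, not project code. It is also roughly the shape this code takes on Spring Kafka 3.x, where send returns a CompletableFuture rather than the ListenableFuture used above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java illustration of synchronous vs. asynchronous sending.
// fakeSend is hypothetical; it only imitates an acknowledged send.
class SendStylesSketch {

    private static final AtomicLong NEXT_OFFSET = new AtomicLong();

    // Pretends to send: completes on another thread with the assigned offset,
    // or fails straight away when there is no payload.
    static CompletableFuture<Long> fakeSend(String topic, String payload) {
        if (payload == null) {
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("payload must not be null"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> NEXT_OFFSET.getAndIncrement());
    }

    // Synchronous style: the caller blocks until the result is available.
    static long sendSync(String topic, String payload) {
        return fakeSend(topic, payload).join();
    }

    // Asynchronous style: the caller continues, and the outcome is handled in a callback.
    static CompletableFuture<Long> sendAsync(String topic, String payload,
                                             AtomicReference<String> outcome) {
        return fakeSend(topic, payload).whenComplete((offset, error) -> {
            if (error != null) {
                outcome.set("failed: " + error.getMessage());
            } else {
                outcome.set("sent at offset " + offset);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("sync offset: " + sendSync("topic1", "hello"));

        AtomicReference<String> outcome = new AtomicReference<>();
        sendAsync("topic1", "world", outcome).join(); // join only so the demo can print the outcome
        System.out.println(outcome.get());
    }
}
```

The synchronous style is simpler to reason about but ties up the calling thread for every message; the asynchronous style keeps the caller free at the cost of handling failures in the callback.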

Step5: The consumer

Annotate a method with @KafkaListener to listen for messages; incoming messages are polled from the topic and passed to the method for consumption.

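import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Slf4j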
@Service
public class ConsumerService {
    @Value("${kafka.topics[0].name}")
    String myTopic;

    @Value("${kafka.topics[1].name}")
    String myTopic2;

    private final ObjectMapper objectMapper = new ObjectMapper();

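    /**
     * Annotating a method with @KafkaListener makes it listen for messages;
     * incoming messages are polled and passed to it for consumption.
     *
     * @param record         the consumed record
     * @param acknowledgment handle used to commit the offset manually
     */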
    @SneakyThrows
    @KafkaListener(topics = {"${kafka.topics[0].name}"}, groupId = "group1")
    public void consumeMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        Book book = objectMapper.readValue(record.value(), Book.class);
        log.info("-- >> Consumer reads message: topic--{}, partition --{}, message --{}", myTopic, record.partition(), book.toString());
        acknowledgment.acknowledge();
    }

    @SneakyThrows
    @KafkaListener(topics = {"${kafka.topics[1].name}"}, groupId = "group2")
    public void consumeMessage2(Book book, Acknowledgment acknowledgment) {
        log.info("-- >> Consumer reads message: topic--{}, message --{}", myTopic2, book.toString());
        acknowledgment.acknowledge();
    }
}
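Why call acknowledgment.acknowledge() at all? With enable-auto-commit=false and ack-mode=manual, the offset is only committed once the listener acknowledges. The toy model below shows the consequence for a single partition. ManualAckSketch and its methods are invented for this illustration and are not Kafka or Spring APIs; the real container batches and times its commits differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy in-memory model of manual offset commits for one partition.
// Hypothetical names throughout; this is not how Spring Kafka is implemented.
class ManualAckSketch {

    private final List<String> log; // records stored in the partition
    private int committed = 0;      // offset to resume from after a restart
    private int position = 0;       // next offset to hand to the listener

    ManualAckSketch(List<String> records) {
        this.log = new ArrayList<>(records);
    }

    // Returns the next record, or null when the consumer has caught up.
    String poll() {
        return position < log.size() ? log.get(position++) : null;
    }

    // In the spirit of acknowledgment.acknowledge(): commit everything delivered so far.
    void acknowledge() {
        committed = position;
    }

    // A restarted consumer resumes from the last committed offset.
    void restart() {
        position = committed;
    }

    public static void main(String[] args) {
        ManualAckSketch consumer = new ManualAckSketch(Arrays.asList("book-1", "book-2", "book-3"));

        System.out.println("processed " + consumer.poll());
        consumer.acknowledge();                             // book-1 is committed

        System.out.println("processed " + consumer.poll()); // book-2, but never acknowledged
        consumer.restart();                                 // e.g. the application crashed

        System.out.println("redelivered " + consumer.poll());
    }
}
```

So a record that was handed to the listener but never acknowledged is delivered again after a restart, which is why the listeners above acknowledge only after the message has been handled.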

Step6: testing

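import java.util.concurrent.atomic.AtomicLong;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component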
public class KafkaTask {

    @Value("${kafka.topics[0].name}")
    String myTopic;

    @Value("${kafka.topics[1].name}")
    String myTopic2;

    @Autowired
    private ProducerService producerService;

    private AtomicLong atomicLong = new AtomicLong();

    // Runs every 10 seconds and sends one message to each of the two topics
    @Scheduled(cron = "0/10 * * * * ?")
    public void testKafka() {
        this.producerService.sendMessageAsync(myTopic, new Book(atomicLong.addAndGet(1), "Alice's Adventures" + atomicLong.get()));
        this.producerService.sendMessageAsync(myTopic2, new Book(atomicLong.addAndGet(1), "Crazy Rock" + atomicLong.get()));
    }
}