Without further ado, let's get started.

Install Kafka locally and run it.

First, open the official download page at kafka.apache.org/downloads and download Kafka. For example, I used Spring Boot 2.2.2 with Kafka 2.3.1 and Spring-Kafka 2.3.4. (Download the binary release, not the source archive whose name contains "src".)

Kafka is unpacked to: D:\devTools\kafka_2.12-2.3.1

From that directory, type the command: .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

If ZooKeeper starts successfully, run .\bin\windows\kafka-server-start.bat .\config\server.properties from the same path.

In this case, the Kafka service is successfully started.
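As an optional sanity check, you can verify from code that something is listening on the broker port. This is a minimal JDK-only sketch; the class name is invented for illustration, and 127.0.0.1:9092 is the local default assumed throughout this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {
    // Returns true if something is accepting TCP connections at host:port.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isListening("127.0.0.1", 9092, 1000)
                ? "Kafka port 9092 is open"
                : "nothing listening on 9092");
    }
}
```

This only proves the port is open, not that the broker is healthy, but it catches the most common "forgot to start Kafka" mistake.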

Integrating Kafka with Spring Boot.

1. The pom

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

My spring-kafka resolves to 2.3.4.RELEASE by default.

Spring Boot is 2.2.2.RELEASE.

2. Configuration and code

There are two configuration methods: one uses the Spring Boot application configuration file, the other uses Java code. Choose one or the other.

(1). Configuration file configuration

server:
  port: 28099
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      retries: 0
      # number of messages sent per batch
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and body
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # default consumer group id
      group-id: consumer-tutorial
      auto-commit-interval: 100
      auto-offset-reset: earliest
      enable-auto-commit: true
      # deserializers for the message key and body
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # listener container settings; concurrency sets the number of consumer threads
    listener:
      missing-topics-fatal: false
      concurrency: 3

(2). Java configuration

package com.xuegao.kafkaproject.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * @author xg
 * @Date: 2021/10/21 19:22
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "default-topic-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic initialSendMsgTopic() {
        // topic "kafka.topic": 8 partitions, replication factor 2
        return new NewTopic("kafka.topic", 8, (short) 2);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("my-topic")
                .partitions(10)
                .replicas(3)
                .compact()
                .build();
    }
}

Note: topics that are not registered as beans in the Java configuration need to be declared in the configuration file, as shown above; otherwise an error will be reported.

Producer and consumer methods

@Resource
private KafkaTemplate<String, Object> kafkaTemplate;

public void send(T obj) {
    String jsonObj = JSON.toJSONString(obj);
    logger.info("------------ message = {}", jsonObj);
    ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.topic", jsonObj);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable throwable) {
            logger.info("Produce: the message failed to send: " + throwable.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
            logger.info("Produce: the message was sent successfully:");
            logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
        }
    });
}
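Note that kafkaTemplate.send is asynchronous: it returns a future, and the callback fires later on another thread once the broker acknowledges (or rejects) the record. Below is a JDK-only illustration of that pattern with no Kafka involved; CompletableFuture stands in for ListenableFuture, and the class and method names are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSendSketch {
    // Mimics kafkaTemplate.send(...).addCallback(...): the "send" completes
    // on another thread, and the callback formats the outcome.
    static String sendWithCallback(String payload) {
        StringBuilder log = new StringBuilder();
        CompletableFuture.supplyAsync(() -> "sent " + payload)
                .whenComplete((result, error) -> {
                    if (error != null) {
                        log.append("onFailure: ").append(error.getMessage());
                    } else {
                        log.append("onSuccess: ").append(result);
                    }
                })
                .join(); // block only so the demo sees the callback run
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(sendWithCallback("hello"));
    }
}
```

In real producer code you would not call join(); the whole point of the callback is that send() returns immediately and the success/failure handling happens later.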
/**
 * Listen on the kafka.topic topic.
 * @param record the consumed record
 * @param topic  the topic the record came from
 */
@KafkaListener(id = "tutest", topics = "kafka.topic")
public void listen(ConsumerRecord<?, ?> record,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        logger.info("Receive: +++++++++++++++ Topic: " + topic);
        logger.info("Receive: +++++++++++++++ Record: " + record);
        logger.info("Receive: +++++++++++++++ Message: " + message);
    }
}
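The listener wraps record.value() in an Optional because a record's value can legitimately be null (for example, a tombstone record on a compacted topic). A minimal JDK-only sketch of that guard, with class and method names invented for illustration:

```java
import java.util.Optional;

public class NullValueGuard {
    // Same pattern as the listener: only act when a value is present.
    static String handle(Object value) {
        Optional<?> maybe = Optional.ofNullable(value);
        return maybe.isPresent() ? "Message: " + maybe.get() : "skipped null value";
    }

    public static void main(String[] args) {
        System.out.println(handle("hello"));
        System.out.println(handle(null));
    }
}
```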

Finally, write a test controller that invokes the producer method through a web endpoint. After a successful request, the console will show the log printed by the consumer.
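If you want to see the shape of that round trip without a broker, here is a JDK-only mock. It is entirely hypothetical: a BlockingQueue stands in for the topic, a thread plays the role of the @KafkaListener, and putting a message on the queue plays the role of the controller calling send().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FlowSketch {
    // Runs one produce/consume round trip and returns what the "consumer" saw.
    static String roundTrip(String payload) throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        final String[] received = new String[1];

        // "Consumer" thread: blocks until a message arrives, like a listener.
        Thread consumer = new Thread(() -> {
            try {
                received[0] = topic.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // "Controller" invokes the "producer": put a message on the topic.
        topic.put(payload);
        consumer.join();
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Receive: +++ Message: " + roundTrip("{\"id\":1}"));
    }
}
```

The real system differs in the important ways (the broker persists and partitions messages, and producer and consumer run in separate processes), but the control flow your logs show is the same.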

Good luck. If you run into an exception, a quick web search will usually turn up the fix.