1. Background

Here’s a quick note on Spring Boot and Kafka integration.

2. Implementation steps

1. Introduce the jar package

<dependency>
   <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Write producer and consumer configurations

2.1 Producer configuration

spring.application.name=kafka-springboot
#Configure the addresses of kafka servers, separated by commas (,)
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
#Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
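If you prefer Java configuration over the properties file, the same producer settings could be expressed roughly as follows. This is only a sketch, not part of the original article; the class and bean names are illustrative.

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Mirrors the spring.kafka.producer.* properties above
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}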

2.2 Consumer configuration

#Consumer configuration
#Disable automatic ack commits
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Configure the listener to commit the ack manually, immediately after each record is consumed
spring.kafka.listener.ack-mode=manual_immediate
#Batch ack commits were also tested: with manual mode, the acks are committed in a batch once the records returned by a poll (up to spring.kafka.consumer.max-poll-records) have been consumed
#spring.kafka.listener.ack-mode=manual
spring.kafka.listener.poll-timeout=500S
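For reference, a Java-configuration equivalent of the consumer and listener settings might look roughly like this. It is only a sketch based on the properties shown above; the class and bean names are illustrative.

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // Mirrors the spring.kafka.consumer.* properties above
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Same effect as spring.kafka.listener.ack-mode=manual_immediate
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}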

2.3 The consumer commits the ack manually

1. Change spring.kafka.consumer.enable-auto-commit to false.
2. Change spring.kafka.listener.ack-mode to one of:
   - manual: commit the ack manually; testing showed the acks are actually committed in a batch.
   - manual_immediate: commit the ack manually; the offset is committed immediately when Acknowledgment#acknowledge() is called.

3. Write producer code

@Component
public class KafkaProducer implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void run(String... args) {
        // Send the current timestamp to the topic once per second
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->
                kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis()))
                        .addCallback(new SuccessCallback<SendResult<String, String>>() {
                            @Override
                            public void onSuccess(SendResult<String, String> result) {
                                if (null != result.getRecordMetadata()) {
                                    System.out.println("Message sent successfully, offset: " + result.getRecordMetadata().offset());
                                    return;
                                }
                                System.out.println("Message sent successfully");
                            }
                        }, new FailureCallback() {
                            @Override
                            public void onFailure(Throwable throwable) {
                                System.out.println("Message sending failed: " + throwable.getMessage());
                            }
                        }), 0, 1, TimeUnit.SECONDS);
    }
}

1. The message is sent using KafkaTemplate. 2. The callbacks log whether the message was sent successfully or failed, based on the send result.
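Note that the addCallback style above applies to Spring Kafka 2.x, where send() returns a ListenableFuture. In Spring Kafka 3.x, send() returns a CompletableFuture, so the callback would be registered roughly like this instead (a sketch, not from the original article):

kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis()))
        .whenComplete((result, throwable) -> {
            if (throwable != null) {
                System.out.println("Message sending failed: " + throwable.getMessage());
            } else {
                System.out.println("Message sent successfully, offset: " + result.getRecordMetadata().offset());
            }
        });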

4. Write consumer code

@Component
public class KafkaConsumer {

    @KafkaListener(topics = KafkaConstant.TOPIC, groupId = "kafka-springboot-001")
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
        System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                + " kafka message received, partition: " + record.partition() + ", offset: " + record.offset() + ", value: " + record.value());
        TimeUnit.SECONDS.sleep(1);
        // Manually commit the ack after the record has been processed
        ack.acknowledge();
    }
}

@KafkaListener: topics specifies the name of the topic to listen on; groupId specifies the ID of the consumer group.
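Both the producer and the consumer reference KafkaConstant.TOPIC, which the article does not show. A minimal hypothetical version of that constants class could be (the topic name here is an assumption):

public class KafkaConstant {

    // Hypothetical topic name; replace with the topic used in your environment
    public static final String TOPIC = "kafka-springboot-topic";
}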

3. Running results

4. Reference documents

1. docs.spring.io/spring-boot…

5. Code path

gitee.com/huan1993/ra…