Kafka can hold a very large message backlog, on the order of hundreds of millions of messages, and supports cluster deployment. It is especially well suited to log processing, where real-time requirements are not strict. Compared with Redis, Kafka offers a larger backlog capacity and higher reliability.

The complete project code is available on GitHub: github.com/neatlife/my…

You can get started with Kafka quickly by following the steps below.

Get a usable Kafka instance

You can use Docker to start a Kafka cluster with a single command; see github.com/simplesteph…

git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d

Run docker-compose -f full-stack.yml ps to see which ports Kafka is listening on.

Note the port that Kafka listens on, 9092; it will be used later.
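
Before moving on, it can help to confirm that this port is actually reachable from the machine that will run the application. The following is a minimal sketch using only the JDK. The class name BrokerPortCheck and the method isReachable are made up for illustration, and it assumes Kafka is exposed on 127.0.0.1:9092 as above. It only checks that a TCP connection can be opened, not that the listener is really Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Kafka or Spring.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed address: the local Docker setup described above.
        System.out.println("Kafka port reachable: " + isReachable("127.0.0.1", 9092, 2000));
    }
}
```

If this prints false, check the docker-compose output again before configuring the application.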

Port 8000 serves a web UI for Kafka topics, where you can view the current topic list.

You can also view the data stored in a topic.

Prepare the sample project

You can create a test project at https://start.spring.io/ with the following dependencies:

  1. spring-boot-starter-web
  2. spring-kafka
  3. lombok

In application.properties, configure Kafka's address and the consumer group-id to use. The group-id can be any name you choose, such as myconsumergroup.

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup

Send messages using the Kafka client

Use a Spring Boot service to wrap Kafka's message-sending code. The core code is as follows:

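package mykafka.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Topic to publish to. Kafka topic names may not contain spaces,
    // and the consumer must listen on the same name.
    private final String topic = "mytopic";

    // Spring injects the auto-configured KafkaTemplate through this constructor.
    Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // send() is asynchronous: it returns before the broker acknowledges the record.
        this.kafkaTemplate.send(topic, message);
        System.out.println("Sent sample message [" + message + "] to " + topic);
    }
}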

Then write an HTTP endpoint that calls the service to send Kafka messages. The core code is as follows:

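import lombok.RequiredArgsConstructor;
import mykafka.service.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {

    private final Producer producer;

    // Each request to /test1 publishes one timestamped message.
    @RequestMapping("/test1")
    public String test1() {
        producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
        return "test1";
    }
}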

Note: the Kafka topic used in the code above can be changed to any valid name, such as mytopic, as long as the consumer uses the same one.
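
Whatever name you choose, it has to be a legal Kafka topic name: Kafka accepts only ASCII letters, digits, '.', '_' and '-', up to 249 characters, and rejects the names "." and "..". Here is a small sketch of that check in plain Java. TopicNames and isValid are hypothetical names used only for illustration, and the broker remains the final authority on what it accepts.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the Kafka client API.
class TopicNames {

    // Characters Kafka allows in a topic name.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Maximum topic name length accepted by Kafka.
    private static final int MAX_LENGTH = 249;

    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        // "." and ".." are rejected even though they use legal characters.
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("mytopic"));   // a plain lowercase name
        System.out.println(isValid("my topic"));  // contains a space
    }
}
```

A name containing a space fails this check, which is why the topic should be written as a single token.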

Then open IP:8080/test1 in your browser, where IP is the address of the machine running the service.

You can see that the message has been sent to Kafka

Consume messages

To consume messages, annotate a method with @KafkaListener and specify the topic and groupId.

The core code is as follows:

// "log" is assumed to be an SLF4J logger, for example from Lombok's @Slf4j on the enclosing class.
// In spring-kafka 3.x, KafkaHeaders.RECEIVED_PARTITION_ID was renamed to RECEIVED_PARTITION.
@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                           @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    log.info(
            "received message, topic: {}, partition: {}, offset: {}, message: {}",
            topics.get(0),
            partitions.get(0),
            offsets.get(0),
            message
    );
}

When the application runs, the listener logs each message it receives along with its topic, partition, and offset.

Other clients

PHP client for sending and consuming messages: github.com/arnaud-lb/p…

Go client: github.com/confluentin…

A couple of points to note

The producer and the consumer must use the same topic.

Logs can be sent to Kafka for buffering, then read by a Kafka client and stored in ELK or another log storage system for analysis and visualization.

Because Kafka clients send messages asynchronously and brokers flush messages to disk asynchronously, messages can be lost if a broker goes down. If you need higher reliability, you can consider a different message queue such as RocketMQ.
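
How much can be lost depends largely on producer settings. As a rough sketch, the snippet below collects the producer configuration keys that are commonly tightened for durability: acks=all, enable.idempotence=true, and a non-zero retries. The keys are standard Kafka producer configuration names, but the class ReliableProducerSettings, the retry count, and the broker address are assumptions for illustration. The resulting Properties would be handed to a Kafka producer; the sketch only builds and prints them, so it runs without a broker. These settings narrow the window for loss but do not remove it, since durability also depends on topic replication and broker configuration.

```java
import java.util.Properties;

// Hypothetical helper for illustration; it only assembles configuration values.
class ReliableProducerSettings {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait until all in-sync replicas have acknowledged each record.
        props.setProperty("acks", "all");
        // Let the broker de-duplicate records that are re-sent after a retry.
        props.setProperty("enable.idempotence", "true");
        // Retry transient send failures instead of dropping the record (count is an assumption).
        props.setProperty("retries", "3");
        return props;
    }

    public static void main(String[] args) {
        // Assumed address: the local broker used earlier in this article.
        build("127.0.0.1:9092").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a Spring Boot application the same idea is usually expressed in application.properties, for example spring.kafka.producer.acks=all.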
