Implementation of a delay queue based on Kafka

Kafka is widely used as a message queue, so most people are familiar with it. Yet when you search the web for "kafka delay queue", you mostly find rough outlines or general ideas rather than a genuinely usable code implementation. This article tries to break that pattern by providing working code on the topic, and hopefully it will attract more experts to share their own.

How do we implement a delay queue on top of Kafka?

To solve a problem, we first break it down. Kafka is a high-performance message queue: as long as there is enough consumption capacity, a message is received almost immediately after it is sent. So what we really need is a way to hold a message back for a while.

Some online gurus have offered the following solutions:

  1. Delayed messages are not sent directly to the target topic, but to a topic dedicated to delayed messages, for example delay-minutes-1.
  2. A program pulls messages from delay-minutes-1 and forwards those whose delay has elapsed to the actual target topic.

It's a bit like those "how to draw a horse" tutorials: a couple of circles, and then suddenly the finished horse. The outline is right, but the crucial steps in between are missing.

The plan is good, but we need more details.

Filling in the details

What’s the problem?

The problem is that the forwarding program receives a delayed message almost immediately after it is sent. How do we hold it back, so that it is forwarded to the real topic only after its delay has elapsed?

Some readers may think this is simple: after receiving a message, the program checks the condition, finds it not yet satisfied, and calls the sleep method; after a while, the next loop iteration pulls the message again.

Is it really possible?

Everything seems wonderful, but it’s not feasible.

This is because each poll of Kafka returns a batch of up to max.poll.records messages, and if the program fails to call poll again within the time window specified by max.poll.interval.ms, Kafka assumes the consumer has died and triggers a rebalance, after which this consumer can no longer pull any messages.
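For reference, here is a minimal sketch of the two consumer settings in play, shown with the default values Kafka ships with (the names come from ConsumerConfig):

Properties props = new Properties();
// Upper bound on the number of records a single poll() call returns (default: 500).
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
// If poll() is not called again within this many milliseconds, the broker
// considers the consumer dead and triggers a rebalance (default: 300000, i.e. 5 minutes).
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");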

For example, if you needed a 24-hour delay queue, you would have to write Thread.sleep(1000*60*60*24) in the code and raise max.poll.interval.ms to 1000*60*60*24 to avoid triggering that rebalance. Code like that is bound to make you stop and wonder: where am I, and why did I ever write this?

We can do this more gracefully.

KafkaConsumer provides pause and resume APIs. After pause is called on a set of partitions, the consumer stops fetching new messages from them, but since poll is still being invoked, Kafka does not conclude that the consumer has died. In addition, to be more elegant, we start a timer instead of sleeping. The complete flow is as follows: when the consumer finds a message whose delay has not yet elapsed, it pauses the partition and seeks the offset back to that message's position, waiting for the next cycle to consume it again, while a timer periodically resumes the paused partitions.
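Stripped of the surrounding bookkeeping, the core of the pattern looks roughly like this (a sketch only; the complete listing follows in the next section):

// A record that is not yet due: stop fetching from its partition and rewind,
// so the same record will be read again once the partition is resumed.
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
consumer.pause(Collections.singletonList(tp));
consumer.seek(tp, record.offset());

// poll() keeps being called while partitions are paused, so no rebalance
// is triggered, but it returns no records for them.

// Meanwhile, a timer task periodically resumes everything that was paused:
consumer.resume(consumer.paused());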

Java code implementation

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;

@SpringBootTest
public class DelayQueueTest {

    private KafkaConsumer<String, String> consumer;
    private KafkaProducer<String, String> producer;
    private volatile boolean exit = false;
    private final Object lock = new Object();
    private final String servers = "";

    @BeforeEach
    void initConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
        consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @BeforeEach
    void initProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    @Test
    void testDelayQueue() throws JsonProcessingException, InterruptedException {
        String topic = "delay-minutes-1";
        List<String> topics = Collections.singletonList(topic);
        consumer.subscribe(topics);

        // Every second, resume all paused partitions and wake up the poll loop.
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                synchronized (lock) {
                    consumer.resume(consumer.paused());
                    lock.notify();
                }
            }
        }, 0, 1000);

        do {
            synchronized (lock) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));

                if (consumerRecords.isEmpty()) {
                    lock.wait();
                    continue;
                }

                boolean timed = false;
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    long timestamp = consumerRecord.timestamp();
                    TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                    if (timestamp + 60 * 1000 < System.currentTimeMillis()) {
                        // The one-minute delay has elapsed: unwrap the JSON envelope.
                        String value = consumerRecord.value();
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode jsonNode = objectMapper.readTree(value);
                        JsonNode jsonNodeTopic = jsonNode.get("topic");

                        String appTopic = null, appKey = null, appValue = null;

                        if (jsonNodeTopic != null) {
                            appTopic = jsonNodeTopic.asText();
                        }
                        if (appTopic == null) {
                            continue;
                        }
                        JsonNode jsonNodeKey = jsonNode.get("key");
                        if (jsonNodeKey != null) {
                            appKey = jsonNodeKey.asText();
                        }

                        JsonNode jsonNodeValue = jsonNode.get("value");
                        if (jsonNodeValue != null) {
                            appValue = jsonNodeValue.asText();
                        }
                        // Send to the application topic.
                        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
                        try {
                            producer.send(producerRecord).get();
                            // Success: commit the offset of this message.
                            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
                            HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
                            metadataHashMap.put(topicPartition, offsetAndMetadata);
                            consumer.commitSync(metadataHashMap);
                        } catch (ExecutionException e) {
                            // Forwarding failed: pause the partition, rewind and retry next cycle.
                            consumer.pause(Collections.singletonList(topicPartition));
                            consumer.seek(topicPartition, consumerRecord.offset());
                            timed = true;
                            break;
                        }
                    } else {
                        // Not due yet: pause the partition and rewind so the message is re-read later.
                        consumer.pause(Collections.singletonList(topicPartition));
                        consumer.seek(topicPartition, consumerRecord.offset());
                        timed = true;
                        break;
                    }
                }
                if (timed) {
                    lock.wait();
                }
            }
        } while (!exit);
    }
}

This program is a unit test based on Spring Boot 2.4.4 and kafka-clients 2.7.0. Before running it, set the private variable servers to the address of your Kafka broker.

After starting the program, send a JSON string in the following format to the topic delay-minutes-1:

{
    "topic": "target",
    "key": "key1",
    "value": "value1"
}

At the same time, start a consumer listening on the topic target. About a minute later, it will receive a record with key="key1" and value="value1".
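For completeness, here is a minimal sketch of the producer side that wraps the real message in the JSON envelope above and sends it to the delay topic (the broker address is a placeholder; adjust it to your environment):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class DelayedMessageSender {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The envelope carries the real target topic, key and value.
            String envelope = "{\"topic\":\"target\",\"key\":\"key1\",\"value\":\"value1\"}";
            producer.send(new ProducerRecord<>("delay-minutes-1", envelope));
        }
    }
}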

Source code address

What else needs to be done?

Create multiple topics for different delay levels, such as delay-minutes-1, delay-minutes-5, delay-minutes-10 and delay-minutes-15, instead of a single topic, so that several delay granularities are available. After all, messages are pulled sequentially: as soon as one message does not meet the condition, all the messages behind it are blocked waiting. A sketch of how a requested delay could be routed to the closest topic follows below.
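As a sketch of that idea, here is one possible way to route a requested delay to the closest matching topic; the topic names are the ones suggested above, while the routing scheme itself is an illustrative assumption rather than part of the original design:

import java.time.Duration;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class DelayTopicRouter {

    // Available delay levels in minutes, each backed by its own topic.
    private static final NavigableMap<Long, String> LEVELS = new TreeMap<>(Map.of(
            1L, "delay-minutes-1",
            5L, "delay-minutes-5",
            10L, "delay-minutes-10",
            15L, "delay-minutes-15"));

    // Pick the largest level that does not exceed the requested delay, falling
    // back to the smallest level for very short delays.
    // Example: topicFor(Duration.ofMinutes(7)) returns "delay-minutes-5".
    public static String topicFor(Duration delay) {
        Map.Entry<Long, String> entry = LEVELS.floorEntry(delay.toMinutes());
        return entry != null ? entry.getValue() : LEVELS.firstEntry().getValue();
    }
}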