Application scenarios of delay queues

1. Distributed lock retry: when acquiring a distributed lock fails, the message is put into a delay queue so that it can be processed later

2. Order notification: send an SMS notification to the user 60 seconds after an order is placed successfully

3. Order timeout: in an order system, a user usually has 30 minutes to pay after placing an order. If the payment has not succeeded within those 30 minutes, the order needs to be closed

The basic principle of implementing a delay queue with Redis

The producer puts the data into the queue by calling the delay function. The key point is that the queue is stored in Redis's Zset (sorted set) data structure rather than in a plain list

jedis.zadd(queueKey,System.currentTimeMillis()+10000,s);

The second parameter of zadd is the score. It is the current timestamp in milliseconds plus the delay in milliseconds (10000 ms, i.e. 10 seconds, here), so the score is the time at which the task becomes due. The third parameter is the value to store
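To make the score arithmetic concrete, here is a minimal sketch that computes due times for the delays mentioned in the scenarios above. DelayScore and scoreFor are hypothetical names used only for illustration; they are not part of Jedis. The result is a long, which Java widens to the double that zadd expects.

```java
import java.time.Duration;

// Hypothetical helper (not part of Jedis): turns a delay into a zset score.
class DelayScore {

    // The score is an absolute due time in epoch milliseconds.
    static long scoreFor(long nowMillis, Duration delay) {
        return nowMillis + delay.toMillis();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Same value as System.currentTimeMillis() + 10000 in the zadd call
        System.out.println(scoreFor(now, Duration.ofSeconds(10)));
        // SMS notification 60 seconds after the order
        System.out.println(scoreFor(now, Duration.ofSeconds(60)));
        // Close an unpaid order after 30 minutes
        System.out.println(scoreFor(now, Duration.ofMinutes(30)));
    }
}
```

Passing the delay as a Duration instead of a bare number makes the unit explicit, which helps avoid mixing up seconds and milliseconds.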

Consumers, possibly running on multiple threads, poll the Zset for tasks that have become due and process them

Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);

The signature is zrangeByScore(String key, double min, double max, int offset, int count). Members are returned in ascending order of score, so with offset 0 and count 1 the call returns at most one member: the due task with the smallest score, that is, the one that became due first
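The query semantics can be tried out without a Redis server. The sketch below uses a TreeMap as an in-memory stand-in for the sorted set; DueTaskLookup and firstDue are hypothetical names, and the sketch assumes unique scores for brevity, whereas a real Zset allows several members with the same score.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// In-memory stand-in for a sorted set, only to illustrate the query semantics.
class DueTaskLookup {

    // score -> member; assumes unique scores (a real zset allows ties)
    private final TreeMap<Long, String> byScore = new TreeMap<>();

    void add(long score, String member) {
        byScore.put(score, member);
    }

    // Roughly what zrangeByScore(key, 0, now, 0, 1) answers:
    // the member with the lowest score in [0, now], or nothing if none is due.
    Optional<String> firstDue(long now) {
        Map.Entry<Long, String> first = byScore.ceilingEntry(0L);
        if (first == null || first.getKey() > now) {
            return Optional.empty();
        }
        return Optional.of(first.getValue());
    }

    public static void main(String[] args) {
        DueTaskLookup q = new DueTaskLookup();
        q.add(3000, "task-c");
        q.add(1000, "task-a");
        q.add(2000, "task-b");
        System.out.println(q.firstDue(500));   // prints Optional.empty
        System.out.println(q.firstDue(2500));  // prints Optional[task-a]
    }
}
```

Even though two tasks are due at time 2500, only the one with the smallest score is returned, which is what gives the queue its earliest-due-first behaviour.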

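This made me wonder: a queue normally consumes items in the order in which they were enqueued, but what if I want to consume in reverse order? Redis also offers ZREVRANGEBYSCORE (zrevrangeByScore in Jedis), which returns members from the highest score downwards, although I don't know whether a real project would ever need this…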
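As a thought experiment, reverse-order consumption would mean taking the due task with the largest score instead of the smallest. The sketch below shows that idea on an in-memory TreeMap; NewestDueFirst and newestDue are hypothetical names, and this is only an illustration of the ordering, not something the test code in this article does.

```java
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration of picking the most recently due task first.
class NewestDueFirst {

    // Returns the member with the highest score in [0, now], or null if none is due.
    static String newestDue(TreeMap<Long, String> byScore, long now) {
        Map.Entry<Long, String> e = byScore.floorEntry(now);
        if (e == null || e.getKey() < 0) {
            return null;
        }
        return e.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> tasks = new TreeMap<>();
        tasks.put(1000L, "task-a");
        tasks.put(2000L, "task-b");
        tasks.put(3000L, "task-c");
        System.out.println(newestDue(tasks, 2500)); // prints task-b
    }
}
```

One consequence worth noting: with this ordering, a steady stream of newly due tasks could keep older ones waiting indefinitely.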

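After obtaining the data, call zrem to remove it from the Zset. When several consumers are polling, more than one of them may fetch the same member, so only the consumer whose zrem call returns a value greater than 0 has actually claimed the task and should process it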

jedis.zrem(queueKey,s)>0
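Why the return value of the removal matters can be shown without Redis. In the sketch below a concurrent set stands in for the Zset and several threads all try to claim the same task; ClaimByRemove and race are hypothetical names used only for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory illustration of "whoever removes the member owns the task".
class ClaimByRemove {

    // Returns how many of the competing consumers actually handled the task.
    static int race(int consumers) {
        Set<String> pending = ConcurrentHashMap.newKeySet(); // stand-in for the zset
        pending.add("task-1");
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // let all consumers go at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Every consumer has "seen" task-1; only a successful remove claims it,
                // mirroring the jedis.zrem(queueKey, s) > 0 check.
                if (pending.remove("task-1")) {
                    handled.incrementAndGet();
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // prints 1
    }
}
```

The consumers that lose the race have still spent a round trip fetching a task they cannot process, which is the price of keeping the fetch and the remove as two separate commands.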

The test code

import cn.hutool.core.lang.TypeReference;
import com.alibaba.fastjson.JSON;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

public class RedisDelayingQueue<T> {

    static class TaskItem<T> {
        public String id;
        public T msg;
    }

    // fastjson needs a TypeReference to deserialize an object that has a generic type
    private Type TaskType = new TypeReference<TaskItem<T>>() {}.getType();

    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void delay(T msg) {
        TaskItem<T> taskItem = new TaskItem<>();
        // A unique id keeps otherwise identical messages distinct inside the zset
        taskItem.id = UUID.randomUUID().toString();
        taskItem.msg = msg;
        System.out.println("order " + taskItem.msg);
        // Serialize the message to a String as the zset value; the due time is the score
        String s = JSON.toJSONString(taskItem);
        jedis.zadd(queueKey, System.currentTimeMillis() + 10000, s);
    }

    public void loop() {
        while (!Thread.interrupted()) {
            // Fetch at most one task whose due time has passed.
            // This returns a Set<String> in the Jedis version used here;
            // some newer versions return a List<String> instead.
            Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
            if (values.isEmpty()) {
                try {
                    Thread.sleep(500); // nothing is due yet, wait before polling again
                } catch (InterruptedException e) {
                    break;
                }
                continue;
            }
            String s = values.iterator().next();
            // Only the consumer that actually removes the member handles it
            if (jedis.zrem(queueKey, s) > 0) {
                TaskItem<T> taskItem = JSON.parseObject(s, TaskType);
                System.out.println("order of consumption " + taskItem.msg);
                this.handleMsg(taskItem.msg);
            }
        }
    }

    public void handleMsg(T msg) {
        System.out.println(msg);
    }

    public static void main(String[] args) {
        System.out.println("run");
        // Note: a Jedis instance is not thread-safe. Sharing one between two
        // threads is only tolerable in a demo; real code should use a JedisPool.
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread produce = new Thread() {
            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.delay("codehole" + i);
                }
            }
        };
        Thread consumer = new Thread() {
            public void run() {
                // loop() already polls until the thread is interrupted, so call it once
                queue.loop();
            }
        };
        produce.start();
        consumer.start();
        try {
            produce.join();
            // With the two lines below commented out, the consumer keeps polling
            // until the process is killed. To stop it automatically, sleep for
            // longer than the 10000 ms delay (6000 is too short) and then interrupt.
            // Thread.sleep(6000);
            // consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
        }
    }
}