This is the first day of my participation in the First Challenge 2022

preface

  • DelayQueue DelayQueue DelayQueue DelayQueue DelayQueue
  • Deferred tasks in real distributed projects do not typically use the JDK’s built-in delay queue because it is based on JVM memory storage and does not persist, so the task will be lost when the service is restarted.
  • In the project can use MQ dead letter queue or Redisson delay queue for processing delay tasks, this article will talk about the use of redisson delay queue demo and its execution source code.

The demo sample

  • Create a simple SpringBoot project from scaffolding, introduce Redisson’s Maven dependencies, and simply configure the Redisson connection properties.
<! --> <dependency> <groupId>org.redisson</groupId> <artifactId> <version>3.16.6</version> </dependency> @configuration public class RedissonConfig {@value ("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; /** * Obtain redissonClient instance ** @return * @throws Exception */ @bean public redissonClient getRedisson() {Config Config = new Config(); String address = "redis://" + host + ":" + port; config.useSingleServer().setAddress(address); return Redisson.create(config); }}Copy the code
  • RedissonQueueHandle defines a Redisson delay queue insertion and fetch task processing class that controls spring’s bean load cycle to enable independent threads to fetch delayed tasks.
  • Here, three methods are used to obtain the delayed task. Except for the first blocking method, the other two methods are not percentages to obtain the task according to the delay parameter, because the delayed task is obtained periodically at a time interval.
/** * Component public class RedissonQueueHandle implements InitializingBean ** @author ZRH */ @slf4j @Component Public class RedissonQueueHandle implements InitializingBean { private final RBlockingQueue<RedisDataEntity<? >> queue; private final RDelayedQueue<RedisDataEntity<? >> delayedQueue; public RedissonQueueHandle (RedissonClient client) { this.queue = client.getBlockingQueue("redisson:queue"); this.delayedQueue = client.getDelayedQueue(queue); } @override public void afterPropertiesSet () {thread(); // watchDog(new HashedWheelTimer()); // schedule(); } private void thread () { new Thread(() -> { while (true) { try { RedisDataEntity entity = queue.take(); Log.info (" Entity: {}, time: {}", entity, system.currentTimemillis () -entity.getTime ()); log.info(" Entity: {}, time: {}", entity, system.currentTimemillis () -entity.getTime ()); } catch (Exception e) { } } }, "zrh").start(); } private void watchDog (final HashedWheelTimer timer) { timer.newTimeout(timeout -> { RedisDataEntity entity = queue.poll(); if (null ! = entity) {log.info(" entity, system.currentTimemillis () -entity.getTime ()); } watchDog(timer); }, 3, TimeUnit.SECONDS); } private void schedule () { Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> { RedisDataEntity entity =  queue.poll(); if (null ! = entity) {log.info(" entity, system.currentTimemillis () -entity.getTime ()); } }, 5, 5, TimeUnit.SECONDS); } /** * add redis, * * @param Entity */ public void offer(RedisDataEntity entity) {try {delayedQueue.offer(entity, entity.getExpire(), TimeUnit.MILLISECONDS); } catch (Exception e) {log.error(" redis delay queue Exception ", e); }}}Copy the code
  • The redisson delay queue can be a string or an object RedisDataEntity. Since IO disk storage is performed, the Serializable interface must be implemented.
/** * @Author: ZRH * @Date: 2022/1/10 11:54 */ @data Public class RedisDataEntity<T> implements Serializable {/** * private final T Data; /** * Expire time (in milliseconds) */ private final Long expire; /** * private final Long time; public RedisDataEntity (T data, Long expire, Long time) { this.data = data; this.expire = expire; this.time = time; }}Copy the code
  • Then open an insert data interface:
/** * @Author: ZRH * @Date: 2022/1/10 11:45 */ @Slf4j @RestController public class IndexController { private final RedissonQueueHandle redisHandle; public IndexController (RedissonQueueHandle redisHandle) { this.redisHandle = redisHandle; } @PostMapping("redissonQueue") public String redissonQueue (@RequestParam String data, @RequestParam Long expire) { RedisDataEntity entity = new RedisDataEntity(data, expire, System.currentTimeMillis()); Log.info (" Add data this time: {}", entity); redisHandle.offer(entity); return "ok"; }} access interface setup delay 30 seconds: http://localhost:8802/redissonQueue? Data = a&expire = 30000, Print results following the 2022-01-14 14:21:52. 140 INFO - 10808 [nio - 8802 - exec - 1] C.R.W eb. Controller. IndexController: Add data this time: RedisDataEntity(data=a, expire=30000, Time = 1642141312135), the 2022-01-14 14:21:52. 887 INFO - 10808 [nio - 8802 - exec - 2] C.R.W eb. Controller. IndexController: Add data this time: RedisDataEntity(data=a, expire=30000, Time = 1642141312887) 14:22:22 2022-01-14. 10808-240 the INFO/ZRH C.R.W eb. Redis. RedissonQueueHandle: Obtain data this time: RedisDataEntity(data= A, expire=30000, time=1642141312135) 30105 the 2022-01-14 14:22:22. 10808-914 the INFO/ZRH C.R.W eb. Redis. RedissonQueueHandle: Obtain data this time: RedisDataEntity(Data = A, EXPIRE =30000, time=1642141312887), time: 30027Copy the code

Source code analysis of the initial execution process

  • The Redisson delay queue ultimately interacts with the Redis service, so you can use the monitor command to see what commands are executed in Redis, which is very helpful to understand the execution process.

  • The picture above shows several instructions sent to Redis at project startup
    • “SUBSCRIBE” : redisson_delay_queue_channel:{redisson:queue}”, in which a scheduled task obtains data from the queue
    • “Zrangebyscore” : retrieves the first 100 elements in the set “redisson_delay_queue_timeout:{redisson:queue}” whose score is sorted between 0 and 1642148406748 (current timestamp)
    • “Zrange” : gets the first element in the “redisson_delay_queue_timeout:{redisson:queue}” set, used to get the expiration time of the next element
    • “BLPOP” : Retrieves and removes the first element in the “redisson: Queue “list, waiting to block if there is no element. So this is going to be blocked
    • “Rpush” : If the directive “zrangebyScore” gets an element, it pushes it to redisson:queue
    • “Lrem” : if the directive “zrangebyscore” gets an element, it deletes the first element in the queue “redisson_delay_queue:{redisson:queue} whose element is v
The SUBSCRIBE instruction
  • Enter the constructor of RedissonDelayedQueue, which contains the lua script to execute the above instructions (some of the code has been deleted to avoid space, the same below) :
. protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); ChannelName = prefixName("redisson_delay_queue_channel", getRawName()); QueueName = prefixName("redisson_delay_queue", getRawName()); TimeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName()); timeoutSetName = prefixName("redisson_delay_queue_timeout"); QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) { @Override protected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;" , Arrays.<Object>asList(getRawName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @Override protected RTopic getTopic() { return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); }}; queueTransferService.schedule(queueName, task); this.queueTransferService = queueTransferService; }Copy the code
  • Continue to follow up queueTransferService. The schedule (queueName, task) method, for the first time into the tasks set, so the final execution start () method:
. private final ConcurrentMap<String, QueueTransferTask> tasks = new ConcurrentHashMap<>(); public synchronized void schedule(String name, QueueTransferTask task) { QueueTransferTask oldTask = tasks.putIfAbsent(name, task); if (oldTask == null) { task.start(); } else { oldTask.incUsage(); }}Copy the code
  • Enter the QueueTransferTask, continue to follow up schedulerTopic. AddListener (…). Methods:
. private int messageListenerId; private int statusListenerId; public void start() { RTopic schedulerTopic = getTopic(); statusListenerId = schedulerTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { pushTask(); }}); messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); }}); }Copy the code
  • Then will enter PublishSubscribeService. Subscribe (…). Methods:
  • Note: here we continue calling the overloaded method subscribe(…) SUBSCRIBE is set to pubsubtype.subscribe
. public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<? >... listeners) { return subscribe(PubSubType.SUBSCRIBE, codec, channelName, getEntry(channelName), listeners); } private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName, MasterSlaveEntry entry, RedisPubSubListener<? >... listeners) { RPromise<PubSubConnectionEntry> promise = new RedissonPromise<>(); AsyncSemaphore lock = getSemaphore(channelName); Lock.acquire (() -> {if (promise.isdone ()) {lock.release(); return; } subscribe(codec, channelName, entry, promise, type, lock, listeners); }); return promise; }Copy the code
  • Acquire (…) of AsyncSemaphore objects Methods place thread tasks on listeners’ queues and then read and execute them in turn.
public class AsyncSemaphore { private final AtomicInteger counter; private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>(); public void acquire(Runnable listener) { listeners.add(listener); tryRun(); } private void tryRun() { if (counter.decrementAndGet() >= 0) { Runnable listener = listeners.poll(); if (listener == null) { counter.incrementAndGet(); return; } listener.run(); } else { if (counter.incrementAndGet() > 0) { tryRun(); }}}}Copy the code
  • Subscribe (Codec, channelName, Entry, promise, type, lock, listeners)
. private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<? >... listeners) { PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry)); if (connEntry ! = null) { addListeners(channelName, promise, type, lock, connEntry, listeners); return; } freePubSubLock.acquire(() -> { if (promise.isDone()) { lock.release(); freePubSubLock.release(); return; } MasterSlaveEntry msEntry = Optional.ofNullable(connectionManager.getEntry(entry.getClient())).orElse(entry); // The entry2PubSubConnection collection is null the first time you enter it, so use the default value, Finally freeEntry = = null PubSubEntry freePubSubConnections = entry2PubSubConnection. GetOrDefault (msEntry, new PubSubEntry()); PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek(); if (freeEntry == null) { freePubSubLock.release(); connect(codec, channelName, msEntry, promise, type, lock, listeners); return; }... }); }Copy the code
  • Methods connect(Codec, channelName, msEntry, Promise, Type, Lock, Listeners) :
. private void connect(Codec codec, ChannelName channelName, MasterSlaveEntry msEntry, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<? >... listeners) { RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(msEntry, channelName); promise.onComplete((res, e) -> {... }); connFuture.onComplete((conn, ex) -> { if (ex ! = null) {... } freePubSubLock.acquire(() -> { PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); int remainFreeAmount = entry.tryAcquire(); PubSubKey key = new PubSubKey(channelName, msEntry); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry); if (oldEntry ! = null) {... } if (remainFreeAmount > 0) { addFreeConnectionEntry(channelName, entry); } freePubSubLock.release(); RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners); ChannelFuture future; Else logic if (pubsubtype.psubscribe == type) {future = entry.psubscribe(codec, channelName); } else { future = entry.subscribe(codec, channelName); } future.addListener((ChannelFutureListener) future1 -> { if (! future1.isSuccess()) {... } connectionManager.newTimeout(timeout -> subscribeFuture.cancel(false), config.getTimeout(), TimeUnit.MILLISECONDS); }); }); }); }Copy the code
  • The method of contents of the regional expression, basically see method entry. The subscribe (codec, channelName), finally into RedisPubSubConnection. Async (…). Method, which is the process of sending a SUBSCRIBE instruction:

Zrangebyscore and Zrange commands
  • After the SUBSCRIBE instruction is issued, the listener added to the queuetransferTask.start () method is triggered and pushTask() is executed.
  • When the pushTaskAsync() method is finished (lua script is finished), a scheduled task scheduleTask() is started.
. protected abstract RTopic getTopic(); protected abstract RFuture<Long> pushTaskAsync(); Private void pushTask() {// This abstract method is implemented in the constructor of RedissonDelayedQueue, RFuture<Long> startTimeFuture = pushTaskAsync(); startTimeFuture.onComplete((res, e) -> { if (e ! = null) { if (e instanceof RedissonShutdownException) { return; } log.error(e.getMessage(), e); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (res ! = null) { scheduleTask(res); }}); }Copy the code
BLPOP instruction
  • When RedissonDelayedQueue delay queue structure is completed, will be called delay queue take () method to obtain delay task, and then will enter RedissonBlockingQueue. TakeAsync () method:
. @Override public RFuture<V> takeAsync() { return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0); } /* * (non-Javadoc) * @see java.util.concurrent.BlockingQueue#take() */ @Override public V take() throws InterruptedException { return commandExecutor.getInterrupted(takeAsync()); }...Copy the code
  • Note that the value of the parameter is BLPOP, which is clearly related to the BLPOP directive we are looking for, so the client is blocking the BLPOP directive to get the value. Open a thread on the client side to keep blocking the loop to get elements;
  • Look at the source code to continue downward into CommandAsyncService. WriteAsync (…). Method, and then continue down to the redisexecutor.execute () method:
. public void execute() { if (mainPromise.isCancelled()) {... } if (! connectionManager.getShutdownLatch().acquire()) {... } codec = getCodec(codec); RFuture<RedisConnection> connectionFuture = getConnection(); RPromise<R> attemptPromise = new RedissonPromise<>(); mainPromiseListener = (r, e) -> {... }; if (attempt == 0) {... } scheduleRetryTimeout(connectionFuture, attemptPromise); connectionFuture.onComplete((connection, e) -> { if (connectionFuture.isCancelled()) {... } if (! connectionFuture.isSuccess()) {... } // Execute the current method sendCommand(attemptPromise, connection) upon successful connection retrieval; writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { checkWriteFuture(writeFuture, attemptPromise, connection); }}); }); attemptPromise.onComplete((r, e) -> {... }); }Copy the code
  • Some branch methods in this method do not press the table. There is a timeout retry mechanism in the middle, using netty’s time wheel.
  • Retrieves the write operation connection object task, and then enters the method sendCommand(attemptPromise, Connection) to send instructions
  • Command: “BLPOP”, parameter: “redisson: Queue “”0”

Offer adds task flow source analysis

  • After the project is started, add a deferred task to Redis and check the command executed in Redis:

  • Then follow up insert elements offer method, enter RedissonDelayedQueue. OfferAsync () method, as shown below:
. @Override public void offer(V e, long delay, TimeUnit timeUnit) { get(offerAsync(e, delay, timeUnit)); } @Override public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) { if (delay < 0) { throw new IllegalArgumentException("Delay can't be negative"); } long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = ThreadLocalRandom.current().nextLong(); return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;" , Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e)); }Copy the code
  • It is obvious that a long list of script commands are executed in Redis. The basic flow is relatively simple:
    • “Zadd” : This adds element data to the zset set “redisson_delay_queue_timeout:{redisson:queue}” (this data has been processed, regardless of its structure). The sort value is the current timestamp + delay time
    • “Rpush” : push element data to list queue” redisson:queue”
    • “Zrange” : Gets the first sorted element of the zset set “redisson_delay_queue_timeout:{redisson:queue}”
    • “Publish” : If the element above is the element that was inserted this time, the notification queue “redisson_delay_queue_channel:{redisson:queue}” is published with the expiration time of the current element. This is done to reduce the time difference between the expiration time of this element.

Finally timer source code analysis

  • A timer task executes pushTask() and scheduleTask(…) when a listener listens for a new client subscription or element notification. Methods:
. private int messageListenerId; private int statusListenerId; public void start() { RTopic schedulerTopic = getTopic(); // When a new client subscribes to a schedulerTopic, Is the trigger to perform pushTask statusListenerId = schedulerTopic () method. The addListener (new BaseStatusListener () {@ Override public void onSubscribe(String channel) { pushTask(); }}); // When redis is notified of a new message, the scheduleTask(...) is triggered. Method, StartTime for publish notice of the elements in the expiration time messageListenerId = schedulerTopic. AddListener (Long class, new MessageListener<Long>() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); }}); }Copy the code
  • The pushTask() method operates on the Redis delay queue, scheduleTask(…) It is the Netty timewheel that controls the call to pushTask(), so pushTask() and scheduleTask() call each other.
    ......
    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {...}
        
        if (oldTimeout != null) {...}
        
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }
    
    protected abstract RTopic getTopic();
    
    protected abstract RFuture<Long> pushTaskAsync();
    
    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            
            if (res != null) {
                scheduleTask(res);
            }
        });
    }
Copy the code
  • Conclusion:
    • When a new client subscribs, the pushTask() method is called to pull the data into the blocking queue.
    • ScheduleTask (…) is called when a message with a letter is published. Method and determine whether to delay the call through a time wheel or call the pushTask() method immediately based on its expiration time.

The last

  • The source code of redisson delay queue is relatively abstract and complex, and it feels easier to parse than its distributed lock.
  • But look carefully and follow the main methods to understand their implementation. Learn with an open mind and make progress together