Delayed blocking queue: DelayQueue

DelayQueue is an unbounded blocking queue that supports delayed retrieval of elements. Internally it stores its elements in a PriorityQueue, and every element must implement the Delayed interface. When you create an element, you specify how long it must wait before it can be taken from the queue; an element can be retrieved only after its delay has expired.

Usage scenarios

Because of these delay semantics, DelayQueue is typically applied in the following scenarios:

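  • Cache system: when an element can be fetched from DelayQueue, the corresponding cache entry has expired
  • Scheduled task scheduling: when a task can be fetched from DelayQueue, its scheduled time has arrived and it can be executed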
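
The scheduling scenario can be illustrated with a minimal sketch. It is not a production scheduler: DelayedTask, runAll and the task names are hypothetical and exist only for this illustration. Three tasks are submitted out of order, and a single worker takes each one as it becomes due.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTaskDemo {

    // Hypothetical task type: a name plus the instant at which it becomes due
    static class DelayedTask implements Delayed {
        final String name;
        final long dueAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time, converted to the unit the queue asks for
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Submits three tasks out of order and returns their names in the order they ran
    static List<String> runAll() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.add(new DelayedTask("third", 300));
        queue.add(new DelayedTask("first", 100));
        queue.add(new DelayedTask("second", 200));

        List<String> executed = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                DelayedTask task = queue.take(); // blocks until the head task is due
                executed.add(task.name);         // stand-in for actually running the task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runAll()); // prints [first, second, third]
    }
}
```

The worker never checks a clock itself; take() does the waiting, and the compareTo ordering decides which task is due next.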

Let's look at how DelayQueue can be used in a cache system. The code is as follows:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    static class Cache implements Runnable {

        // volatile so that the expiration thread sees the update made by shutdown()
        private volatile boolean stop = false;

        // Shared between the calling thread and the expiration thread, so it must be thread-safe
        private final Map<String, String> itemMap = new ConcurrentHashMap<>();

        private final DelayQueue<CacheItem> delayQueue = new DelayQueue<>();

        public Cache() {
            // Start the internal thread that detects expired entries
            new Thread(this).start();
        }

        /**
         * Add a cache entry.
         *
         * @param key        cache key
         * @param value      cache value
         * @param expireTime expiration time, in seconds
         */
        public void put(String key, String value, long expireTime) {
            CacheItem cacheItem = new CacheItem(key, expireTime);

            // Handling of duplicate keys is omitted here
            itemMap.put(key, value);
            delayQueue.add(cacheItem);
        }

        public String get(String key) {
            return itemMap.get(key);
        }

        public void shutdown() {
            stop = true;
        }

        @Override
        public void run() {
            while (!stop) {
                CacheItem cacheItem;
                try {
                    // Wait up to 100 ms for an expired element instead of busy-spinning,
                    // then re-check the stop flag
                    cacheItem = delayQueue.poll(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                if (cacheItem != null) {
                    // The element has expired: remove it from the cache
                    itemMap.remove(cacheItem.getKey());
                    System.out.println("key : " + cacheItem.getKey() + " expired and removed");
                }
            }

            System.out.println("cache stop");
        }
    }

    static class CacheItem implements Delayed {

        private final String key;

        /** Expiration time (in seconds) */
        private final long expireTime;

        /** Creation time (in milliseconds) */
        private final long currentTime;

        public CacheItem(String key, long expireTime) {
            this.key = key;
            this.expireTime = expireTime;
            this.currentTime = System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Calculate the remaining time before expiration.
            // A value greater than 0 means the element has not expired yet.
            long remainingMillis = TimeUnit.SECONDS.toMillis(expireTime)
                    - (System.currentTimeMillis() - currentTime);
            // The result must be expressed in the unit requested by the caller
            return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Short delays go to the head of the queue, long delays to the tail
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }

        public String getKey() {
            return key;
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Cache cache = new Cache();

        // Add cache elements
        cache.put("a", "1", 5);
        cache.put("b", "2", 4);
        cache.put("c", "3", 3);

        while (true) {
            String a = cache.get("a");
            String b = cache.get("b");
            String c = cache.get("c");

            System.out.println("a : " + a + ", b : " + b + ", c : " + c);

            // All elements have expired: exit the loop
            if (a == null && b == null && c == null) {
                break;
            }

            TimeUnit.MILLISECONDS.sleep(1000);
        }

        cache.shutdown();
    }
}

The result is as follows:


a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
key : c expired and removed
a : 1, b : 2, c : null
key : b expired and removed
a : 1, b : null, c : null
key : a expired and removed
a : null, b : null, c : null
cache stop

As the output shows, element c expires and is removed from the cache after 3 seconds, element b after 4 seconds, and element a after 5 seconds.

Implementation principle

Variables

Reentrant lock
private final transient ReentrantLock lock = new ReentrantLock();

Used to ensure thread-safety of queue operations

Priority queue
private final PriorityQueue<E> q = new PriorityQueue<E>();

The underlying storage. Because it is ordered by remaining delay, the element that expires soonest is always at the head and is dequeued first.
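
The ordering guarantee can be seen with plain numbers standing in for delays. This is only a sketch: MinHeapDemo and drainInOrder are hypothetical names, and Long values replace real Delayed elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class MinHeapDemo {

    // Offers the given delays in arbitrary order and polls them back out
    static List<Long> drainInOrder(long... delays) {
        PriorityQueue<Long> q = new PriorityQueue<>();
        for (long delay : delays) {
            q.offer(delay);
        }
        List<Long> result = new ArrayList<>();
        while (!q.isEmpty()) {
            result.add(q.poll()); // always removes the smallest remaining value
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder(5, 3, 4)); // prints [3, 4, 5]
    }
}
```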

leader

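leader is the thread designated to wait for the element at the head of the queue, that is, the first thread that blocks in a timed wait for the head's delay to expire. Its purpose is to minimize unnecessary timed waiting: only the leader waits with a timeout equal to the head's remaining delay, while every other waiting thread (a follower) waits indefinitely until it is signaled. The followers therefore do not all wake up when the delay expires just to compete for a single element.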

condition
private final Condition available = lock.newCondition();

Condition that is signaled when a new element becomes the head of the queue, or when a new thread may need to become the leader
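
For readers unfamiliar with the lock/condition pairing, here is a minimal sketch of the await/signal hand-off that DelayQueue builds on. It is not DelayQueue code: ConditionDemo, its single-slot item field and the handOff helper are assumptions made for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionDemo {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private String item; // guarded by lock

    void put(String value) {
        lock.lock();
        try {
            item = value;
            available.signal(); // wake one thread blocked in take()
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (item == null) {
                available.await(); // releases the lock while waiting, reacquires it on wake-up
            }
            String result = item;
            item = null;
            return result;
        } finally {
            lock.unlock();
        }
    }

    // Starts a consumer that calls take(), then hands it a value
    static String handOff(String value) {
        ConditionDemo box = new ConditionDemo();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = box.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        box.put(value);
        try {
            consumer.join(); // join() makes the consumer's write to received[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello")); // prints hello
    }
}
```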

The following analyzes the enqueue and dequeue operations:

Enqueue – offer
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Enqueue the element
            q.offer(e);
            if (q.peek() == e) {
                // The new element is at the head of the queue, so it has the smallest delay.
                // Reset the leader
                leader = null;
                // and wake up a thread waiting on the condition so that it re-checks the head
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
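
The effect of the q.peek() == e check can be observed from the outside. In the sketch below, OfferHeadDemo, Item and heads are hypothetical names; the only DelayQueue calls used are offer and peek.

```java
import java.util.Arrays;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class OfferHeadDemo {

    // Hypothetical element type with a name and a delay in milliseconds
    static class Item implements Delayed {
        final String name;
        final long dueAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns the head's name before and after a shorter-delay element is offered
    static String[] heads() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("slow", 5000));
        String before = queue.peek().name; // peek() returns the head even if it has not expired
        queue.offer(new Item("fast", 1000)); // smaller delay, so it becomes the new head
        String after = queue.peek().name;
        return new String[] {before, after};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(heads())); // prints [slow, fast]
    }
}
```

When the head changes like this, a thread that was timing its wait against the old head is waiting for the wrong deadline, which is why offer resets leader and signals.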
Dequeue – take
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    // The queue is empty: wait until offer signals
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        // The delay has expired: remove and return the head element
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // The leader is not null, so another thread is already
                        // timing its wait on the head element.
                        // The current thread becomes a follower and waits indefinitely
                        available.await();
                    else {
                        // The leader is null:
                        // make the current thread the leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // The current thread waits for at most the remaining delay
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                // There is no leader and the queue is not empty: wake up a follower
                available.signal();
            lock.unlock();
        }
    }
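
From the caller's point of view, the loop above means that take() does not return before the head's delay has elapsed, while the non-blocking poll() simply returns null. A rough way to observe this is sketched below; TakeBlocksDemo, Item and both helper methods are hypothetical names used only here.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class TakeBlocksDemo {

    // Hypothetical element type with a delay in milliseconds
    static class Item implements Delayed {
        final long dueAtNanos;

        Item(long delayMillis) {
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // poll() never blocks: it returns null while the head has not expired
    static boolean pollIsNullBeforeExpiry() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item(5000));
        return queue.poll() == null;
    }

    // Returns how many milliseconds passed between adding the element and take() returning
    static long millisUntilTaken(long delayMillis) {
        long start = System.nanoTime();
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item(delayMillis));
        try {
            queue.take(); // blocks until the element's delay has elapsed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("poll() before expiry is null: " + pollIsNullBeforeExpiry());
        System.out.println("take() returned after ms: " + millisUntilTaken(200));
    }
}
```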
Leader-Follower pattern

This figure is quoted from CSDN “Introduction to Leader/Follower Multi-threaded Network Model”.
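
To see the pattern at work with several consumers, the sketch below starts three threads that all call take() on the same queue. At any moment at most one of them is the leader timing its wait on the head; the others wait as followers until signaled. LeaderFollowerDemo, Item and consumeWithThreeThreads are hypothetical names, and the leader itself is internal to DelayQueue, so only its effect (each thread ends up with exactly one element) is visible here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class LeaderFollowerDemo {

    // Hypothetical element type with a name and a delay in milliseconds
    static class Item implements Delayed {
        final String name;
        final long dueAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Three consumers compete for three elements; returns the names that were taken
    static List<String> consumeWithThreeThreads() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.add(new Item("a", 300));
        queue.add(new Item("b", 200));
        queue.add(new Item("c", 100));

        List<String> taken = Collections.synchronizedList(new ArrayList<>());
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Thread consumer = new Thread(() -> {
                try {
                    taken.add(queue.take().name); // each thread takes exactly one element
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i);
            consumers.add(consumer);
            consumer.start();
        }
        for (Thread consumer : consumers) {
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(consumeWithThreeThreads());
    }
}
```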

Summary

From the DelayQueue implementation, we can see why PriorityQueue is built on a min-heap: the smallest element, here the one with the shortest remaining delay, is always at the head.