The delayed blocking queue: DelayQueue
DelayQueue is a blocking queue that supports delayed retrieval of elements. Internally it stores elements in a PriorityQueue, and every element must implement the Delayed interface. When an element is created you specify how long its delay lasts, and the element can only be taken from the queue once that delay has expired.
Usage scenarios
Given the nature of a delayed blocking queue, DelayQueue is typically used in scenarios such as:
- Cache systems: when an element can be taken from the DelayQueue, the corresponding cache entry has expired
- Scheduled task scheduling: tasks are queued with their planned execution time as the delay and are taken from the queue only when that time arrives (a minimal sketch is shown right after this list)
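For the scheduled-task case, here is a minimal sketch under my own assumptions (the DelayedTask name and its fields are made up for illustration): each task records an absolute execution time, and getDelay reports how much time is left until that moment.

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {

    private final Runnable task;
    private final long executeAtMillis; // absolute execution time

    DelayedTask(Runnable task, long delayMillis) {
        this.task = task;
        this.executeAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Time remaining until the planned execution time, in the requested unit
        return unit.convert(executeAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // Tasks that are due sooner sort towards the head of the queue
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    public void run() {
        task.run();
    }
}

A scheduler thread would then simply loop on queue.take().run(): take() blocks until the task at the head of the queue is due.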
Let's look at how DelayQueue can be used in a cache system; the code is as follows:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    static class Cache implements Runnable {

        private volatile boolean stop = false;
        // ConcurrentHashMap because the map is accessed by both the caller and the expiration thread
        private Map<String, String> itemMap = new ConcurrentHashMap<>();
        private DelayQueue<CacheItem> delayQueue = new DelayQueue<>();

        public Cache() {
            // Start an internal thread that detects expired entries
            new Thread(this).start();
        }

        /**
         * Add an entry to the cache
         *
         * @param key        cache key
         * @param value      cache value
         * @param expireTime expiration time, in seconds
         */
        public void put(String key, String value, long expireTime) {
            CacheItem cacheItem = new CacheItem(key, expireTime);
            // Handling of duplicate keys is omitted here
            delayQueue.add(cacheItem);
            itemMap.put(key, value);
        }

        public String get(String key) {
            return itemMap.get(key);
        }

        public void shutdown() {
            stop = true;
        }

        @Override
        public void run() {
            while (!stop) {
                CacheItem cacheItem = delayQueue.poll();
                if (cacheItem != null) {
                    // The element has expired; remove it from the cache
                    itemMap.remove(cacheItem.getKey());
                    System.out.println("key : " + cacheItem.getKey() + " expired and removed");
                }
            }
            System.out.println("cache stop");
        }
    }

    static class CacheItem implements Delayed {

        private String key;
        /** Expiration time (in seconds) */
        private long expireTime;
        /** Creation time, in milliseconds */
        private long currentTime;

        public CacheItem(String key, long expireTime) {
            this.key = key;
            this.expireTime = expireTime;
            this.currentTime = System.currentTimeMillis();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until expiration, converted to the requested unit
            // A value greater than 0 means the item has not expired yet
            long remainingMillis = TimeUnit.SECONDS.toMillis(expireTime)
                    - (System.currentTimeMillis() - currentTime);
            return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Items with a longer remaining delay go towards the tail of the queue
            if (this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS)) {
                return 1;
            }
            // Items with a shorter remaining delay go towards the head of the queue
            if (this.getDelay(TimeUnit.MICROSECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                return -1;
            }
            return 0;
        }

        public String getKey() {
            return key;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Cache cache = new Cache();
        // Add cache entries that expire after 5, 4 and 3 seconds respectively
        cache.put("a", "1", 5);
        cache.put("b", "2", 4);
        cache.put("c", "3", 3);
        while (true) {
            String a = cache.get("a");
            String b = cache.get("b");
            String c = cache.get("c");
            System.out.println("a : " + a + ", b : " + b + ", c : " + c);
            // Exit the loop once all entries have expired
            if (a == null && b == null && c == null) {
                break;
            }
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        cache.shutdown();
    }
}
The result is as follows:
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
a : 1, b : 2, c : 3
key : c expired and removed
a : 1, b : 2, c : null
key : b expired and removed
a : 1, b : null, c : null
key : a expired and removed
a : null, b : null, c : null
cache stop
As the output shows, element c expires and is removed from the cache after 3 seconds, element b after 4 seconds, and element a after 5 seconds.
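As a side note, the detection thread above busy-spins on poll(). Since DelayQueue.take() blocks until the head element's delay has expired, the run() method could also be written roughly as follows; this is only a sketch reusing the Cache fields above, and shutdown() would additionally need to interrupt the thread so it can break out of take().

@Override
public void run() {
    try {
        while (!stop) {
            // Blocks until the element at the head of the queue has expired
            CacheItem cacheItem = delayQueue.take();
            itemMap.remove(cacheItem.getKey());
            System.out.println("key : " + cacheItem.getKey() + " expired and removed");
        }
    } catch (InterruptedException e) {
        // Interrupted during take(), typically as part of shutdown
        Thread.currentThread().interrupt();
    }
    System.out.println("cache stop");
}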
Implementation principle
Member variables
Reentrant lock
private final transient ReentrantLock lock = new ReentrantLock();
Used to ensure thread-safety of queue operations
Priority queue
private final PriorityQueue<E> q = new PriorityQueue<E>();
The underlying storage; it keeps elements ordered so that the one with the smallest remaining delay is always at the head of the queue and is dequeued first.
leader
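In the JDK source this is declared as a plain thread reference:

private Thread leader = null;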
The leader points to the first thread that is blocked waiting for the element at the head of the queue. Its role is to reduce unnecessary waiting by other threads: only the leader performs a timed wait for the remaining delay of the head element, while all other waiting threads simply block until they are signalled, so they do not keep waking up before the head element is actually available.
condition
private final Condition available = lock.newCondition();
Condition that is signalled when a new element becomes available at the head of the queue, or when a new thread may need to become the leader.
The following mainly analyzes the enqueue and dequeue operations.
Enqueue – offer
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Enqueue into the underlying priority queue
        q.offer(e);
        if (q.peek() == e) {
            // The newly added element is now at the head of the queue,
            // i.e. it has the smallest remaining delay
            // Clear the leader
            leader = null;
            // Wake up a thread blocked waiting on the queue
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
Dequeue – take
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                // The queue is empty; wait until an add wakes us up
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // The head element has expired; return it
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    // The leader is not null, so another thread is already
                    // waiting for the head element; the current thread becomes
                    // a follower and waits until it is signalled
                    available.await();
                else {
                    // The leader is null: make the current thread the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // The leader waits only as long as the remaining delay
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // Wake up a follower once the leader has finished
            available.signal();
        lock.unlock();
    }
}
Leader-follower pattern
DelayQueue borrows the idea of the leader/follower pattern: only one waiting thread, the leader, performs a timed wait for the head element, while the other waiting threads, the followers, block until they are signalled; once the leader has taken the head element, it signals a follower, which then becomes the new leader. (The figure that originally appeared here is quoted from the CSDN article "Introduction to Leader/Follower Multi-threaded Network Model".)
Summary
From the DelayQueue implementation we can also see why it matters that PriorityQueue is a min-heap ("small top heap"): the element with the smallest remaining delay always sits at the head of the queue, so offer only needs to check whether the new element became the head, and take only ever needs to look at the head element.
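As a quick illustration of that ordering, here is a minimal sketch (the Item class and its field names are made up for this demo): elements are offered out of order but are taken in order of remaining delay, because the smallest delay is always at the head of the underlying min-heap.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class OrderDemo {

    // A hypothetical element whose delay is fixed at construction time
    static class Item implements Delayed {
        final String name;
        final long expireAtMillis;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expireAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("slow", 300));
        queue.offer(new Item("fast", 100));
        queue.offer(new Item("medium", 200));
        // Prints fast, medium, slow: the smallest remaining delay is always at the head
        for (int i = 0; i < 3; i++) {
            System.out.println(queue.take().name);
        }
    }
}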