The problem
(1) DelayQueue is a blocking queue?
(2) How to implement DelayQueue?
(3) What scenarios is DelayQueue mainly used in?
Introduction to the
DelayQueue is a Java delay blocking queue for concurrent packet sending. It is often used to implement scheduled tasks.
Inheritance system
As you can see from inheritance, DelayQueue implements BlockingQueue, so it is a BlockingQueue.
In addition, the DelayQueue combines an interface called Delayed, and all elements stored in the DelayQueue must implement the Delayed interface.
So what is Delayed?
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Copy the code
Delayed is an inherited interface from Comparable and defines a getDelay() method to indicate how much time is due, which should return a value less than or equal to zero.
Source code analysis
The main properties
// The lock used to control concurrency
private final transient ReentrantLock lock = new ReentrantLock();
// Priority queue
private final PriorityQueue<E> q = new PriorityQueue<E>();
// Is used to indicate whether there are threads currently queued (only for fetching elements)
private Thread leader = null;
// Condition, used to indicate whether there are now desirable elements
private final Condition available = lock.newCondition();
Copy the code
As can be seen from the attribute, delay queue is mainly implemented by priority queue, supplemented by reentrant lock and condition to control concurrency security.
Since the priority queue is unbounded, only one condition is required here.
Remember priority queues? Click on the link to get to the source code of PriorityQueue
Main construction methods
public DelayQueue(a) {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
Copy the code
The constructor is relatively simple, a default constructor that initializes the addition of all elements in collection C.
The team
Because DelayQueue is a blocking queue and the priority queue is unbounded, it does not block and does not time out, so its four queue methods are the same.
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally{ lock.unlock(); }}Copy the code
The way to join the team is relatively simple:
(1) lock;
(2) Add the element to the priority queue;
(3) If the added element is the top of the heap, set leader to null and wake up the thread waiting on condition available;
(4) Unlock;
Out of the team
Since a DelayQueue is a blocking queue, it can be queued in four different ways: throwing exceptions, blocking, non-blocking, and timeout.
We mainly analyze two methods here, poll() and take().
public E poll(a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally{ lock.unlock(); }}Copy the code
The poll() method is simple:
(1) lock;
(2) Check the first element and return null if it is empty or has not expired;
(3) If the first element expires, poll() of the priority queue is called to pop the first element;
(4) Unlock.
public E take(a) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// The top element of the heap
E first = q.peek();
// If the heap top element is empty, there is no element in the queue
if (first == null)
available.await();
else {
// The expiration time of the heaptop element
long delay = first.getDelay(NANOSECONDS);
// If the value is less than 0, the poll() method is called to pop the heap top element
if (delay <= 0)
return q.poll();
// If delay is greater than 0, there is a block
// Set first to null to facilitate GC, as other elements may pop the element
// References are still held here and will not be cleaned
first = null; // don't retain ref while waiting
// If there is another thread waiting, enter the wait directly
if(leader ! =null)
available.await();
else {
// If the leader is null, assign the current thread to it
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// Automatically wakes up after the delay
// After waking up, empty the leader and re-enter the loop to determine whether the heap top element is expired
// It is not possible to get the element even after waking up
// Because it is possible that another line acquired the lock first and ejected the heap top element
// The conditional lock is awakened in two steps, and is first queued from the Condition
// The AQS queue will wake up when another thread calls locksupport.unpark (t)
// We will talk about AQS later ^^
available.awaitNanos(delay);
} finally {
// If the leader is still the current thread, it is left empty to give other threads a chance to fetch elements
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// If the leader is empty and there are elements on top of the heap, the next waiting thread is woken up
if (leader == null&& q.peek() ! =null)
// Signal () just puts the waiting thread into the AQS queue
available.signal();
// Unlock, this is the real wake uplock.unlock(); }}Copy the code
The take() method is slightly more complicated:
(1) lock;
(2) Judge whether the top element of the heap is empty, if so, block and wait directly;
(3) Determine whether the top element of the heap expires, and call the poll() popup element of the priority queue directly after expiration;
(4) If there is no expiration date, then judge whether there are other threads waiting in front, if there is, wait directly;
(5) If there is no other thread waiting, it will wake up as the first thread waiting for the delay time, and then try to acquire the element;
(6) Wake up the next waiting thread after obtaining the element;
(7) Unlock;
Method of use
After all that, do you still not know how to use it? How can that work? Take a look at the following example:
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<Message> queue = new DelayQueue<>();
long now = System.currentTimeMillis();
// Start a thread to fetch elements from the queue
new Thread(()->{
while (true) {
try {
// will print 1000,2000,5000,7000,8000 in sequence
System.out.println(queue.take().deadline - now);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// Add 5 elements to the queue
queue.add(new Message(now + 5000));
queue.add(new Message(now + 8000));
queue.add(new Message(now + 2000));
queue.add(new Message(now + 1000));
queue.add(new Message(now + 7000)); }}class Message implements Delayed {
long deadline;
public Message(long deadline) {
this.deadline = deadline;
}
@Override
public long getDelay(TimeUnit unit) {
return deadline - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString(a) {
returnString.valueOf(deadline); }}Copy the code
Isn’t it simple? The earlier the element expires, the sooner it’s out.
conclusion
(1) DelayQueue is a blocking queue;
(2) DelayQueue internal storage structure uses priority queue;
(3) DelayQueue uses reentrant locks and conditions to control concurrency security;
(4) DelayQueue is usually used for scheduled tasks;
eggs
A thread pool in Java implements scheduled tasks directly using DelayQueue?
Of course not, ScheduledThreadPoolExecutor is used in its definition of the inner class DelayedWorkQueue, actually inside the implementation of the logic basic are all the same. Instead of using PriorityQueue, DelayedWorkQueue implements a PriorityQueue again using arrays, essentially the same.
Welcome to pay attention to my public number “Tong Elder brother read source code”, view more source code series articles, with Tong elder brother tour the ocean of source code.