1. Structure

DelayQueue inheritance, core member variables, and main constructors:

// Elements in the queue must implement the Delayed interface, which gives each element an expiration time
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    // Composes a PriorityQueue to perform the underlying queue operations
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // ReentrantLock ensures thread safety
    private final transient ReentrantLock lock = new ReentrantLock();
    // Condition queue: a thread that comes to take an element while the queue is empty
    // or while no element has expired waits here
    private final Condition available = lock.newCondition();
    // The thread with the shortest remaining wait in the condition queue, usually the thread that arrived first
    // Note: introducing leader does not make the queue fair: a newly arriving thread
    // that can retrieve an element takes it and returns immediately
    private Thread leader = null;

    // -------------------- constructors --------------------
    public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
}
  • PriorityQueue: the priority queue that DelayQueue delegates to for storing and ordering elements. For details, please refer to [Java container source code] PriorityQueue source code analysis.
  • Reuse is a common and important topic: DelayQueue reuses PriorityQueue, LinkedHashMap reuses HashMap, and Set reuses Map. To summarize, reuse requires two things:
    • Abstract the reusable functionality as far as possible and expose extension points. For example, HashMap's array-manipulating methods expose a number of after* hook methods to LinkedHashMap, which make it easy for LinkedHashMap to handle ordering, removal, and so on;
    • Reuse by composition or inheritance: LinkedHashMap uses inheritance, while Set and DelayQueue use composition, that is, they hold the reused class as a member.
  • <E extends Delayed>: every element in the queue must implement the Delayed interface. As the source code below shows, Delayed also extends the Comparable interface, so elements must implement the following two methods:
    • getDelay: returns the remaining time until expiration. Note that each element's expiration time is determined before it is enqueued, not passed in as a parameter when it is enqueued.
    • compareTo: orders the elements by the remaining delay that getDelay reports.
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
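
As a minimal sketch of an element type that satisfies both requirements, the class below fixes its expiration time in the constructor and derives both getDelay and compareTo from it. The class name DelayedTask and its fields are hypothetical and exist only for illustration; using System.nanoTime() as the clock is also an assumption of this sketch.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type, used only for illustration.
final class DelayedTask implements Delayed {
    private final String name;
    private final long expireAtNanos; // fixed when the element is created, before it is enqueued

    DelayedTask(String name, long delay, TimeUnit unit) {
        this.name = name;
        this.expireAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    String name() {
        return name;
    }

    // Remaining time until expiration; zero or negative means the element has expired.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Orders elements so that the one expiring soonest comes first.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

Long.compare is used instead of subtracting and casting to int, because a cast can overflow when the two delays are far apart.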

2. Method analysis & API

2.1 Enqueue

put()

public void put(E e) {
    // Delegates to offer; the queue is unbounded, so put never blocks
    offer(e);
}

offer()

The logic of offer is simple:

  1. Call the offer method of PriorityQueue to add the new element. Because PriorityQueue is backed by a min-heap, the new element sifts up to its position.

  2. If the new element ends up at the head of the queue (the top of the heap), immediately wake up one waiting thread and clear the current leader, giving the awakened thread a chance to become the leader.

    The reason is that the leader must be the thread with the shortest remaining wait in the condition queue (its wait time equals the remaining delay of the queue head). Now that a new head with an earlier expiration has arrived, a thread waiting for that shorter delay should become the leader. If this is unclear, compare it with the take method below.

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // Acquire the lock
    try {
        // Delegate to PriorityQueue.offer to reuse its ordering and resizing
        q.offer(e);
        // If the element just added is now the queue head,
        // clear the current leader and wake up one thread waiting on the condition queue,
        // giving the awakened thread a chance to become the leader
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock(); // Release the lock
    }
}
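
The effect of the head check can be observed from the outside. The following is a rough sketch, assuming a hypothetical demo class OfferWakeupDemo with a hypothetical element type Item: a consumer first waits on an element that expires in 5 seconds, then a second element with a 100 ms delay is offered and becomes the new head. The sleep and join durations are arbitrary values chosen for the demonstration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names OfferWakeupDemo and Item are not from the JDK.
final class OfferWakeupDemo {

    static final class Item implements Delayed {
        final String name;
        final long expireAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns the name of the element that the waiting consumer ends up taking.
    static String firstTaken() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("slow", 5_000));
        String[] taken = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                taken[0] = queue.take().name;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true); // do not keep the JVM alive if the wait runs long
        consumer.start();
        try {
            Thread.sleep(100);                  // let the consumer start waiting on "slow"
            queue.offer(new Item("fast", 100)); // becomes the new head, so a waiter is signalled
            consumer.join(2_000);               // far shorter than the 5 s delay of "slow"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken[0];
    }

    public static void main(String[] args) {
        System.out.println("consumer took: " + firstTaken());
    }
}
```

Under these assumptions the consumer should receive "fast" well before "slow" expires, because the signal lets it recompute its wait against the new head.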

2.2 Dequeue

take()

The process of the take method is as follows:

  • Acquire the lock, then enter a loop

  • Peek at the queue head, which leads to the following cases:

    • Case 1: first == null, the queue is empty, so the current thread waits

    • Case 2: first != null, that is, the queue contains an element, and the outcome depends on whether that element has expired

      • Case 2.1: the remaining delay is <= 0, that is, the queue head has expired, so it is removed and returned

      • Case 2.2: the queue head has not expired, so the current thread must wait in the condition queue, but first it has to be decided whether the current thread can become the leader

        • Case 2.2.1: a leader already exists, so the current thread waits without a timeout

        • Case 2.2.2: no leader exists, so the current thread becomes the leader and performs a timed wait for the remaining delay. Note: this does not guarantee fairness, and the leader is not guaranteed to obtain the queue head after it wakes up automatically

  • Release the lock

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // Acquire the lock, responding to interruption
    try {
        // Loop until an element can be returned
        for (;;) {
            // Look at the queue head first
            E first = q.peek();
            // Case 1: first == null, the queue is empty, so the current thread waits
            if (first == null)
                available.await();
            // Case 2: the queue is not empty
            else {
                long delay = first.getDelay(NANOSECONDS); // Remaining delay of the head element
                // Case 2.1: the head has expired, so it can be dequeued
                if (delay <= 0)
                    return q.poll();
                first = null; // Drop the reference while waiting so the element can be garbage collected
                // Case 2.2: the head has not expired
                // Case 2.2.1: leader != null, another thread is already waiting for the head,
                // so the current thread waits without a timeout
                if (leader != null)
                    available.await();
                // Case 2.2.2: leader == null, the current thread is the first one waiting for the head
                // and records itself as the leader
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Timed wait for the leader; awaitNanos releases the lock while waiting
                        // The leader wakes up automatically and will most likely obtain the queue head
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // On exit, if there is no leader and the queue is not empty, wake up one waiting thread
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock(); // Release the lock
    }
}
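
The blocking behaviour of take can be checked with a small experiment. This is a sketch under stated assumptions: TakeOrderDemo and Item are hypothetical names, and the 100/200/300 ms delays are arbitrary. Elements are enqueued out of order, and take is expected to hand them back in expiration order, each one no earlier than its own deadline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names TakeOrderDemo and Item are not from the JDK.
final class TakeOrderDemo {

    static final class Item implements Delayed {
        final String name;
        final long expireAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Enqueues three elements out of order and returns their names in the order take() delivers them.
    static List<String> takeAll() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("300ms", 300));
        queue.put(new Item("100ms", 100));
        queue.put(new Item("200ms", 200));
        List<String> order = new ArrayList<>();
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().name); // blocks until the current head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> order = takeAll();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(order + " after " + elapsedMs + " ms");
    }
}
```

Because the last element cannot be taken before its 300 ms delay has elapsed, the whole run takes at least that long.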

poll()

In contrast to take, poll does not block: it returns null if the queue is empty or if the head of the queue has not expired.

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        // Important: if the queue is empty or the head has not expired, return null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

In addition, if you do not want to block indefinitely, call the timed overload poll(long timeout, TimeUnit unit), which waits at most the given time and returns null if no element has expired by then.
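
The difference between the two poll variants can be seen in a short sketch. The class PollDemo, the element type Item, and the 500 ms / 20 ms / 2 s values are all hypothetical choices made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names PollDemo and Item are not from the JDK.
final class PollDemo {

    static final class Item implements Delayed {
        final String name;
        final long expireAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Records, for each call in turn, whether it returned an element.
    static List<Boolean> pollResults() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("job", 500));
        List<Boolean> gotElement = new ArrayList<>();
        try {
            gotElement.add(queue.poll() != null);                          // head not expired: returns at once
            gotElement.add(queue.poll(20, TimeUnit.MILLISECONDS) != null); // timeout ends before the head expires
            gotElement.add(queue.poll(2, TimeUnit.SECONDS) != null);       // waits until the head expires
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return gotElement;
    }

    public static void main(String[] args) {
        System.out.println(pollResults());
    }
}
```

With these values the first two calls are expected to come back empty and only the third to return the element, since its timeout outlasts the element's delay.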

2.3 Get the queue head: peek

public E peek() {
    // Acquire the lock so the queue cannot be modified while it is being read
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Delegate to PriorityQueue.peek; the head is returned even if it has not expired
        return q.peek();
    } finally {
        lock.unlock();
    }
}
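
Since peek performs no expiration check, it can return an element that poll would still refuse to hand out. A brief sketch, assuming a hypothetical demo class PeekDemo and element type Item with a 10 second delay:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names PeekDemo and Item are not from the JDK.
final class PeekDemo {

    static final class Item implements Delayed {
        final String name;
        final long expireAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Builds a queue whose only element is still 10 seconds away from expiring.
    static DelayQueue<Item> queueWithPendingItem() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("later", 10_000));
        return queue;
    }

    public static void main(String[] args) {
        DelayQueue<Item> queue = queueWithPendingItem();
        System.out.println("peek sees: " + queue.peek().name); // the unexpired head is visible
        System.out.println("poll gives: " + queue.poll());     // but it cannot be removed yet
        System.out.println("size: " + queue.size());           // size counts unexpired elements too
    }
}
```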

3. Example

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DelayQueueDemo {

  // Queue element that implements the Delayed interface
  @Data
  static class DelayedDTO implements Delayed {
    Long s;          // absolute expiration time, in milliseconds
    Long beginTime;  // time at which the producer started, in milliseconds

    public DelayedDTO(Long s, Long beginTime) {
      this.s = s;
      this.beginTime = beginTime;
    }

    // Override getDelay to return the remaining time until expiration
    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(s - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // Order elements by remaining delay; Long.compare avoids the overflow of an int cast
    @Override
    public int compareTo(Delayed o) {
      return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    // The consumer calls run(), but the original listing does not define it;
    // this minimal body is an assumption: it logs the time elapsed since beginTime
    public void run() {
      log.info("taken {} ms after begin", System.currentTimeMillis() - beginTime);
    }
  }

  // Producer
  static class Product implements Runnable {
    // DelayQueue implements BlockingQueue, so depending on the interface here is more flexible
    private final BlockingQueue<DelayedDTO> queue;

    public Product(BlockingQueue<DelayedDTO> queue) {
      this.queue = queue;
    }

    @Override
    public void run() {
      try {
        log.info("begin put");
        long beginTime = System.currentTimeMillis();
        // Put into the queue with a delay of 2 seconds
        queue.put(new DelayedDTO(System.currentTimeMillis() + 2000L, beginTime));
        // Delay of 5 seconds
        queue.put(new DelayedDTO(System.currentTimeMillis() + 5000L, beginTime));
        // Delay of 10 seconds
        queue.put(new DelayedDTO(System.currentTimeMillis() + 1000L * 10, beginTime));
        log.info("end put");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("producer interrupted", e);
      }
    }
  }

  // Consumer
  static class Consumer implements Runnable {
    private final BlockingQueue<DelayedDTO> queue;

    public Consumer(BlockingQueue<DelayedDTO> queue) {
      this.queue = queue;
    }

    @Override
    public void run() {
      try {
        log.info("Consumer begin");
        // Take from the queue; each call blocks until the head has expired
        queue.take().run();
        queue.take().run();
        queue.take().run();
        log.info("Consumer end");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("consumer interrupted", e);
      }
    }
  }

  // Main
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<DelayedDTO> q = new DelayQueue<>();
    // Pass the DelayQueue to the producer and the consumer
    DelayQueueDemo.Product p = new DelayQueueDemo.Product(q);
    DelayQueueDemo.Consumer c = new DelayQueueDemo.Consumer(q);
    new Thread(c).start();
    new Thread(p).start();
  }
}

The result is as follows. The three elements are taken about 2, 5, and 10 seconds after they were enqueued:

06:57:50.54 [thread-1] Consumer begin
06:57:50.544 [thread-1] begin put
06:57:50.551 [thread-1] end put
06:57:52.554 [thread-0]
06:57:55.555 [thread-0]
06:58:00.555 [thread-0]
06:58:00.555 [thread-0]
06:58:00.556 [thread-0] Consumer end