The use of DelayQueue in Java

Introduction to the

DelayQueue is a type of BlockingQueue, so it is thread safe. DelayQueue is a type of BlockingQueue, so it is thread safe. DelayQueue is a type of BlockingQueue, so it is thread safe. Only elements with delay times less than 0 can be fetched.

DelayQueue

Let’s look at the definition of DelayQueue:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>
Copy the code

By definition, the objects deposited in DelayQueue must all be subclasses of Delayed.

Delayed inherits from Comparable and needs to implement a getDelay method.

Why is it designed this way?

Since the underlying store of DelayQueue is a PriorityQueue, as we explained in previous articles, a PriorityQueue is a sortable Queue whose elements must implement the Comparable method. The getDelay method is used to determine whether the sorted elements can be retrieved from the Queue.

The application of DelayQueue

DelayQueue is typically used in the producer-consumer pattern, but let’s take a concrete example.

To use DelayQueue first, we must define an Delayed object:

@Data
public class DelayedUser implements Delayed {
    private String name;
    private long avaibleTime;

    public DelayedUser(String name, long delayTime){
        this.name=name;
        //avaibleTime = current time + delayTime
        this.avaibleTime=delayTime + System.currentTimeMillis();

    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Determine whether avaibleTime is longer than the current system time and convert the result to MILLISECONDS
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //compareTo is used to sort DelayedUser
        return (int) (this.avaibleTime - ((DelayedUser) o).getAvaibleTime()); }}Copy the code

In the above object, we need to implement the getDelay and compareTo methods.

Next we create a producer:

@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
    private DelayQueue<DelayedUser> delayQueue;

    private Integer messageCount;

    private long delayedTime;

    @Override
    public void run(a) {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser delayedUser = new DelayedUser(
                        new Random().nextInt(1000) +"", delayedTime);
                log.info("put delayedUser {}",delayedUser);
                delayQueue.put(delayedUser);
                Thread.sleep(500);
            } catch(InterruptedException e) { log.error(e.getMessage(),e); }}}}Copy the code

In the producer, we create a new DelayedUser object every 0.5 seconds and merge it into the Queue.

Create another consumer:

@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {

    private DelayQueue<DelayedUser> delayQueue;

    private int messageCount;

    @Override
    public void run(a) {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser element = delayQueue.take();
                log.info("take {}",element );
            } catch(InterruptedException e) { log.error(e.getMessage(),e); }}}}Copy the code

In the consumer, we loop to get objects from the queue.

Finally, look at an example of a call:

    @Test
    public void useDelayedQueue(a) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        DelayQueue<DelayedUser> queue = new DelayQueue<>();
        int messageCount = 2;
        long delayTime = 500;
        DelayedQueueConsumer consumer = new DelayedQueueConsumer(
                queue, messageCount);
        DelayedQueueProducer producer = new DelayedQueueProducer(
                queue, messageCount, delayTime);

        // when
        executor.submit(producer);
        executor.submit(consumer);

        // then
        executor.awaitTermination(5, TimeUnit.SECONDS);
        executor.shutdown();

    }
Copy the code

In the above test example, we defined a thread pool of two threads, the producer generates two messages, and the delayTime is set to 0.5 seconds, which means that after 0.5 seconds, the inserted object can be retrieved.

The thread pool will be closed after 5 seconds.

Run to see the result:

[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487, avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487, avaibleTime=1587623188899)
Copy the code

We see that the put and take of messages alternate, as we would expect.

If we change delayTime to 50000, the element inserted before the thread pool is closed will not expire, meaning that the consumer will not get the result.

conclusion

DelayQueue is a BlockingQueue with strange features that can be used when needed.

Examples of this article github.com/ddean2009/l…

Welcome to pay attention to my public number: procedures those things, more wonderful waiting for you! For more, visit www.flydean.com