The use of DelayQueue in Java
Introduction to the
DelayQueue is a type of BlockingQueue, so it is thread safe. DelayQueue is a type of BlockingQueue, so it is thread safe. DelayQueue is a type of BlockingQueue, so it is thread safe. Only elements with delay times less than 0 can be fetched.
DelayQueue
Let’s look at the definition of DelayQueue:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
Copy the code
By definition, the objects deposited in DelayQueue must all be subclasses of Delayed.
Delayed inherits from Comparable and needs to implement a getDelay method.
Why is it designed this way?
Since the underlying store of DelayQueue is a PriorityQueue, as we explained in previous articles, a PriorityQueue is a sortable Queue whose elements must implement the Comparable method. The getDelay method is used to determine whether the sorted elements can be retrieved from the Queue.
The application of DelayQueue
DelayQueue is typically used in the producer-consumer pattern, but let’s take a concrete example.
To use DelayQueue first, we must define an Delayed object:
@Data
public class DelayedUser implements Delayed {
private String name;
private long avaibleTime;
public DelayedUser(String name, long delayTime){
this.name=name;
//avaibleTime = current time + delayTime
this.avaibleTime=delayTime + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
// Determine whether avaibleTime is longer than the current system time and convert the result to MILLISECONDS
long diffTime= avaibleTime- System.currentTimeMillis();
return unit.convert(diffTime,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//compareTo is used to sort DelayedUser
return (int) (this.avaibleTime - ((DelayedUser) o).getAvaibleTime()); }}Copy the code
In the above object, we need to implement the getDelay and compareTo methods.
Next we create a producer:
@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private Integer messageCount;
private long delayedTime;
@Override
public void run(a) {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser delayedUser = new DelayedUser(
new Random().nextInt(1000) +"", delayedTime);
log.info("put delayedUser {}",delayedUser);
delayQueue.put(delayedUser);
Thread.sleep(500);
} catch(InterruptedException e) { log.error(e.getMessage(),e); }}}}Copy the code
In the producer, we create a new DelayedUser object every 0.5 seconds and merge it into the Queue.
Create another consumer:
@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private int messageCount;
@Override
public void run(a) {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser element = delayQueue.take();
log.info("take {}",element );
} catch(InterruptedException e) { log.error(e.getMessage(),e); }}}}Copy the code
In the consumer, we loop to get objects from the queue.
Finally, look at an example of a call:
@Test
public void useDelayedQueue(a) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
DelayQueue<DelayedUser> queue = new DelayQueue<>();
int messageCount = 2;
long delayTime = 500;
DelayedQueueConsumer consumer = new DelayedQueueConsumer(
queue, messageCount);
DelayedQueueProducer producer = new DelayedQueueProducer(
queue, messageCount, delayTime);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
}
Copy the code
In the above test example, we defined a thread pool of two threads, the producer generates two messages, and the delayTime is set to 0.5 seconds, which means that after 0.5 seconds, the inserted object can be retrieved.
The thread pool will be closed after 5 seconds.
Run to see the result:
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487, avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487, avaibleTime=1587623188899)
Copy the code
We see that the put and take of messages alternate, as we would expect.
If we change delayTime to 50000, the element inserted before the thread pool is closed will not expire, meaning that the consumer will not get the result.
conclusion
DelayQueue is a BlockingQueue with strange features that can be used when needed.
Examples of this article github.com/ddean2009/l…
Welcome to pay attention to my public number: procedures those things, more wonderful waiting for you! For more, visit www.flydean.com