This article has participated in the third “topic writing” track of the Denver Creators Training Camp. For details, check out: Digg Project | Creators Training Camp third is ongoing, “write” to make a personal impact.
This is the first time to participate in this writing activity, please feel free to watch
DelayQueue
Scheduled tasks can be executed through delay queues.
When the scheduled task is put into the queue, the queue will automatically sort. When the time is 0(when the scheduled time is reached), the task will be put to the head of the queue. Then we can take out the scheduled task and perform operations.
DelayQueue is an unbounded blocking queue of Delayed elements, elements can only be extracted from the queue when the delay expires, and the head of the queue is the Delayed element that will be preserved the longest after the delay expires. If the delay period is not full, there are no elements in the queue header, and poll returns NULL.
Expiration occurs when the element’s getDelay method returns a value less than or equal to zero.
Implementing the Delayed interface requires implementing two methods
The getDelay() method is the Delayed method
This method returns a delay period, and when the delay expires the element is put to the head of the queue, and you can pull it out and so on.
CompareTo () is the Comparable method
With this method, the elements in the queue are sorted, usually using getDelay as the sorting criterion
methods
-
Take (): blocks the method until the element whose delay expires is retrieved and removed
-
Put (): Inserts the specified element into the queue
- And offer the same
-
Offer (E): inserts the specified element into the queue: locks
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); }}Copy the code
-
Poll (): Gets and removes the element whose header delay has expired, or returns NULL if there are no elements whose delay has expired
-
Peel (): Retrieves but does not remove the queue header, or returns NULL if the queue is empty
-
Size (): Returns the number of elements
-
Clear (): automatically removes all elements from the queue
implementation
1. The filled element needs to implement the Delayed interface, so the entity class can either implement this interface or encapsulate the entity class with a layer for further implementation without damaging the functionality of the entity class.
public class TaskDelayed implements Delayed {
private ServicePackTask packTask;
// To get the delay time, use the send time - current time
@Override
public long getDelay(TimeUnit unit) {
// return unit.convert((packTask.getPushTime().getTime() - System.currentTimeMillis()) / 1000 , TimeUnit.SECONDS);
return (this.packTask.getPushTime().getTime() - System.currentTimeMillis()) /1000;
}
// Compare the queue within the delay queue, and put the short delay in the head
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.SECONDS) - o.getDelay(TimeUnit.SECONDS)); }}Copy the code
2. Implementation of producer and consumer approaches
public void provider(ServicePackTask servicePackTask) {
TaskDelayed taskDelayed = new TaskDelayed(servicePackTask);
queue.offer(taskDelayed);
}
Copy the code
Producers are simpler; they put the element into the queue as soon as the method is called
The consumer is essentially reading in an infinite loop. First determine if there is any data in the queue, and if not, sleep for some time to recycle (reduce CPU stress).
If the queue is not empty, the element is fetched and manipulated using the take method (which automatically blocks the thread until the element’s delay expires).
public void consumer(a) {
while (true) {
if (0 == queue.size()) {
System.out.println("The end");
break;
}
TaskDelayed take = null;
try {
take = queue.take();
System.out.println(take.getPackTask().getPushTime());
} catch(InterruptedException e) { e.printStackTrace(); }}}Copy the code
3. Encapsulation method
Use two threads to separate production and consumption.
Why put the two methods together?
Because consumption is automatically calculated by the queue, we can’t operate on the elements in the queue without triggering, so we need to put consumption in a place where it can go, which is the production place.
But you need to use multiple threads to separate the two methods, otherwise the consumer will not be able to block the produced method.
public void start(ServicePackTask servicePackTask) {
new Thread(new Runnable() {
@Override
public void run(a) {
provider(servicePackTask);
}
}).start();
new Thread(new Runnable() {
@Override
public void run(a) {
consumer();
}
}).start();
}
Copy the code
Individual optimization
If each call to production was created and a consuming thread was created to consume, this would greatly increase the burden on the server, so we could do this by putting the consuming method in the code block of the class. When an instance of the class was created, the consuming logic in the code block would be triggered to begin consuming.
{ // Queue consumption: send to user center and insert into database
new Thread(new Runnable() {
@Override
public void run(a) {
ServicePackOrderPlanDO servicePackOrderPlanDO;
OrderPlanDelayItem take = null;
while (true) {
if (0 == queue.size()) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) { e.printStackTrace(); }}else {
try {
take = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (null! = take) { servicePackOrderPlanDO = take.getServicePackOrderPlanDO();// Insert the message table
ServicePackSendMessageDO servicePackSendMessageDO = transToMessage(servicePackOrderPlanDO);
// Message subscription
subscribeMessage(servicePackSendMessageDO,servicePackOrderPlanDO);
// Change the status of the PLAN database
servicePackOrderPlanDO.setPlanState(SendStateEnum.SEND.getCode());
servicePackOrderPlanDO.setUpdateTime(new Date());
servicePackOrderPlanDAO.updateServicepackOrderPlan(servicePackOrderPlanDO);
message_logger.info("Service pack task plan, modified plan status succeeded, Plan number:" + servicePackOrderPlanDO.getPlanNo());
}
}
}
}
}).start();
}
Copy the code