link
Java thread pool ScheduleExecutorService Java thread pool ScheduleExecutorService
ScheduleExecutorService is a Java thread pool that you can refer to recently. This article wraps up your learning and summary of ScheduleExecutorService. A more in-depth study and summary of Java thread pool technology will take place at a later time.
Submit a dead-loop task to a ScheduleExecutorService
This article continues with the Java scheduling thread pool ScheduleExecutorService by asking what happens if you commit an infinite loop to a scheduling thread pool. For completeness, this article will mention some of the things that have been covered in the articles listed above.
For example, we run the following code:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private static Runnable loopRunner = () -> {
for(;;) {}}; scheduledExecutorService .scheduleAtFixedRate(loopRunner,0.100, TimeUnit.MILLISECONDS);
Copy the code
Looprunners only have an infinite loop that does nothing. This is extreme, of course, but is more common in for(;). Do some driver type work, such as Netty’s EventLoop, which makes more sense, but this article is just to learn what happens when you submit an infinite loop to a scheduled thread pool.
ScheduleAtFixedRate ();
1. Wrap loopRunner as a ScheduledFutureTask, which is essential for scheduling thread pools
2. Wrap it again as a RunnableScheduledFuture object
3. The delayedExecute method is run to ensure that the task is properly processed. If the thread pool is closed, reject the submission of the task, otherwise add the task to a workQueue. The maximum capacity is integer.max_value
4. Run the ensurePrestart method to ensure that the thread pool is working. If the number of threads in the pool has not reached the specified corePoolSize, add a new Worker and let the Worker delay the queue to get the task to execute
Add a Worker and let it execute the task we submitted. Here is a snippet of addWorker’s method content:
/** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** **
private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if(t ! =null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // Add a new worker
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; }}finally {
mainLock.unlock();
}
if (workerAdded) { // If adding the new Worker succeeds, then it will start to execute the task we submitted
t.start();
workerStarted = true; }}}return workerStarted;
}
Copy the code
6. The most important sentence in step 5 is t.start(). What happens when this sentence is executed? Let’s first look at what this T is:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Copy the code
This is the Worker itself, and Worker implements Runnable. In other words, t.start() will execute the run method of Worker itself
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
Copy the code
7. We already know that Worker’s run method is now executed. Here is the content of the run method:
public void run(a) {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while(task ! =null|| (task = getTask()) ! =null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && ! wt.isInterrupted()) wt.interrupt();try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally{ afterExecute(task, thrown); }}finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally{ processWorkerExit(w, completedAbruptly); }}Copy the code
If the task is empty, the queue will be delayed to obtain the task. If the task is not obtained, the thread can rest. If the task is obtained, the following content will be continued. The main sentence is task.run(), what happens to this sentence?
To know what happens after task.run(), we need to know what task is, as described in step 2, which is our task wrapped as a RunnableScheduledFuture<Void> object. Now look at what happens to run in the RunnableScheduledFuture method. Here are the details of its run method:
public void run(a) {
boolean periodic = isPeriodic();
if(! canRunInCurrentRunState(periodic)) cancel(false);
else if(! periodic) ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); }}Copy the code
If this is not a periodic task, execute the super run method, otherwise execute the runAndReset method. Since this is a problem-oriented article, we won’t analyze the Super run and runAndReset methods here, just know that this is the task we actually submitted. In other words, at this point, our task starts to run, our loopRunner starts to loop indefinitely, and the code below will never be executed. So, at this point, the problem is solved. If you submit an infinite loop to a scheduled thread pool, the task will hold on to a thread, and if unfortunately only one thread is allowed in the pool, the other submitted tasks will not be scheduled to execute.
In order to work through the process, we assume that the submitted task is not an infinite loop, that the submitted task will always be executed, and that the thread will always be released. SetNextRunTime is executed.
private void setNextRunTime(a) {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
Copy the code
P > 0 represents scheduleAtFixedRate, while P < 0 represents scheduleWithFixedDelay. The difference between the two is that the former always sets the time of the next schedule according to the set track. The latter always set the next execution time according to the cycle after the task is completed. Let’s just analyze the former. For the task submitted for the first time, time is equal to the current time + the time of the first delayed execution. For the case where delay is 0, time of the first submitted task is the current time, and + p represents the time that should be scheduled next time.
10. We found that it was particularly critical to set a time for the next task after each task was completed. The reExecutePeriodic method is periodic. Here is what the reExecutePeriodic method looks like:
void reExecutePeriodic(RunnableScheduledFuture
task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if(! canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
elseensurePrestart(); }}Copy the code
This method just re-submits the task to the delay queue, and a complete process is over after all. For the integrity of the content, let’s analyze the situation when a Worker obtains the task from the delay queue. Getting back to step 7, we have one method we didn’t mention, getTask():
private Runnable getTask(a) {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if(r ! =null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false; }}}Copy the code
We see two main methods: poll/take, these two methods from delay queue for a task, below is the poll code, take blocks to get to the content, the poll is not blocked, don’t stick to take code:
publicRunnableScheduledFuture<? > poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for(;;) { RunnableScheduledFuture<? > first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if(nanos < delay || leader ! =null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null) available.signal(); lock.unlock(); }}Copy the code
Poll code is the most core content, task queue first, and then obtain the delay time, the time we finish a schedule set up after the next scheduled time, if the task running time is greater than we set the cycle, the delay time is negative, so will be implemented immediately, otherwise you will wait for the set time, Return to the Worker when the time is up.
Finally, paste the details of the getDelay method so that the content is complete, where the time is set:
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
Copy the code