This article focuses on the principles of Future and Callable, that is, how to obtain the results of thread execution outside the thread and how it works.
1 sample
1.1 the sample a
The following example code executes a Callable through the thread pool and retrieves the return result through the Future.
public static void main(String[] args) throws Exception {
Callable<Integer> callable = () -> {
Thread.sleep(1000);
Random random = new Random();
return random.nextInt(100);
};
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(callable);
System.out.println(DateUtil.getCurrentTime() + " ready to do task");
Integer result = future.get();
System.out.println(DateUtil.getCurrentTime() + " get task result! result=" + result);
}
Copy the code
The result of this method is as follows: the main thread waits for the task to execute, and after 1s, the thread finishes executing and returns the result, the main thread gets the result and prints it.
14:45:57:090 ready to do task
14:45:58:123 get task result! result=46
Copy the code
1.2 example 2
In the following example, we create and execute our own thread:
public static void main(String[] args) throws Exception {
Callable<Integer> callable = () -> {
Thread.sleep(1000);
Random random = new Random();
return random.nextInt(100);
};
FutureTask<Integer> task = new FutureTask<>(callable);
Thread thread = new Thread(task);
System.out.println(DateUtil.getCurrentTime() + " ready to do task");
thread.start();
Integer result = task.get();
System.out.println(DateUtil.getCurrentTime() + " get task result! result=" + result);
}
Copy the code
The following output is displayed
15:51:47:615 ready to do task
15:52:13:885 get task result! result=31
Copy the code
2 Principle Analysis
The above two examples are able to obtain the results of the thread’s execution, and the principle is the same. Both are supported by the Callable, Future, Runnable, FutureTask classes, and we’ll look at how this is implemented.
2.1 Principle Overview
One thread (such as threadA) gets the result of another thread (such as threadB). This function is implemented based on two points: converting the execution of Runnable#run into a call to the Callable#call method and storing the return result; Threads waiting for results are managed through wait queues (similar to AQS). FutrueTask performs a converter between Runnable and Callable, converting a thread’s Runnable#run to a call to Callable# Call. Because the run method is a method that executes a task in a thread, and call itself returns a result, FutureTask simply executes the Call method when it runs the Run method and stores the result of the execution in a place where other threads can retrieve the results of other threads’ execution through the Future. In simple terms, as shown below:
- Create a Callable object and add the tasks to be performed in its call() method.
- Create FutureTask (taskB) with Callable.
- Create a thread (threadB) from task and execute the thread start() method.
- The operating system schedules and executes threadB’s run() method, and since FutureTask implements the Runnable interface, it executes the Run () method in FutureTask (taskB).
- ThreadA uses tasKB.get () to get the result of threadB’s execution. If threadB is not finished, threadA is suspended.
- The main logic of FutureTask’s run() method is to execute the call() method of a Callable; The outcome of the call method is then set into the Outcome field of FutrueTask; It then wakes up the thread (threadA) that is waiting for the result of this thread running.
2.2 Callable
The Callable interface is very simple and just declares a call method.
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }Copy the code
2.3 the Future
A Future can represent the result of an asynchronous computation, which can be obtained via the GET method: if the computation has not completed, the current thread will be suspended; If the calculation is complete, the current thread will wake up and get the result. Here is an example code that uses Futrue provided by the Jdk, which is similar to “Example 1” above, so I won’t cover it more.
interface ArchiveSearcher { String search(String target); } ExecutorService executor = Executors.newSingleThreadExecutor(); ArchiveSearcher searcher = (target) -> { return "query=" + target + " content=hello world"; }; void showSearch(final String target) throws InterruptedException { Future<String> future = executor.submit(() -> { return searcher.search(target); }); try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; }}Copy the code
2.4 FutureTask
2.4.1 Inheritance Structure
FutureTask is one of the most commonly used Future subclasses, and its inheritance results are shown below:As you can see from the inheritance relationship, FutureTask implements Runnable, so you can create a thread with FutureTask to run some specified tasks. FutrueTask implements the Future, so it can return the results of asynchronous computations.
2.4.2 encapsulation of the Runnable#run method
FutureTask implements the Runnable interface and implements the Run method, which is the most important reason for its ability to return thread results. Because the run method calls the Callable# Call method and saves the result, subsequent threads can retrieve the saved result as long as they have access to FutureTask. We’ll discuss several of these methods in detail next.
2.4.2.1 run () method
The main logic of this method is:
- Check the status and return if the thread has been started.
- Execute the call() method of Callable.
- The set method updates the execution result to the Outcome field of FutrueTask and wakes up the thread waiting for the result.
The source code is as follows:
public void run() { if (state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c ! = null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}Copy the code
2.4.2.2 set () method
The main logic of the set() method is:
- Update the status of the thread to complete
- Update the return result of the call method to the outcome field of FutureTask;
- Wakes up the thread waiting for the result of this thread running.
The source code is as follows:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); }}Copy the code
2.4.3 Manage the threads waiting for results through the wait queue
2.4.3.1 Managing suspended threads
Let’s say we have three threads waiting for results, and they waiters waiters to a chained list by calling get() in the order of thread1, thread2, thread3 (we assume threads are suspended in that order).
When each thread gets the result through the get() method, they need to enter a wait state, that is, be suspended, because the task has not been completed. Each thread creates a WaitNode node that hangs on the Waiters property before being suspended. When the thread is finished, the list is used to find the suspended thread, which can then be woken up, and the result of the thread’s execution is returned
2.4.3.2 the get () method
A thread can use the FutureTask#get method to retrieve the result of another thread’s execution, which returns the same value saved in the run method above. The state is not COMPLETING the task, and the thread is suspended by awaitDone.
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
Copy the code
2.4.3.3 awaitDone () method
This method is used to suspend threads that need to wait. The logic for suspending can be found in the “Managing suspended threads” section above and the source code below. Note that if the thread is interrupted or the wait time exceeds the time limit, the wait queue will be cleared and the waiting thread will be woken up.
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q ! = null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (! queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }}Copy the code
2.4.3.4 finishCompletion () method
In the set() method, we see that we first save the result and then clean up the threads waiting to hang through the finishCompletion method: clean up the list and wake up the thread.
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) ! = null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t ! = null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }Copy the code