The foreword 0.
In Java, threads are created by inheriting the Thread class or implementing the Runnable interface. However, both methods have a defect that they cannot obtain the results of execution after completion. Therefore, Java 1.5 provides Callable and Future interfaces. They can be used to obtain the execution result of the task after the task is completed. FutureTask can often be seen in some open source frameworks. This article will briefly introduce how to use FutureTask, and then analyze the specific implementation principle from the perspective of source code.
1. FutureTask use
A simple demo is used here to illustrate the use of FutureTask and some of the main method interfaces.
FutureTask<Integer> FutureTask = new FutureTask<>(() -> {system.out.println (" calculating the result... ); Thread.sleep(3000); return 1; }); Thread = new Thread(futureTask); thread.start(); Thread1 = new Thread(() -> {try {futureTask.get(); futureTask.get(); } catch (Exception e) { e.printStackTrace(); }}); thread1.start(); Thread.sleep(1000); // 5. IsDone () if(! futureTask.isDone()) { System.out.println("Task is not done"); Println ("result is "+ futureTask.get())); system.out.println ("result is" + futureTask.get())); Thread.sleep(2000); if(futureTask.isDone()) { System.out.println("Task is done"); Println ("result is "+ futureTask.get()));} // 8.Copy the code
Common interface description
A brief description of the interface definition Boolean Cancel (Boolean mayInterruptInRunning) cancels a task and returns the cancellation result. Parameter indicates whether to interrupt the thread. Boolean isCancelled() determines whether the task isCancelled. Boolean isDone() determines whether the task is complete, including complete execution, abnormal execution, or canceled. V get() Gets the execution result of the task, which is blocked before the task is complete. V get(long timeout, TimeUnit Unit) Attempts to obtain the execution result within the specified time. If timeout occurs, a timeout exception is thrownCopy the code
2. FutureTask principle
2.1 Variable Definition
Callable Callable: The submitted task Object Outcome: The task execution result or the task is abnormal Volatile Thread Runner: The task execution Thread volatile WaitNode waiters: The waiting node is associated with the waiting thread long stateOffset: memory offset of the state field Long runnerOffset: memory offset of the Runner field Long waitersOffset: memory offset of the waiters fieldCopy the code
2.2 Internal Status Transfer
FutureTask needs to deal with the multi-clock states of the task execution, such as the normal completion of the task, the abnormal occurrence of the task, the cancellation of the task, the interruption of the task and other states. In the source code, FutureTask uses the state bit state to convert different states.
/** * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
Copy the code
Note that state is volatile, meaning that if any thread changes the variable, all other threads will know the latest value. In order to better analyze FutureTask implementation later, it is necessary to explain the various states.
- NEW: indicates a NEW task or task that has not been completed. This is the initial state.
- COMPLETING tasks well: those patients who have completed tasks or have had exceptions occurring during them, but have not been saved in the outcome field. The state will change from NEW to COMPLETING. But this state will be relatively short time, belongs to the intermediate state.
- NORMAL: The task has been executed and the outcome has been saved to the outcome field. The status changes from COMPLETING the task to NORMAL. This is a final state.
- EXCEPTIONAL: The state transitions from COMPLETING to EXCEPTIONAL after an exception occurs during task execution and the reason for the exception has been saved to the outcome field. This is a final state.
- CANCELLED: When the task has not been CANCELLED or has been CANCELLED but has not been completed, the user calls cancel(false) to cancel the task without interrupting the thread of execution. In this case, the status changes from NEW to
- CANCELLED. This is a final state.
- INTERRUPTING: The status changes from NEW to INTERRUPTING when the user calls the cancel(true) method to cancel a task before it has started or when it has been executed but has not completed. This is an intermediate state.
- INTERRUPTED: Calling interrupt() interrupts the state from INTERRUPTING to INTERRUPTED after the task executes the thread. This is a final state.
It is important to note that all states with a value greater than COMPLETING the task (COMPLETING normally, abnormal, or cancelled) have completed the task.
2.2 Main Process
As you can see from the demo above, the main thread has been blocking and waiting for the result after FutureTask is created. The task thread is responsible for executing the task, inserting the final result back into the output outcome field, and notifying the waiting thread that since multiple threads can block and wait, The internal implementation uses the wait list and the park/unpark operation of LockSupport to synchronize operations between different threads. The general process is shown below.
1. AwaitDone can be called in the main thread/other waiting thread. This logic spins and calls Park to block wait, and if the task has not completed, it is placed in the wait chain.
2. After executing the task, the executing thread traverses the waiting list and unpark each node.
3. The unpark synchronization signal is received in the awaitDone() logic to wake up the corresponding thread that has completed the task.
2.3 Core method analysis
2.3.1 public void the run ()
When a task is executed, it goes through several steps:
1. Check the task status state.
2. Execute the task logic by calling c.call().
3. If the business logic is normal, the set method will be called to assign the execution result to the outcome and update the state value;
4. If the business logic is abnormal, the setException method will be called to assign the exception object to the outcome and update the state value.
public void run() { if (state ! = NEW || ! UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c ! = null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }} protected void set(V V) {// State state from New->Completing if (UNSAFE.compareAndSwapInt(this, stateOffset, New, COMPLETING)) { outcome = v; PutOrderedInt (this, stateOffset, Normal); //state from Completing->Normal unsafe. putOrderedInt(this, stateOffset, Normal); // final state finishCompletion(); }} protected void setException(Throwable t) {// State from New->Completing if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; Undeclared -> EXCEPTIONAL. PutOrderedInt (this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}Copy the code
2.3.2 Get () and GET (Long timeout, TimeUnit Unit)
Tasks are executed by other threads, and the main thread blocks until the task thread wakes them up. Let’s see how this works by using get(Long timeout, TimeUnit Unit).
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } @param timed = true * @param timed = true * @param timed = true */ private int awaitDone(Boolean timed, Long nanos) throws InterruptedException {// Final long deadline = timed? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // spin for (;;) {if (thread.interrupted ()) {// If (thread.interrupted ()) {remove the waiting Thread and throw an exception removeWaiter(q); throw new InterruptedException(); } int s = state; // The task may have been completed or cancelled. (q! = null) q.thread = null; return s; } else if (s == COMPLETING) // It is possible that the task Thread is blocked, and the main Thread yields the CPU thread.yield (); Else if (q == null) // Wait for the thread node to be empty, initialize the new node and associate the current thread q = new WaitNode(); else if (! Queued) / / waiting thread into the queue, the success is queued = true queued = UNSAFE.com pareAndSwapObject (this, waitersOffset q.n ext = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); If (nanos <= 0L) {removeWaiter(q); return state; } locksupport. parkNanos(this, nanos); } else // timed=false when timed here, suspend the current thread locksupport.park (this); }}Copy the code
2.3.3 finishCompletion ()
When the task thread completes execution, it will wake up in the finishCompletion() method, where the WaitNode node stores the information of the waiting thread. The linked list is traversed, and the locksupport. unprk operation is performed on each waiting thread to wake up the waiting thread.
Private void finishCompletion() {private void finishCompletion() {for (WaitNode q; (q = waiters) ! = null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t ! = null) { q.thread = null; // Wake up the waiting thread locksupport. unpark(t); } WaitNode next = q.next; if (next == null) break; // unlink to help gc q.next = null; q = next; } break; }} // template methods can be overwritten done(); Callable = null; }Copy the code
2.3.4 Public Boolean Cancel (Boolean mayInterruptIfRunning)
1. If state is not NEW, the task is about to enter the final state. If false is returned, the operation fails. 2. If the state is NEW, the task may or may not be executed. 3. MayInterruptIfRunning Indicates whether the thread is interrupted. If so, try setting state to interrupt and INTERRUPTING the thread, and then setting state to final state INTERRUPTED. 4. If mayInterruptIfRunning=false, the thread is not interrupted and state is set to CANCELLED 5. Removes the waiting thread and wakes it up. 6. Return true
public boolean cancel(boolean mayInterruptIfRunning) { if (state ! = NEW) return false; if (mayInterruptIfRunning) { if (! UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t ! = null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (! UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }Copy the code
2.3.5 LockSupport
JDK provides a lot of – multithreading tool classes, LockSupport is a good implementation, through the functional interface can be easily implemented as required to schedule threads, such as suspend/resume threads.
public static void park(Object blocker); Public static void parkNanos(Object Blocker, long nanos); Public static void parkUntil(Object Blocker, long deadline); Public static void park(); Public static void parkNanos(long nanos); Public static void parkUntil(long deadline); public static void parkUntil(long deadline); Public static void unpark(Thread Thread); Public static Object getBlocker(Thread t); Thread t = new Thread(() -> { System.out.println("park before"); LockSupport.park(); if System.out.println("park after"); }); t.start(); Thread.sleep(3000L); LockSupport.unpark(t);Copy the code
Park /unpark implements similar functions to Wait /notify. Park/Unpark uses the Unsafe class, which requires reading the JDK source code for its implementation logic.
public static void park() { UNSAFE.park(false, 0L); } public static void unpark(Thread thread) { if (thread ! = null) UNSAFE.unpark(thread); } //c++ code implements class Parker: public OS ::PlatformParker {private: volatile int _counter; . public: void park(bool isAbsolute, jlong time); void unpark(); . } class PlatformParker : public CHeapObj<mtInternal> { protected: pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [1] ; . }Copy the code
3. Summary
Through in-depth analysis of FutureTask’s simple use and implementation principle, this paper has an understanding of FutureTask’s thread model and synchronization mode between threads, as well as LockSupport and Unsafe tool classes. The idea of asynchronous task processing has been widely used in multithreading and network programming, it is good to master the multithreading programming level.
The resources
Tech.meituan.com/2019/02/14/…
Juejin. Cn/post / 684490…
www.cnblogs.com/throwable/p…
Beautyboss.farbox.com/post/study/…
Blog.csdn.net/a7980718/ar…
www.jianshu.com/p/f1f2cd289…
www.jianshu.com/p/e3afe8ab8…