This article has been included in GitHub org_Hejianhui /JavaStudy.
Preface
There are four main ways to create threads in Java:
- Extend the Thread class
- Implement the Runnable interface
- Implement the Callable interface and wrap it in a FutureTask to create a Thread
- Use ExecutorService, Callable, and Future to implement multithreading with return values
The first two approaches do not return a value; the last two do.
Callable and Runnable interfaces
The Runnable interface
// Classes that implement Runnable are executed by a Thread; the interface represents a basic task
public interface Runnable {
    // run() is the only method; it holds the actual task to perform
    public abstract void run();
}
No return value
The run method does not return a value. There are workarounds for passing a result back, such as writing to a log file or modifying a shared variable, but they are error-prone and inefficient.
Cannot throw an exception
import java.io.IOException;

public class RunThrowExceptionDemo {
    /**
     * Normal methods can declare checked exceptions in their signature.
     *
     * @throws IOException
     */
    public void normalMethod() throws IOException {
        throw new IOException();
    }

    class RunnableImpl implements Runnable {
        /** run() cannot declare a checked exception; it has to be caught inside the method. */
        @Override
        public void run() {
            try {
                throw new IOException();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
As you can see, normalMethod can declare the exception in its method signature, so the caller can catch and handle it. A class that implements the Runnable interface, however, cannot throw a checked exception from run. It can only use try/catch inside the method, so the upper layer never learns about exceptions that occur in the thread.
Caused by the design
Both defects come from how the Runnable interface declares the run method: its return type is void and it declares no exceptions. When we implement the interface and override the method, we can change neither the return type nor the declared exceptions, because the language forbids it.
Why is Runnable designed this way?
Even if run() could return a value or throw a checked exception, it would not help much, because we have no way to catch and handle it in the outer layer: the classes that call run() (such as Thread and the thread pools) are provided by Java, not written by us. So even with a return value, it would be hard to use. The Callable interface is designed to solve both problems.
Callable interface
public interface Callable<V> {
    // Returns the result of the task, or throws an exception
    V call() throws Exception;
}
Both Callable and Runnable have a single method, which is the method that executes the task. The difference is that call returns a value and declares throws Exception.
The difference between Callable and Runnable
- Method names: Callable executes call() and Runnable executes run();
- Return value: The Callable task returns a value, while the Runnable task does not return a value.
- Exceptions: the call() method can throw a checked exception, but the run() method cannot.
Callable works together with the Future interface, through which you can check the execution status of a task, cancel it, and obtain its result. Runnable alone offers none of these functions, so Callable is more powerful than Runnable.
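The contrast above can be sketched in a few lines. This is a minimal illustration rather than code from the article: the class name CallableVsRunnableSketch and its two helper methods are made up for the example, and only standard java.util.concurrent calls are used.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class CallableVsRunnableSketch {

    /** Submits a Runnable: the Future completes, but carries no result. */
    static Object submitRunnable(ExecutorService pool)
            throws InterruptedException, ExecutionException {
        Runnable task = () -> System.out.println("Runnable ran");
        Future<?> future = pool.submit(task);
        return future.get(); // null, because run() has nothing to return
    }

    /** Submits a Callable: the Future carries the value returned by call(). */
    static Integer submitCallable(ExecutorService pool)
            throws InterruptedException, ExecutionException {
        Callable<Integer> task = () -> 1 + 2;
        Future<Integer> future = pool.submit(task);
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("Runnable result: " + submitRunnable(pool));
            System.out.println("Callable result: " + submitCallable(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Both tasks go through the same submit call on the pool; only the Callable version gives the caller something useful back from get.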
The Future interface
The role of the Future
In simple terms, a Future lets us run work asynchronously on another thread while still being able to get that thread's return value.
For example, some operations are time-consuming, such as querying a database or doing heavy computation like compression or encryption. Simply waiting for such a method to return is unwise, because it greatly reduces the overall efficiency of the program.
Instead, we can run the computation in a child thread, control it through the Future, and obtain the result at the end. This improves the efficiency of the whole program and is the essence of the asynchronous approach.
Methods of Future
The Future interface has five methods. The source code is as follows:
public interface Future<V> {
    /**
     * Attempts to cancel the task.
     * 1. The attempt fails (returns false) if the task has already completed, has already been cancelled, or cannot be cancelled for some other reason.
     * 2. If the task is already running, mayInterruptIfRunning decides whether the thread executing it is interrupted: with true the thread is interrupted, with false it is not. Either way this is only an attempt to stop the task.
     * 3. After this method returns, subsequent isDone calls return true.
     * 4. If this method returns true, subsequent isCancelled calls return true.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /** Returns true if the task was cancelled before it completed normally. */
    boolean isCancelled();

    /** Returns true if the task is done. Done covers normal termination, an exception, and cancellation. */
    boolean isDone();

    /**
     * Blocks until the task completes, then returns the result.
     * Throws CancellationException if the task was cancelled, InterruptedException if the current
     * thread was interrupted while waiting, and ExecutionException if the task threw an exception.
     */
    V get() throws InterruptedException, ExecutionException;

    /** Blocks for at most the given time waiting for the result; throws TimeoutException if the wait times out. */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
get method (get the result)
The main purpose of the get method is to obtain the result of the task. Its behavior depends on the state of the Callable task, and the following seven situations can occur.
- The task has already finished: get returns immediately with the result.
- The task has not started yet. For example, we submit a task to a thread pool that already has a backlog, and we call get before it is our task's turn. Calling get blocks the current thread until the task has run and the result is returned.
- The task is running but takes a long time, so it is still in progress when we call get. Here too, get blocks the current thread until the result is returned.
- The task throws an exception during execution. When we call get, it throws an ExecutionException. No matter what type of exception the call method throws, the exception we get from get is always an ExecutionException.
- The task was cancelled. Calling get on a cancelled task throws a CancellationException.
- The waiting thread is interrupted. If the thread that called get is interrupted while it waits, get throws an InterruptedException.
- The call times out. With the timed version of get, if the call method finishes within the specified time, get returns normally. If the task has still not finished when the time is up, get throws a TimeoutException.
Examples:
package com.niuh.future;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future thread return value:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Shut the pool down so the JVM can exit
            executorService.shutdown();
        }
    }

    static class FutureTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(new Random().nextInt(3000));
            return new Random().nextInt(10);
        }
    }
}
isDone method (check whether execution is complete)
The isDone() method is used to determine whether the current task is complete.
package com.niuh.future;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureIsDoneDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            for (int i = 0; i < 3; i++) {
                Thread.sleep(1000);
                System.out.println("Is the thread complete:" + future.isDone());
            }
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future thread return value:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Shut the pool down so the JVM can exit
            executorService.shutdown();
        }
    }

    static class FutureTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(2000);
            return new Random().nextInt(10);
        }
    }
}
Execution Result:
Is the thread complete:false
Is the thread complete:false
Is the thread complete:true
Future thread return value:9
You can see that the first two isDone calls return false because the task has not finished yet; the third call returns true.
Note: isDone returns true when the task is complete and false when it is not. However, true does not mean the task succeeded. If the task throws an exception partway through, isDone still returns true, because the task will never run again and is therefore finished. A true result only tells you that the task has ended, not that it ended successfully.
Let's modify the FutureTask code in the example above and look at the result:
static class FutureTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000);
        throw new Exception("Deliberately throwing an exception");
    }
}
Execution Result: the isDone method returns true even though an exception was thrown.
This code shows that:
- The isDone method returns true even if the task throws an exception.
- Whatever exception the task throws (a plain Exception here), get always throws an ExecutionException that wraps it.
- Although the exception was thrown 2 seconds into the task execution, we do not actually see it until we call get.
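The three observations above can be checked with a small sketch. The class FailedTaskSketch and its method name are hypothetical; the task throws an IllegalArgumentException purely to show that the original exception is still reachable through getCause().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class FailedTaskSketch {

    /** Runs a task that always fails and describes what the caller observes. */
    static String observeFailure() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<Integer> failing = () -> {
            throw new IllegalArgumentException("deliberate failure");
        };
        try {
            Future<Integer> future = pool.submit(failing);
            try {
                future.get(); // the failure only surfaces here
                return "no exception";
            } catch (ExecutionException e) {
                // The task is done, and the original exception is the cause
                return "isDone=" + future.isDone()
                        + ", cause=" + e.getCause().getClass().getSimpleName();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observeFailure());
    }
}
```

In practice, callers usually unwrap the ExecutionException with getCause() to decide how to react to the real failure.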
cancel method (cancel execution of the task)
If you no longer want a task to run, you can call the cancel method. There are three cases:
- The first case is the simplest: the task has not started yet. Once cancel is called, the task is cancelled, will never be executed, and cancel returns true.
- The second case is also simple: the task has already completed or has already been cancelled. cancel fails and returns false, because a task that is completed or cancelled cannot be cancelled again.
- The third case is special: the task is currently running. Here cancel does not stop the task directly; what happens depends on the mayInterruptIfRunning parameter that must be passed to cancel:
  - If true is passed in, the thread executing the task receives an interrupt signal. A well-behaved task has logic that reacts to the interrupt and stops.
  - If false is passed in, the running task is not interrupted and keeps running to the end. The Future is nevertheless marked as cancelled: cancel returns true, and a later get throws CancellationException.
Examples:
package com.niuh.future;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureCancelDemo {
    static ExecutorService executorService = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        // Case 1: the task has not started yet
        // demo1();
        // Case 2: the task has already completed
        // demo2();
        // Case 3: the task is running
        demo3();
    }

    private static void demo1() {
        // Queue many tasks first so the last one is likely still waiting
        for (int i = 0; i < 1000; i++) {
            executorService.submit(new FutureTask());
        }
        Future<String> future = executorService.submit(new FutureTask());
        try {
            boolean cancel = future.cancel(false);
            System.out.println("Whether the Future task was cancelled:" + cancel);
            // get on a cancelled task throws the unchecked CancellationException
            String res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future thread return value:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    private static void demo2() {
        Future<String> future = executorService.submit(new FutureTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(false);
            System.out.println("Whether the Future task was cancelled:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    private static void demo3() {
        Future<String> future = executorService.submit(new FutureInterruptTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(true);
            System.out.println("Whether the Future task was cancelled:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    static class FutureTask implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "Normal return";
        }
    }

    static class FutureInterruptTask implements Callable<String> {
        @Override
        public String call() throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("Loop execution");
                Thread.sleep(500);
            }
            System.out.println("Thread interrupted");
            return "Normal return";
        }
    }
}
Here we examine the third case (the task is running). When we pass true, the thread stops:
Whether the Future task was cancelled:true
When we pass false, the task is marked as cancelled, but the thread keeps executing:
Whether the Future task was cancelled:true
Loop execution
Loop execution
Loop execution
......
So how do you choose between passing true and false?
- Pass true when you know for certain that the task can handle interrupts.
- Pass false in the following situations:
  - You know for certain that the task cannot handle interrupts.
  - You do not know whether the task supports cancellation (that is, whether it responds to interrupts). Code is usually written by several people, so in most cases we cannot be sure that a task supports interrupts.
  - Once the task has started running, you want it to run to completion.
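To make the effect of cancel(false) on a running task concrete, here is a minimal sketch. The class CancelWithoutInterruptSketch and its latch names are hypothetical; latches replace the sleeps of the article's demo so that the outcome does not depend on timing.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class CancelWithoutInterruptSketch {

    /** Cancels a running task with cancel(false) and reports what happened. */
    static String observe() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        Callable<String> task = () -> {
            started.countDown();
            release.await();      // would throw if the thread were interrupted
            finished.countDown(); // reached only if the task ran to the end
            return "Normal return";
        };
        try {
            Future<String> future = pool.submit(task);
            started.await();                          // the task is now running
            boolean cancelled = future.cancel(false); // no interrupt is sent
            release.countDown();                      // let the task continue
            boolean ranToEnd = finished.await(2, TimeUnit.SECONDS);
            return "cancelled=" + cancelled
                    + ", isCancelled=" + future.isCancelled()
                    + ", ranToEnd=" + ranToEnd;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe());
    }
}
```

The task body runs to the end, yet the Future reports itself as cancelled, so its return value can no longer be retrieved with get.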
Note that although the example checks !Thread.currentThread().isInterrupted() to detect interruption, the task is not actually stopped by that check. After Future#cancel(true) calls t.interrupt(), Thread.sleep throws InterruptedException, the exception propagates out of call(), and the thread pool's exception handling ends the current task. Combining sleep and interrupt can therefore produce unexpected effects.
For example, if we change the FutureInterruptTask code to use while (true), the task is still interrupted when cancel(true) is called.
static class FutureInterruptTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        while (true) {
            System.out.println("Loop execution");
            Thread.sleep(500);
        }
    }
}
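The following sketch makes that mechanism visible by recording the InterruptedException inside the task. The class CancelInterruptSketch and its latch names are assumptions made for this illustration; the loop itself is the same while (true) plus Thread.sleep shape as above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class CancelInterruptSketch {

    /** Returns true if cancel(true) ended the endless loop through an InterruptedException. */
    static boolean cancelStopsSleepingLoop() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        Callable<String> endless = () -> {
            started.countDown();
            try {
                while (true) {
                    Thread.sleep(50); // throws once the thread has been interrupted
                }
            } catch (InterruptedException e) {
                interrupted.countDown(); // record how the loop ended
                throw e;                 // let the exception leave call()
            }
        };
        try {
            Future<String> future = pool.submit(endless);
            started.await(); // make sure the task is running
            boolean cancelled = future.cancel(true);
            boolean loopEnded = interrupted.await(2, TimeUnit.SECONDS);
            return cancelled && future.isCancelled() && loopEnded;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Loop ended by interrupt: " + cancelStopsSleepingLoop());
    }
}
```

A task that never calls a blocking, interruptible method and never checks the interrupt flag would keep looping, which is why the choice of true or false depends on how the task is written.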
isCancelled method (determine whether the task has been cancelled)
The isCancelled method determines whether the task has been cancelled. It is used together with the cancel method and is straightforward, as the example above shows.
The relationship between Callable and Future
One advantage of the Callable interface over Runnable is that it can return a result, which is retrieved with the get method of the Future interface. In that sense, the Future acts as a container that stores the result of the Callable's call method.
In addition, we can use the Future's isDone method to check whether the task has finished, its cancel method to cancel the task, and its timed get to obtain the result within a limited time. In short, the Future offers a rich set of functions.
FutureTask
Future is only an interface; its implementation class is FutureTask. JDK 1.8 changed the implementation: FutureTask no longer relies on AQS, but uses a volatile state variable together with CAS operations.
Let's look at the declaration of FutureTask:
public class FutureTask<V> implements RunnableFuture<V> { ... }
As you can see, it implements an interface called RunnableFuture.
RunnableFuture interface
Let’s look at the code implementation of the RunnableFuture interface:
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
Since RunnableFuture extends both the Runnable and Future interfaces, and FutureTask implements RunnableFuture, a FutureTask can be executed by a thread as a Runnable and can also be used as a Future to obtain the return value of the Callable.
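A short sketch of this dual role, using a plain Thread instead of a thread pool. The class name FutureTaskAsRunnableSketch and the computed value are made up for the illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; the names are illustrative only.
class FutureTaskAsRunnableSketch {

    /** Runs a FutureTask on a plain Thread, then reads its result as a Future. */
    static int computeOnPlainThread() throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 6 * 7);
        Thread worker = new Thread(task, "worker"); // used as a Runnable
        worker.start();
        return task.get();                          // used as a Future
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Result: " + computeOnPlainThread());
    }
}
```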
FutureTask source code analysis
Member variables
/**
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL (completed normally, result available)
 * NEW -> COMPLETING -> EXCEPTIONAL (an exception occurred during execution)
 * NEW -> CANCELLED (task cancelled, no result)
 * NEW -> INTERRUPTING -> INTERRUPTED (task interrupted, no result)
 */
private volatile int state;
private static final int NEW          = 0; // new
private static final int COMPLETING   = 1; // completing
private static final int NORMAL       = 2; // completed normally
private static final int EXCEPTIONAL  = 3; // completed with an exception
private static final int CANCELLED    = 4; // cancelled
private static final int INTERRUPTING = 5; // interrupting
private static final int INTERRUPTED  = 6; // interrupted
/** The task to be performed */
private Callable<V> callable;
/** Holds the result returned by get(), or the exception to be thrown from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread that executes the Callable */
private volatile Thread runner;
/** Stack of waiting threads; this field is the top node of the stack */
private volatile WaitNode waiters;
To make the later analysis of the FutureTask implementation easier to follow, the states are explained here.
- NEW: a new task, or a task that is still running and has not finished. This is the initial state.
- COMPLETING: the task has finished, or an exception occurred while it ran, but the result or the exception has not yet been saved to the outcome field. The state changes from NEW to COMPLETING. This intermediate state lasts only a short time.
- NORMAL: the task finished and its result has been saved to the outcome field. The state changes from COMPLETING to NORMAL. This is a final state.
- EXCEPTIONAL: an exception occurred during execution and has been saved to the outcome field. The state changes from COMPLETING to EXCEPTIONAL. This is a final state.
- CANCELLED: the task has not started, or has started but not finished, and the user calls cancel(false) to cancel it without interrupting the executing thread. The state changes from NEW to CANCELLED. This is a final state.
- INTERRUPTING: the task has not started, or has started but not finished, and the user calls cancel(true) to cancel it. The state changes from NEW to INTERRUPTING. This is an intermediate state.
- INTERRUPTED: after interrupt() has been called on the thread executing the task, the state changes from INTERRUPTING to INTERRUPTED. This is a final state.
Note that every state with a value greater than COMPLETING means the task has finished, whether normally, with an exception, or by cancellation.
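The state field is private, but some transitions can be observed through the public API. The sketch below assumes nothing beyond java.util.concurrent.FutureTask; the class CancelBeforeRunSketch is hypothetical. It follows the NEW -> CANCELLED path for a task that is cancelled before it ever runs.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the names are illustrative only.
class CancelBeforeRunSketch {

    /** Cancels a task that never started and reports what the public API shows. */
    static String observe() {
        AtomicBoolean executed = new AtomicBoolean(false);
        FutureTask<String> task = new FutureTask<String>(() -> {
            executed.set(true);
            return "done";
        });
        boolean first = task.cancel(false);  // NEW -> CANCELLED succeeds
        boolean second = task.cancel(false); // already final, so this fails
        task.run();                          // state is not NEW, call() is skipped
        return first + "," + second + "," + task.isCancelled()
                + "," + task.isDone() + "," + executed.get();
    }

    public static void main(String[] args) {
        // Order: first cancel, second cancel, isCancelled, isDone, executed
        System.out.println(observe());
    }
}
```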
Constructors
// Callable constructor
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// Runnable constructor
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
The Runnable constructor converts its argument into a RunnableAdapter through Executors.callable, mainly because Callable returns a value while Runnable does not.
/**
 * A callable that runs given task and returns given result
 */
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
This is a typical adapter pattern. To adapt a Runnable to a Callable, the adapter implements the Callable interface and calls the Runnable's run method inside call.
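The adapter can also be used directly through the public Executors.callable factory method. A minimal sketch, with a hypothetical class name and a made-up fixed result:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical demo class; the names are illustrative only.
class RunnableAdapterSketch {

    /** Adapts a Runnable to a Callable that returns a fixed result. */
    static String adapt() throws Exception {
        StringBuilder log = new StringBuilder();
        Runnable runnable = () -> log.append("ran;");
        // The Runnable is wrapped; call() runs it and then returns the given value
        Callable<String> adapted = Executors.callable(runnable, "fixed result");
        String result = adapted.call();
        return log + result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(adapt());
    }
}
```

The result is supplied up front because the Runnable itself has no way to produce one.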
Inner class
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
run method
/**
 * run can be called directly, or executed by starting a new thread.
 */
public void run() {
    // Return if the task is not in the NEW state, or another thread is already running it
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // The Callable is not null and the task is still in the NEW state
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // Execute the task
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false; // execution failed
                // CAS the state to COMPLETING, store the exception, then set EXCEPTIONAL
                setException(ex);
            }
            // CAS the state to COMPLETING, store the result, then set NORMAL
            if (ran)
                // Assign result to outcome
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // Make sure an interrupt issued by cancel(true) is delivered while still inside run()
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
- The run method has no return value. It stores the result in the outcome field through set(result), and get later reads the return value from outcome.
- Both FutureTask constructors end up with a Callable, so run only ever needs to invoke the Callable's call method. When c.call() executes and the constructor argument was a Runnable, the call path is c.call() -> RunnableAdapter.call() -> Runnable.run().
setException(Throwable t) method
When an exception occurs, store the exception in outcome and update the task state (EXCEPTIONAL).
protected void setException(Throwable t) {
    // CAS, provided by the Unsafe class, moves the state from NEW to COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        // Wake up the threads that are blocked waiting for the result
        finishCompletion();
    }
}
set(V v) method
Store the result in outcome and update the task state (NORMAL).
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
finishCompletion method
// Remove and wake up all waiting threads, call done(), and clear the callable
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // Wake up the blocked threads one by one until the wait stack is empty
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                // No more waiting nodes: leave the loop
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}
handlePossibleCancellationInterrupt method
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us. Let's spin-wait patiently.
    // Spin until cancel(true) has finished delivering the interrupt
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
    // assert state == INTERRUPTED;
    // We want to clear any interrupt we may have received from
    // cancel(true). However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}
cancel method
// Cancel the task
public boolean cancel(boolean mayInterruptIfRunning) {
    // Only a task in the NEW state can be cancelled; the target state depends on the parameter
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        // The task has already completed, been cancelled, or been interrupted
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) { // cancel(true)
            try {
                // Interrupt the thread running the task
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // Set the final state with an ordered write
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // Wake up the threads blocked in get()
        finishCompletion();
    }
    return true;
}
get method
/**
 * Obtains the execution result.
 *
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // The task is not finished: block until it is (the result has been stored in outcome)
        s = awaitDone(false, 0L);
    // Return the result
    return report(s);
}
/**
 * Obtains the execution result, waiting at most the given time.
 *
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
awaitDone method
/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // Loop until the task is done, the wait times out, or the thread is interrupted
    for (;;) {
        // If the waiting thread was interrupted, remove its node and throw
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // The task has reached a final state: return that state
        if (s > COMPLETING) {
            if (q != null)
                // Clear the thread reference held by this wait node
                q.thread = null;
            return s;
        }
        // The result is being stored and will be available very soon: yield and retry
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // The task is running or has not started: create a wait node
        else if (q == null)
            q = new WaitNode();
        // The node is normally pushed onto the stack on the next iteration
        else if (!queued)
            // Set q.next = waiters, then CAS waiters to q
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // With a timeout, check whether the deadline has passed
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // Timed out: remove the node from the wait stack and return the current state
                removeWaiter(q);
                return state;
            }
            // Block the thread that called get, with a time limit
            LockSupport.parkNanos(this, nanos);
        }
        else
            // Block the thread that called get, with no time limit
            LockSupport.park(this);
    }
}
removeWaiter method
// Remove a wait node from the stack
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}
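The interplay of awaitDone and finishCompletion (push a node onto a lock-free stack, park, get unparked on completion) can be reduced to a much smaller sketch. This is a simplified stand-in and not the JDK code: the class OneShotResult and its members are hypothetical, AtomicReference replaces Unsafe, there is no timeout or cancellation, complete must be called only once, and an interrupted waiter simply leaves its node behind instead of calling an equivalent of removeWaiter.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified stand-in for FutureTask's waiter handling; not the JDK code.
class OneShotResult<V> {

    /** One node per waiting thread, linked into a stack. */
    private static final class Waiter {
        final Thread thread = Thread.currentThread();
        Waiter next; // written before the node is published by CAS
    }

    private final AtomicReference<Waiter> waiters = new AtomicReference<>();
    private volatile boolean done;
    private V value; // written before done = true, read only after done is seen

    /** Stores the value, then detaches the whole stack and wakes every waiter. */
    void complete(V v) {
        value = v;
        done = true;
        for (Waiter w = waiters.getAndSet(null); w != null; w = w.next) {
            LockSupport.unpark(w.thread);
        }
    }

    /** Blocks until complete() has been called, following the loop shape of awaitDone. */
    V await() throws InterruptedException {
        Waiter node = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                throw new InterruptedException(); // sketch: the node stays on the stack
            }
            if (done) {
                return value;
            }
            if (node == null) {
                node = new Waiter();
            } else if (!queued) {
                Waiter head = waiters.get();
                node.next = head;
                queued = waiters.compareAndSet(head, node); // push onto the stack
            } else {
                LockSupport.park(this); // done is rechecked after every wake-up
            }
        }
    }

    /** Starts a waiter thread, completes the result, and returns what the waiter saw. */
    static String demo() throws InterruptedException {
        OneShotResult<String> result = new OneShotResult<>();
        AtomicReference<String> seen = new AtomicReference<>();
        Thread waiter = new Thread(() -> {
            try {
                seen.set(result.await());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        result.complete("ready");
        waiter.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Because the waiter rechecks done after pushing its node and before parking, it does not matter whether complete runs before or after the push: either the completer finds the node and unparks it, or the waiter sees done and returns without parking.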
done() method
protected void done() { }
The default implementation does nothing. Subclasses can override it to run a completion callback or other follow-up work once the task is done. Inside done() you can also check whether the task was cancelled.
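A minimal sketch of such an override, assuming a hypothetical class DoneCallbackSketch with a helper that records how each task ended. Everything runs on one thread here, so a plain ArrayList is sufficient.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; the names are illustrative only.
class DoneCallbackSketch {

    /** Creates a task whose done() callback records how the task ended. */
    static FutureTask<String> tracked(List<String> events) {
        return new FutureTask<String>(() -> "result") {
            @Override
            protected void done() {
                // isCancelled() tells the callback which way the task finished
                events.add(isCancelled() ? "cancelled" : "completed");
            }
        };
    }

    /** Runs one task to completion and cancels another, collecting the callbacks. */
    static List<String> observe() {
        List<String> events = new ArrayList<>();
        tracked(events).run();         // completes normally, done() fires
        tracked(events).cancel(false); // cancelled before running, done() fires
        return events;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```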
The use of the Future
FutureTask can be used whenever a result needs to be retrieved asynchronously or a task may need to be cancelled. Pass a Runnable or Callable task to FutureTask, then either call its run method directly or hand it to a thread pool; the result can later be obtained from outside through FutureTask's get method. FutureTask is therefore well suited to time-consuming calculations: the main thread can finish its own work first and fetch the result afterwards. In addition, FutureTask guarantees that the Runnable or Callable task is executed only once even if run is called several times, and the task can be cancelled with cancel.
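The run-once guarantee is easy to check with a counter. A small sketch, with a hypothetical class name:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class RunOnceSketch {

    /** Calls run() twice and returns how many times the Callable actually executed. */
    static int runTwice() {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<Integer> task = new FutureTask<Integer>(() -> calls.incrementAndGet());
        task.run(); // executes the Callable and stores the result
        task.run(); // state is no longer NEW, so this call returns immediately
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("Callable executed " + runTwice() + " time(s)");
    }
}
```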
Scenario: using FutureTask for multi-task computation
With FutureTask and ExecutorService, computing tasks can be submitted to multiple threads while the main thread continues with other work. When the main thread needs the results of the child threads, it obtains them through the FutureTask objects.
package com.niuh.future;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * Usage scenario: FutureTask performing multi-task computation
 */
public class FutureTaskForMultiCompute {
    public static void main(String[] args) {
        FutureTaskForMultiCompute inst = new FutureTaskForMultiCompute();
        // Create a task list
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
        // Create a thread pool
        ExecutorService exec = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            // Pass a Callable object to create a FutureTask object
            FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, "" + i));
            taskList.add(ft);
            // Submit the task to the thread pool. FutureTask is a Runnable, so this is submit(Runnable).
            // Note that exec.invokeAll(...) accepts Callables, not FutureTasks, so it cannot take taskList.
            exec.submit(ft);
        }
        System.out.println("After all computations are committed, the main thread moves on to other tasks!");
        // Start collecting the result of each task
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                // FutureTask's get method blocks until the result is available
                totalResult = totalResult + ft.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        // Close the thread pool
        exec.shutdown();
        System.out.println("The total result of the multitasking calculation is :" + totalResult);
    }

    private class ComputeTask implements Callable<Integer> {
        private Integer result = 0;
        private String taskName = "";

        public ComputeTask(Integer iniResult, String taskName) {
            result = iniResult;
            this.taskName = taskName;
            System.out.println("Generate child thread computation task:" + taskName);
        }

        public String getTaskName() {
            return this.taskName;
        }

        @Override
        public Integer call() throws Exception {
            for (int i = 0; i < 100; i++) {
                // Note: "= +i" assigns i (unary plus) rather than accumulating, so every task
                // returns 99 and the total is 990. Use "result += i" to sum instead.
                result = +i;
            }
            // Sleep for 5 seconds to observe the main thread: it keeps running until it calls
            // get on a FutureTask, and only then waits for that task to finish.
            Thread.sleep(5000);
            System.out.println("Child thread computation task:" + taskName + "Execution complete!");
            return result;
        }
    }
}
Execution Result:
Generate child thread computation task:0
Generate child thread computation task:1
Generate child thread computation task:2
Generate child thread computation task:3
Generate child thread computation task:4
Generate child thread computation task:5
Generate child thread computation task:6
Generate child thread computation task:7
Generate child thread computation task:8
Generate child thread computation task:9
After all computations are committed, the main thread moves on to other tasks!
Child thread computation task:0Execution complete!
Child thread computation task:1Execution complete!
Child thread computation task:3Execution complete!
Child thread computation task:4Execution complete!
Child thread computation task:2Execution complete!
Child thread computation task:5Execution complete!
Child thread computation task:7Execution complete!
Child thread computation task:9Execution complete!
Child thread computation task:8Execution complete!
Child thread computation task:6Execution complete!
The total result of the multitasking calculation is :990
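When the tasks are plain Callables rather than FutureTasks, ExecutorService.invokeAll can submit them in one call and return a Future per task. The sketch below is one possible variant under that assumption; InvokeAllSketch, sumTask, and computeTotal are hypothetical names. Unlike the listing above, which assigns result = +i, this sketch accumulates with +=, so its total differs from 990, and it skips the 5-second sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical variant of the multi-task computation using invokeAll.
class InvokeAllSketch {
    /** Adds 0..99 on top of an initial value; stands in for a real computation. */
    static Callable<Integer> sumTask(int initial) {
        return () -> {
            int result = initial;
            for (int i = 0; i < 100; i++) {
                result += i;
            }
            return result;
        };
    }

    static int computeTotal(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(sumTask(i));
            }
            int total = 0;
            // invokeAll blocks until every task has completed
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Each task returns initial + 4950, so ten tasks give 45 + 49500
        System.out.println(computeTotal(10)); // prints 49545
    }
}
```

The trade-off is that invokeAll makes the calling thread wait for all tasks at once, whereas submitting FutureTasks individually lets the main thread do other work before it calls get.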
FutureTask ensures that tasks are executed only once in high-concurrency environments
In many high-concurrency environments, certain tasks need to be performed only once, and FutureTask's run-once guarantee fits this scenario well. For example, suppose there is a pool of connections indexed by key. If the key exists, the connection stored under that key is returned directly; if it does not, the connection is created first. A common approach for such applications is to use a Map to store the mapping between keys and connections. Typical code is as follows:
package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Counter-example: a key-to-connection pool guarded by a lock.
 * Locking ensures thread safety in a high-concurrency environment and guarantees that
 * the connection is created only once, at the expense of performance.
 */
public class FutureTaskConnection1 {
    private static Map<String, Connection> connectionPool = new HashMap<>();
    private static ReentrantLock lock = new ReentrantLock();

    public static Connection getConnection(String key) {
        // Acquire the lock before entering try so that unlock() only runs if lock() succeeded
        lock.lock();
        try {
            Connection connection = connectionPool.get(key);
            if (connection == null) {
                Connection newConnection = createConnection();
                connectionPool.put(key, newConnection);
                return newConnection;
            }
            return connection;
        } finally {
            lock.unlock();
        }
    }

    private static Connection createConnection() {
        try {
            // The empty string is a placeholder for a real JDBC URL
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}
In the example above, we use a lock to ensure thread safety in a high-concurrency environment and to guarantee that the Connection is created only once, at the expense of performance. With ConcurrentHashMap, explicit locking can be almost entirely avoided and performance improves considerably.
package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * With ConcurrentHashMap, explicit locking can be almost entirely avoided and performance
 * improves considerably.
 * <p>
 * However, under high concurrency a Connection may be created more than once. Creating a
 * Connection is time-consuming, so several threads can enter getConnection together, all
 * find that no connection exists for the key, and all call createConnection(). Only one
 * of them manages to insert its connection into the map; the connections created by the
 * other threads are wasted.
 */
public class FutureTaskConnection2 {
    private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>();

    public static Connection getConnection(String key) {
        Connection connection = connectionPool.get(key);
        if (connection == null) {
            connection = createConnection();
            // Use the return value of putIfAbsent to detect whether another thread inserted first
            Connection returnConnection = connectionPool.putIfAbsent(key, connection);
            if (returnConnection != null) {
                connection = returnConnection;
            }
        }
        return connection;
    }

    private static Connection createConnection() {
        try {
            // The empty string is a placeholder for a real JDBC URL
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}
However, under high concurrency it is still possible for a Connection to be created more than once. Why is that?
Creating a Connection is a time-consuming operation. Suppose multiple threads enter getConnection at the same time and all find that no connection exists for the key. Every one of them then calls conn = createConnection(), but only one thread manages to insert its connection into the map. The connections created by the other threads are never used and simply waste system resources.
The key problem to solve is to make the creation of the connection (conn = createConnection();) happen after connectionPool.putIfAbsent() when the key does not exist. This is where FutureTask comes in. The code reworked on top of ConcurrentHashMap and FutureTask is as follows:
package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * FutureTask ensures that the task is executed only once in high-concurrency environments.
 * The map stores a FutureTask per key, so the connection is created only after
 * putIfAbsent() has decided which thread is responsible for creating it.
 */
public class FutureTaskConnection3 {
    private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();

    public static Connection getConnection(String key) {
        FutureTask<Connection> connectionFutureTask = connectionPool.get(key);
        try {
            if (connectionFutureTask != null) {
                return connectionFutureTask.get();
            } else {
                Callable<Connection> callable = new Callable<Connection>() {
                    @Override
                    public Connection call() throws Exception {
                        return createConnection();
                    }
                };
                FutureTask<Connection> newTask = new FutureTask<>(callable);
                FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask);
                if (returnFt == null) {
                    // This thread won the race: run the task to create the connection
                    connectionFutureTask = newTask;
                    newTask.run();
                } else {
                    // Another thread registered its task first: wait for that task instead
                    connectionFutureTask = returnFt;
                }
                return connectionFutureTask.get();
            }
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    private static Connection createConnection() {
        try {
            // The empty string is a placeholder for a real JDBC URL
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}
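To check the create-once claim without a database, the same ConcurrentHashMap plus FutureTask pattern can be exercised with a counting factory. This is a sketch under assumptions: CreateOnceCheck, createResource, and the String resource are stand-ins invented for the example, and the 50 ms sleep only widens the race window.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical harness: counts how many times the slow factory runs under contention.
class CreateOnceCheck {
    private static final ConcurrentHashMap<String, FutureTask<String>> CACHE = new ConcurrentHashMap<>();
    static final AtomicInteger CREATIONS = new AtomicInteger();

    /** Hypothetical slow factory standing in for createConnection(). */
    private static String createResource(String key) throws InterruptedException {
        CREATIONS.incrementAndGet();
        Thread.sleep(50);
        return "resource-" + key;
    }

    static String get(String key) {
        FutureTask<String> task = CACHE.get(key);
        if (task == null) {
            FutureTask<String> newTask = new FutureTask<>(() -> createResource(key));
            task = CACHE.putIfAbsent(key, newTask);
            if (task == null) { // this thread won the race and creates the resource
                task = newTask;
                task.run();
            }
        }
        try {
            return task.get(); // losers of the race wait here for the winner's result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Releases all callers at once on the same key and returns the creation count. */
    static int creationsUnderContention(int threads) {
        CACHE.clear();
        CREATIONS.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                get("db");
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return CREATIONS.get();
    }

    public static void main(String[] args) {
        System.out.println(creationsUnderContention(8)); // prints 1
    }
}
```

Because putIfAbsent admits exactly one task per key, only the winning thread calls run(), so the factory executes once no matter how many threads arrive together.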
FutureTask completion callback
FutureTask has a protected method, void done(), which is called back once a task has finished. Suppose each task should actively trigger follow-up work as soon as it completes:
package com.niuh.future;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * FutureTask#done()
 */
@Slf4j
public class FutureTaskDemo1 {
    public static void main(String[] args) throws InterruptedException {
        // Mooncake producer
        final Callable<Integer> producer = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("In the making of mooncakes...");
                Thread.sleep(5000);
                return new Random().nextInt(1000);
            }
        };
        // Mooncake consumer
        Runnable customer = new Runnable() {
            @Override
            public void run() {
                ExecutorService es = Executors.newCachedThreadPool();
                log.info("Boss bring me a moon cake.");
                for (int i = 0; i < 3; i++) {
                    FutureTask<Integer> futureTask = new FutureTask<Integer>(producer) {
                        @Override
                        protected void done() {
                            super.done();
                            try {
                                // The task is already done here, so get() returns without blocking
                                log.info(String.format("No. [%s] mooncake is packed.", get()));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    es.submit(futureTask);
                }
                // Already submitted tasks still run; the pool just stops accepting new ones
                es.shutdown();
            }
        };
        new Thread(customer).start();
    }
}
Execution Result:
11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - Boss bring me a moon cake.
11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - In the making of mooncakes...
11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - In the making of mooncakes...
11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - In the making of mooncakes...
11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - No. [804] mooncake is packed.
11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - No. [88] mooncake is packed.
11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - No. [166] mooncake is packed.
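For comparison, the same "act when the task finishes" idea can be written without subclassing FutureTask by using CompletableFuture, which is a different JDK class from the one this article analyzes. The sketch below is only an illustrative alternative; CompletionCallbackSketch and produceAndPack are hypothetical names, and the mooncake number is simply the loop index instead of a random value.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical alternative: completion callbacks via CompletableFuture instead of done().
class CompletionCallbackSketch {
    static List<String> produceAndPack(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(count);
        List<String> packed = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<?>[] all = new CompletableFuture<?>[count];
            for (int i = 0; i < count; i++) {
                final int id = i;
                all[i] = CompletableFuture
                        .supplyAsync(() -> id, pool) // "produce" mooncake number id
                        .thenAccept(n -> packed.add("No. [" + n + "] mooncake is packed."));
            }
            // Wait until every production and its callback have finished
            CompletableFuture.allOf(all).join();
            return packed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The order of the messages may vary between runs
        produceAndPack(3).forEach(System.out::println);
    }
}
```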
PS: The code above has been pushed to GitHub: github.com/Niuh-Study/…