The introduction

When I was doing streaming media service, I came across a scene like this:

In the function of video splicing, the following serial steps are involved:

If the source file is queued (downloaded) using the for loop, 10 10M files may take 30 seconds (depending on the current bandwidth).

If you use multiple threads to download, the maximum time can be reduced to 3s. There is no serial condition for downloading individual files.

The execution of the splicing operation must ensure that all the source files are downloaded successfully, so at this time we must sense the result of the child thread and whether an exception occurs:

Abstract

  • What do we do when we want to fetch child thread results (or batch results)?
  • How are the results obtained? Why is it available? Is there an operation to capture something from the future?
  • How are exceptions caught? The main thread of the child thread is not aware of the exception. So what’s the gateway through here?
  • Today we are going to talk about usage scenarios and principles

The Future and Callable

Why Future and Callable

Let’s start with our old friend Runnable.

As we know, Runnable has the following two flaws:

  1. Cannot return value
  2. You cannot throw a Checked exception

So why is it designed this way?

If the Run method can throw an exception, who handles it? Do you execute Main for thread.start ()? This is obviously not appropriate, which is why we recommend a try inside the Run method… The catch.

What if I have to get the return value, if I have to throw an exception? This brings us to Callable and Future.

What are Future and Callable used for? What is the relationship?

Callable can be understood as an extension of Runnable:

public interface Callable<V> {
    V call(a) throws Exception;
}
Copy the code

It can either return a value or throw an exception. How does it receive its exception when it throws it? Who is receiving? How do I get the return value?

Don’t worry, look at Future.

Callable describes a computational task, and Future represents the life cycle of a task, meaning that it stores a Future value.

public interface Future<V> {

    /** * is used to cancel the task, returning true on success and false */ on failure
    boolean cancel(boolean mayInterruptIfRunning);

    /** * indicates whether the task was cancelled successfully. If the task was cancelled successfully before the task completed, true */ is returned
    boolean isCancelled(a);

    /** * indicates whether the task is complete. If the task is complete, true */ is returned
    boolean isDone(a);

    /** * is used to get the result of the execution. This method blocks and does not return */ until the task is finished
    V get(a) throws InterruptedException, ExecutionException;

    /** * blocks the execution result for a specified period of time, or returns null */ if the result has not been retrieved within a specified period of time
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
Copy the code

Here’s a scenario:

If I have a long computation task and I want a child thread to do it, the main thread will continue to do its job, and at some point, the main thread will want to know the result of the computation, and then go get it, how do I do this?

  1. We can useFuture.getIn order to getCallableThe execution result returned by the interface.
  2. Until the callable.call() method completes, the thread calling the future.get() method is blocked until the result is returned.
public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(10);
    System.out.println("Now I need a very complicated calculation, and I need a child thread to do it.");
    Future<Integer> future = service.submit(new CallableTask());
    System.out.println("I'm done, the child thread has started, I'm going to continue with my work.");
    try {
        Thread.sleep(2000);
        System.out.println("I want to know the calculation.");
        System.out.println("Obtain the calculation result:" + future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
    	// If the call method throws an exception during the calculation, an ExecutionException will be caught here
        // Call throws a null pointer exception, but this is still ExecutionException
        e.printStackTrace();
    }
    service.shutdown();
}

static class CallableTask implements Callable<Integer> {

    @Override
    public Integer call(a) throws Exception {
        System.out.println("The child thread starts a complex calculation.");
        Thread.sleep(3000);
        return newRandom().nextInt(); }}Copy the code

3. How to achieve result acquisition and exception reception?

The Future interface defines a series of methods for controlling tasks and getting the results of tasks.

Let’s look at a typical implementation, FutureTask, that implements the RunnableFuture interface, and RunnableFuture inherits Runnable and Future

public class FutureTask<V> implements RunnableFuture<V> {... }Copy the code
public interface RunnableFuture<V> extends Runnable.Future<V> {
    void run(a);
}
Copy the code

Let’s take a look at how it works:

public class FutureTaskDemo {

    public static void main(String[] args) {
        // Step 1: Create a computing task
        CallableTask callableTask = new CallableTask();
        // Step 2: Pass in a task and create a task manager that has a place to store future results and control task status
        FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
        // Step 3: Put the task into a child thread (or thread pool) to execute
        // ExecutorService service = Executors.newCachedThreadPool();
        // service.submit(future);
        new Thread(futureTask).start();
        try {
            System.out.println("Task Running Result:" + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch(ExecutionException e) { e.printStackTrace(); }}}class CallableTask implements Callable<Integer> {

    @Override
    public Integer call(a) throws Exception {
        System.out.println("Child thread is calculating");
        Thread.sleep(3000);
        return 1; }}Copy the code

1. How are Future and Callable combined?

When FutureTask is executed by a thread, the thread’s start method calls Runnable’s run. FutureTask implements the Runnable interface. Let’s see what its run method does:

public void run(a) {...try {
        Callable<V> c = callable;
        if(c ! =null && state == NEW) {
            V result;
            boolean ran;
            try {
            	// In this case, the call method of Callable is called
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                ......
            }
            if(ran) set(result); }}finally{... }}Copy the code

Remember when we were constructing FutureTask we needed to pass in a Callable? This is where the Call method of the Callable is called.

2. Where are the results from the future?

In this code, a result is used to receive the result from the call method.

Then there is a line: set(result);

Let’s keep track of where it is set:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    	// The result is assigned to the member variable outcome
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion(); }}Copy the code

So we find this member variable and say:

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
Copy the code

So when we use get(), that’s where we get the value.

How does get get block and wake up?

Let’s look at the next picture

How do I catch exceptions for child threads?

Let’s take a look at this code:

public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(20);
    Future<Integer> future = service.submit(
            () -> {
                System.out.println(Thread.currentThread().getName() + ": Child thread ready to throw exception");
                throw new IllegalArgumentException("Callable throws an exception"); });try {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + "Count to the number:" + i);
            Thread.sleep(500);
        }
        System.out.println(Thread.currentThread().getName() + "Determine whether the child thread task has completed execution:" + future.isDone());
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
        System.out.println(Thread.currentThread().getName() + "得到InterruptedException异常");
    } catch (ExecutionException e) {
        e.printStackTrace();
        System.out.println(Thread.currentThread().getName() + "得到ExecutionException异常"); }}Copy the code

Execution Result:

  1. Only the main thread executesget()Method to detect an exception thrown by the child thread
  2. Even though we’re throwing outIllegalArgumentExceptionBut still catch isExecutionException

** How is this exception set? Arunachal pradesh

Let’s go to the run() method again:

public void run(a) {...try {
        Callable<V> c = callable;
        if(c ! =null && state == NEW) {
            V result;
            boolean ran;
            try {
            	result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // Catch an exception when executing the call method
                setException(ex);
            }
            if(ran) set(result); }}finally{... }}Copy the code

The child thread itself, after catching the exception, calls setException() to set the task state to “EXCEPTIONAL” :

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion(); }}Copy the code

Then we go to the report() method called in the get() method:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
Copy the code

If the task status is “EXCEPTIONAL”, an ExecutionException is thrown

Can the task be cancelled?

The effect of cancellation can be achieved by using the cancel() method, but it is impossible to cancel it immediately if you want to. It can be divided into the following three situations:

  1. If a taskIt hasn’t started yetIn this case, the task will be cancelled normally and will not be executed in the futuretrue
  2. If a taskCompleted or cancelledthatcancel()Method fails and returnsfalse
  3. If a taskExecution has begun, then executecancel()Method does not cancel the task directly, but based on the arguments we pass inmayInterruptIfRunningJudgment:
    1. mayInterruptIfRunning = true: Sends an interrupt signal to the executing thread and sets the task state to “interrupt” (whether the thread responds to the interrupt signal is up to the thread)
    2. mayInterruptIfRunning = false: Do not send the interrupt signal, set the task status to “CANCELLED”
public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(10);
    System.out.println(Thread.currentThread().getName() + "Now I need a very complex calculation result, open a child thread to do it.");
    Future<Integer> future = service.submit(new CallableTask());
    System.out.println(Thread.currentThread().getName() + ": open finished, the child thread has started to calculate, I continue with my work");
    try {
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + ": Execute the task of canceling the child thread");
        // Adjust to true or false to observe the result
        System.out.println(Thread.currentThread().getName() + ": Cancel result:" + future.cancel(true));
        Thread.sleep(4000);
        System.out.println(Thread.currentThread().getName() + ": Ready to obtain calculation results");
        // If the cancel method has been executed, the exception is cancelled
        System.out.println("Obtain the calculation result:" + future.get());
    } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + ": InterruptedException");
        e.printStackTrace();
    } catch (ExecutionException e) {
        System.out.println(Thread.currentThread().getName() + ": ExecutionException");
        e.printStackTrace();
    } catch (CancellationException e){
        System.out.println(Thread.currentThread().getName() + ": CancellationException");
        e.printStackTrace();
    }
    service.shutdown();
}

static class CallableTask implements Callable<Integer> {

    @Override
    public Integer call(a) throws Exception {
        System.out.println(Thread.currentThread().getName() + ": child thread starts complex computation");
        Thread.sleep(3000);
        System.out.println(Thread.currentThread().getName() + ": child thread terminates complex computation");
        return 0; }}Copy the code
  1. usefuture.cancel(true)(Since interrupts can be responded to during sleep, an interrupt signal is received) :

future.cancel(false)

5. How to obtain task results in batches?

The first thing we can think of is using a Future array:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(20);
    ArrayList<Future> futures = new ArrayList<>();
    // Loop submit 20 tasks and put the future into the list
    for (int i = 0; i < 20; i++) {
        Future<Integer> future = service.submit(new CallableTask());
        futures.add(future);
    }
    // The loop waits for the result, because the task will execute 2s, so all the results will be printed together after 3s
    for (int i = 0; i < 20; i++) {
        Future<Integer> future = futures.get(i);
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch(ExecutionException e) { e.printStackTrace(); }}}static class CallableTask implements Callable<Integer> {

    @Override
    public Integer call(a) throws Exception {
        Thread.sleep(3000);
        return newRandom().nextInt(); }}Copy the code

But think about it for a moment. What if this is the scenario:

The first task takes 1 minute to execute, and the subsequent ones take less than 10 seconds. At this point, the main thread is actually blocking and waiting for the first task to finish before getting another result.

This scenario can be resolved in two ways:

  1. Use atimeouttheget
  2. useCompletableFuture

conclusion

Today we’ll start with a scenario: what do we do when we want to get the results of child thread execution? This leads to Future and Callable.

And then we started thinking:

  • How are the results obtained? Why is it available? Is there an operation to capture something from the future?
  • How are exceptions caught? The main thread of the child thread is not aware of the exception. So what’s the gateway through here?
  • I performcancelDoes the method really cancel the task in progress?

These problems are explained from the perspective of “usage + source code”.