Java 8 has a ton of new features and enhancements like Lambda expressions, Streams, CompletableFuture, and more. In this article I’ll explain CompletableFuture and the use of all its methods in detail.

What is CompletableFuture?

In Java, CompletableFuture is used for asynchronous programming. Asynchronous programming is writing non-blocking code that runs a task on a separate thread, isolated from the main thread, and notifythe main thread of its progress, success or failure.

In this way, the main thread is not blocked and does not have to wait until the child thread completes. The main thread can perform other tasks in parallel.

Using this parallel mode, can greatly improve the performance of the program.

Future vs CompletableFuture

CompletableFuture is an extension of the Future API.

The Future is used as a reference to the result of an asynchronous calculation. Provide an isDone() method to check that the computation is complete. The get() method is used to receive the results of the computed task when it completes.

Learn more about Futures from Callbale and the Future tutorial.

The Future API is a great progression to Java asynchronous programming, but it lacks some very important and useful features.

Limitations of Future

  1. Cannot be done manually when you write a function to get the latest price of an e-commerce product via a remote API. Because the API is too time consuming, you allow it to be in a separate thread and return a Future from your function. Now suppose that the API service is down and you want to manually complete the Future with the product’s latest cache price. You will find it impossible to do so.
  2. The Future doesn’t tell you that it’s done, it provides a blockingget()Methods inform you of the results. You can’t put a callback into a Future that automatically calls the Future’s results when they are available.
  3. Sometimes you need to perform a long running computation, and when the computation is complete, you need to send the result of its computation to another long running computation, etc. You will find that you cannot create such a workflow using the Future.
  4. Let’s say you have 10 different futures, and you want to run them in parallel, and then run some functions while they’re running. You’ll find that you can’t do that with Future either.
  5. There are so many limitations to the exception handling structure of the non-task API, but luckily we have CompletableFuture, and you can use CompletableFuture for all of these purposes.

CompletableFuture implements the Future and CompletionStage interfaces, and provides a number of convenient methods for creating, chain calling, and composing multiple Futures, as well as extensive exception handling support.

Create CompletableFuture

1. A simple example could simply create CompletableFuture using the following no-argument constructor:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();
Copy the code

This is the simplest CompletableFuture. To get the result of CompletableFuture, use the completableFuture.get () method:

String result = completableFuture.get()
Copy the code

The get() method blocks until the Future completes. Therefore, the above call will be blocked forever because the Future will never complete.

You can use CompletableFuture.com plete () by hand to complete a Future:

completableFuture.complete("Future's Result")
Copy the code

All waiting for the Future of the client will receive a specified result, and completableFuture.com plete () after the call is ignored.

2. Use the runAsync () running asynchronous computation If you want to asynchronous running a background task and don’t want to change things return quest, then you can use CompletableFuture. RunAsync () method, it holds a Runnable object, And returns CompletableFuture

.

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread."); }}); // Block andwait for the future to complete
future.get()
Copy the code

You can also pass in a Runnable object as a lambda expression:

// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});
Copy the code

I’ll use lambda expressions a lot in this article, and if you haven’t used them before, I suggest you use them as well.

3. Use the supplyAsync () to run an asynchronous task and returns the result When the task does not need to return anything, CompletableFuture. RunAsync () is very useful. But what if your background task needs to return some results?

CompletableFuture. SupplyAsync () is your choice. It holds supplier

and returns CompletableFuture

, where T is the type of value obtained by calling the incoming supplier.

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation"; }}); // Block and get the result of the Future String result = future.get(); System.out.println(result);Copy the code

Supplier

is a simple functional interface that represents the result of Supplier. It has a get() method that writes to your background tasks and returns results.

You can use lambda expressions to make the above example more concise:

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
Copy the code

A note about executors and Thread pools you might want to know that we know that the runAsync() and supplyAsync() methods perform their tasks in separate threads. But we’re not going to create just one thread forever. The CompletableFuture can obtain a thread from the global ForkJoinPool.commonPool() to perform these tasks. But you can also create a thread pool and pass it to the runAsync() and supplyAsync() methods to let them get a thread from the pool to perform their task. All methods of the CompletableFuture API have two variants – one that takes an Executor as an argument and one that doesn’t:

// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Copy the code

Create a thread pool and pass it to one of the methods:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);
Copy the code

The CompletableFuture converts and runs

The CompletableFuture.get() method is blocked. It will wait until the Future completes and return the result when it completes. But is that what we want? To build an asynchronous system, we should attach a callback to the CompletableFuture that automatically retrieves the result when the Future is complete. If we don’t want to wait for the result to return, we can write the logic that needs to wait for the Future to complete execution into the callback function.

You can attach a callback to the CompletableFuture using thenApply(), thenAccept(), and thenRun() methods.

1. ThenApply () can be used to process and change the results of CompletableFuture. Take a Function

as an argument. Function

is a simple functional interface that takes a parameter of type T and produces a result of type R.
,t>
,t>

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev
Copy the code

You can also write a continuous transformation in the CompletableFuture by attaching a series of thenApply() in the callback method. In this case, one of the resulting thenApply methods is passed to another thenApply method in the series.

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog
Copy the code

2. ThenAccept () and thenRun() If you don’t want to return anything from your callback function and just want to run some code snippet after the Future completes, you can use thenAccept() and thenRun() methods, These methods are often used in the last callback function at the end of the call chain. CompletableFuture. ThenAccept () to hold a Consumer < T >, return a CompletableFuture < Void >. It can access the results of CompletableFuture:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
	return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
	System.out.println("Got product detail from remote service " + product.getName())
});
Copy the code

While thenAccept() can access the results of the CompletableFuture, thenRun() can’t access the results of the Future, and it holds a Runnable that returns the CompletableFuture:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});
Copy the code

All the call-back methods provided by CompletableFuture will have two variants: // thenApply(variants CompletableFuture thenApply(Function
fn) CompletableFuture thenApplyAsync(Function
fn) CompletableFuture thenApplyAsync(Function
fn, Executor Executor) These asynchronous callback variants help you further perform parallel computation by performing callback tasks in separate threads. Examples:

CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})
Copy the code

In the example above, the task in thenApply() and the task in supplyAsync() execute in the same thread. Any supplyAsync() executes immediately, that is, in the main thread (try deleting it under the sleep test). To control the thread that performs the callback task, you can use asynchronous callbacks. If you use thenApplyAsync() callback, a different thread will be executed from ForkJoinPool.commonPool().

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})
Copy the code

In addition, if you pass an Executor into thenApplyAsync() callback, the task will fetch a thread from the Executor thread pool to execute.

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);
Copy the code

Combine two CompletableFutures

1. Use thenCompose() to combine two separate futures. Suppose you want to get a user’s details from a remote API, and once the user’s details are available, you want to get his credits from another service. Consider the implementation of getUserDetail() and getCreditRating() :

CompletableFuture<User> getUsersDetail(String userId) {
	return CompletableFuture.supplyAsync(() -> {
		UserService.getUserDetails(userId);
	});	
}

CompletableFuture<Double> getCreditRating(User user) {
	return CompletableFuture.supplyAsync(() -> {
		CreditRatingService.getCreditRating(user);
	});
}
Copy the code

Now let’s figure out if thenApply() achieves the desired result –

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));
Copy the code

In the earlier example, the Supplier function passing thenApply returns a simple value, but in this case, a CompletableFuture is returned. The end result of the above example is a nested CompletableFuture. If you want to get the final result for the topmost future, use thenCompose() instead of –

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));
Copy the code

So the rule is – if your callback returns a CompletableFuture, but you want to get a direct merge result from the CompletableFuture chain, you can use thenCompose().

2. ThenCombine () although thenCompose() is used to combine two independent futures when one future depends on another. ThenCombine () is used to do something when both independent futures are completed.

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());
Copy the code

When both futures are complete, the callback passed to thenCombine() will be called.

Combine multiple CompletableFutures

We combine two CompletableFutures together using thenCompose() and thenCombine(). Now if you want to combine any number of completableFutures, what do you do? We can combine any number of CompletableFutures using the following two methods.

static CompletableFuture<Void> allOf(CompletableFuture<? >... cfs) static CompletableFuture<Object> anyOf(CompletableFuture<? >... cfs)Copy the code

1. Completablefuture.allof () CompletableFuture.allof is used when you have a list of individual futures and you want to do something in parallel after they are all completed.

Suppose you want to download 100 different pages of a website. You can do this sequentially, but it’s very time consuming. So you want to write a function that passes in a page link and returns a CompletableFuture that asynchronously downloads the content of the page.

CompletableFuture<String> downloadWebPage(String pageLink) {
	return CompletableFuture.supplyAsync(() -> {
		// Code to download and return the web page's content }); }Copy the code

Now, when all the pages have been downloaded, you want to count the number of pages that contain the keyword CompletableFuture. You can do this using CompletableFuture.allof ().

List<String> webPageLinks = Arrays.asList(...) // A list of 100 web page links // Download contents of all the web pages asynchronously List<CompletableFuture<String>>  pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture<Void> allFutures = CompletableFuture.allOf( pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]) );Copy the code

The problem with using CompletableFuture.allof () is that it returns CompletableFuture. But we can get all the wrapped CompletableFuture results by writing some extra code.

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});
Copy the code

Take a moment to understand the code snippet above. When all futures are done, we call future.join(), so we don’t block anywhere.

The join() method is very similar to the get() method, with the only difference being that if the top-level CompletableFuture completes with an exception, it throws an unchecked exception.

Now let’s count the number of pages that contain keywords.

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());
Copy the code

2. CompletableFuture.anyOf()

Completablefuture.anyof (), as its name suggests, returns a new CompletableFuture when any CompletableFuture is complete [same result type]. Examples:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2
Copy the code

In the example above, anyOfFuture completes when any of the three CompletableFuture completes. Because Future2 has the least sleep time, she finishes first, and the final result will be future2’s result.

Completablefuture.anyof () passes in a Future mutable argument and returns CompletableFuture. The problem with compleTableFuture.anyof () is that if your CompletableFuture returns a result of a different type, you don’t know what type you end up with CompletableFuture.

CompletableFuture Exception handling

We explored how to create CompletableFutures, transform them, and combine multiple CompletableFutures. Now let’s figure out what to do when something goes wrong.

First let’s understand how errors are passed in a callback chain. Consider the following callback chain:

CompletableFuture.supplyAsync(() -> {
	// Code which might throw an exception
	return "Some result";
}).thenApply(result -> {
	return "processed result";
}).thenApply(result -> {
	return "result after further processing";
}).thenAccept(result -> {
	// do something with the final result
});
Copy the code

If an error occurs in the original supplyAsync() task, no thenApply will be called and the future will end with an exception. If an error occurs on the first thenApply, the second and third will not be called, and the future will end with an exception.

The exceptionally() callback gives you a chance to recover from errors generated in the original Future. You can log the exception here and return a default value.

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 
Copy the code

2. Using the handle() method to handle exceptions THE API provides a more generic method – handle() to recover from exceptions, which is called whether an exception has occurred or not.

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex ! = null) { System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());
Copy the code

If an exception occurs, the RES argument will be null; otherwise, ex will be null.