Java 8 Concurrency Tutorial: Threads and Executors

The Concurrency API was first introduced in Java 5 and has been refined and improved in subsequent releases. Most of the concepts in this article also apply to older versions. My code examples focus primarily on Java 8 and make extensive use of lambda expressions and some new features. If you are not familiar with lambda expressions, you are advised to read the Java 8 Tutorial first.

ThreadsRunnables

All modern operating systems support concurrency through processes and threads. Processes are usually instances of programs that run independently of each other. For example, if you start a Java program, the operating system creates a new process to run in parallel with other programs. Threads can be used to execute code simultaneously in these processes. So we can make full use of the CPU.

Java has supported threads since JDK 1.0. Before starting a new thread, you must first specify the code to run, usually called Task. Here is an example of starting a new thread by implementing the Runnable interface:

Runnable task = () -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
};

task.run();

Thread thread = new Thread(task);
thread.start();

System.out.println("Done!");
Copy the code

Since Runnable is a functional interface, we can use lambda expressions to print the name of the thread to the console. We execute Runnable directly on the main thread and start a new thread. On the console you should see something like this:

Hello main
Hello Thread-0
Done!
Copy the code

Or:

Hello main
Done!
Hello Thread-0
Copy the code

Because it is executed concurrently, we cannot predict whether Runnable will be called before or after printing Done, and the order is not indeterminate, making concurrent programming a complex task in large-scale application development.

Threads can also sleep for a period of time, as in the following example:

Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); }}; Thread thread = new Thread(runnable); thread.start();Copy the code

Executing the above code pauses for one second between print statements. TimeUnit is an enumeration of time units, or it can be implemented by calling Thread.sleep(1000).

Using the Thread class can be tedious and error-prone. For this reason, the Concurrency API was introduced in the Java 5 release in 2004. The API is located under the java.util.concurrent package and contains a number of useful classes for concurrent programming. Since then, every new release of Java has added concurrency apis, and Java 8 provides new classes and methods to handle concurrency.

Now let’s take a closer look at the most important part of the Concurrency API – Executor Services.

Executors

The Concurrency API introduces the concept of ExecutorService as a high-level alternative to Threads for handling Threads. Executors execute tasks asynchronously and usually manage a thread pool. Instead of manually creating threads, all threads in the thread pool will be reused. This allows you to run as many concurrent tasks as possible throughout the application life cycle of an Executor Service.

Below is a simple executors example:

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1
Copy the code

The Exector class provides convenient factory methods for creating different types of Executor services. In this example, an executor that executes only one thread is used.

The result looks similar to the example above, but you’ll notice one important difference: A Java process never stops, and the executor must explicitly stop it, otherwise it keeps accepting new tasks.

ExecutorService provides two ways to do this: shutdown() waits for the current task to complete, and shutdownNow() interrupts all ongoing tasks and immediately closes the execution program. No more tasks can be submitted to the thread pool after shudown.

Here’s my preferred way to close an application:

try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if(! executor.isTerminated()) { System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}
Copy the code

The executor calls shutdown to shutdown the executor and, after waiting five seconds, calls shutdownNow to interrupt the executing task, regardless of whether the task has completed or not.

Callables and Futures

Executors also support Callable tasks. Like Runnable, it is a functional interface but returns a value.

Here is a Callable defined using a lambda expression that returns an integer value after 1 second of sleep.

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e); }};Copy the code

Like Runnable, Callable can be submitted to Executor Services, but what is the result of execution? Because Submit () does not wait for the task to complete, the Executor Service cannot directly return the result of the call. Instead, it returns a result of type Future, which can be used to retrieve the actual execution result.

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
Copy the code

After the Callable is submitted to the executor, isDone() is first used to check whether the Future has completed execution. I’m sure this is not the case because the above call sleeps for a second before returning the integer.

Calling the method get() blocks the current thread until the callable completes and returns a result. Now the future completes and prints the following result on the console:

future done? false
future done? true
result: 123
Copy the code

Futures are closely tied to Executor Services, and each Future throws an exception if the Executor Service is turned off.

executor.shutdownNow();
future.get();
Copy the code

Executors are created differently here than in the previous example, where newFixedThreadPool(1) is used to create a thread pool of one thread to support executors, which is equivalent to newSingleThreadExecutor(), Later we will increase the size of the thread pool by passing a value greater than 1.

Timeouts

Any call to future.get() blocks and waits for the Callable to terminate. In the worst case, a callable function will run forever, making the application unresponsive. These situations can be offset simply by timeouts:

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e); }}); future.get(1, TimeUnit.SECONDS);
Copy the code

Executing the above code throws a TimeoutException

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
Copy the code

The maximum wait time of 1 second is specified, but the callable actually takes 2 seconds before the result is returned.

InvokeAll

Executors Support batch submission of multiple Callables through invokeAll(). This method takes an argument of type Callable and returns a List of type Future.

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1", () - >"task2", () - >"task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println);
Copy the code

In this example, we use the Java 8 stream to process all the futures returned by the invokeAll call. We first map the return value for each Future, and then print each value to the console. If you’re not already familiar with streams, read the Java 8 Stream Tutorial.

InvokeAny

Another method that can be invoked for bulk submission is invokeAny(), which is slightly different from invokeAll(). This method does not return all Future objects; it only returns the result of the first completed task.

Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}
Copy the code

We used this method to create a Callable with three different sleep times. These callable objects are submitted to executor via invokeAny(), which returns the soonest completion result, in which case task2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("task1".2),
    callable("task2".1),
    callable("task3".3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => task2
Copy the code

The example above uses another type of executor created through newWorkStealingPool(). This factory method is part of Java 8 and returns a ForkJoinPool executor of type, which is slightly different from normal executors. It does not use a fixed size thread pool, which by default is the number of cores available on the host CPU.

Scheduled Executors

We’ve learned how to submit and run tasks on Executors. To run tasks regularly multiple times, we can use scheduled Thread pools.

ScheduledExecutorService can schedule tasks to run periodically or once after a period of time.

The following code example shows a task running after three seconds:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: "+ System.nanoTime()); ScheduledFuture<? > future = executor.schedule(task,3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
Copy the code

The scheduled task produces a value of type ScheduledFuture, which, in addition to Future, provides the getDelay() method to retrieve the remaining time for the task to execute.

For scheduled tasks, executor provides two methods scheduleAtFixedRate() and scheduleWithFixedDelay(). The first method is capable of performing tasks with fixed time intervals, for example, once per second:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
Copy the code

In addition, this method can set the delay time, which describes the wait time before the task is first executed.

The scheduleWithFixedDelay() method is slightly different from scheduleAtFixedRate() in their wait times, The scheduleWithFixedDelay() wait time is imposed between the end of the previous task and the start of the next.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted"); }}; executor.scheduleWithFixedDelay(task,0.1, TimeUnit.SECONDS);
Copy the code

In this example, the delay is 1 second between the end of the execution and the start of the next execution. The initial delay is 0 and the task duration is 2 seconds. So we end up with an interval of 0s, 3s, 6s, 9s, etc.