Java and Scala concurrency basics

Learn about concurrency in the Java language and the additional options Scala offers

Published by Dennis Sosnoski on March 25, 2014

Decades of rapid advances in processor speed ended around the turn of the century. Since then, processor makers have improved chip performance by adding cores rather than by increasing clock rates. Multicore systems are now the norm for everything from mobile phones to enterprise servers, and this trend is likely to continue and accelerate. As a result, more and more developers must exploit multiple cores in their application code to meet performance requirements.

In this series of articles, you’ll learn about newer approaches to concurrent programming for the Java and Scala languages, including ideas being explored in Scala and other JVM-based languages. This first installment sets the context for the bigger picture of concurrent programming on the JVM by introducing some of the most advanced options currently available for Java 7 and Scala. You’ll learn how to use the Java ExecutorService and ForkJoinPool classes to simplify concurrent programming. You’ll also learn about some basic Scala features that extend concurrent programming options beyond those available in pure Java. Along the way, you’ll see how different approaches affect concurrent programming performance. Subsequent installments cover concurrency improvements in Java 8, as well as extensions including the Akka toolkit for scalable Java and Scala programming.

Java concurrency support

Concurrency support has been a feature of the Java platform since its early days, and its built-in implementation of threading and synchronization gave it an advantage over competing languages. Scala is based on Java and runs on the JVM, with direct access to the full Java runtime (including all its concurrency support). So before I dive into Scala’s features, I’ll start with a quick review of what the Java language already offers.

About the series

With multicore systems now ubiquitous, concurrent programming must be applied more widely than ever before. But concurrency can be difficult to implement correctly, and you need new tools to help you use it. Many JVM-based languages are developing tools of this type, and Scala has been particularly active in this area. This series introduces you to some of the newer approaches to concurrent programming for the Java and Scala languages.

Basic Java threads

Threads are easy to create and use in Java programming. They’re represented by the java.lang.Thread class, and the code that a thread executes is supplied as an instance of java.lang.Runnable. You can create thousands of threads in your application if you want. When multiple cores are available, the JVM uses them to execute multiple threads concurrently.
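
As a quick illustration (a minimal sketch of my own, not part of the article’s sample code), here’s the standard pattern of defining a Runnable and handing it to a Thread:

public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        // the work to be done, as a Runnable instance
        Runnable work = new Runnable() {
            @Override
            public void run() {
                System.out.println("Hello from " + Thread.currentThread().getName());
            }
        };
        Thread thread = new Thread(work);
        thread.start();    // begin asynchronous execution of run()
        thread.join();     // wait for the thread to finish
    }
}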

Where things get messy is in the coordination of processing between threads. Complications arise because the Java compiler and the JVM are free to reorder the operations in your code, as long as everything remains consistent from the program’s point of view. For example, if two addition operations use different variables, the compiler or JVM can perform the operations in the opposite order from the one you specify, provided the program doesn’t use either value until both operations are complete. This reordering flexibility helps improve Java performance, but the consistency guarantee applies only within a single thread. Hardware can also create threading issues: modern systems use multiple levels of cache memory, and not all cores in a system generally see those caches identically. When one core modifies a value in memory, the change may not be immediately visible to the other cores.

Because of these issues, when one thread works with data that’s been modified by another thread, you must explicitly control how the threads interact. Java provides this control with special operations that establish ordering among the views of data seen by different threads. The most basic of these operations is synchronization on an object, using the synchronized keyword. When a thread synchronizes on an object, it gains exclusive access to a lock that’s unique to that object. If another thread already holds the lock, the thread that wants to acquire it must wait, or block, until the lock is released. While a thread executes inside a synchronized block, Java guarantees that the thread “sees” all data written by other threads that previously held the same lock — but only the data those threads wrote up to the point at which they released the lock by leaving their own synchronized blocks. This guarantee applies both to reordering of operations by the compiler or JVM and to hardware memory caches. The interior of a synchronized block is thus an island of stability in your code, where threads can take turns executing, interacting, and sharing information safely.
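
As a minimal sketch of the idea (my example, not from the article’s sample code), a counter shared between threads can be protected this way:

public class SafeCounter {
    private final Object lock = new Object();
    private int count;    // shared state, guarded by lock

    public void increment() {
        synchronized (lock) {    // only one thread at a time past this point
            count++;
        }                        // leaving the block publishes the write
    }

    public int current() {
        synchronized (lock) {    // entering the block sees prior writes
            return count;
        }
    }
}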

Java 5: Concurrency watershed

Java has supported threading and synchronization from the beginning. But the original specification of how data is shared between threads was flawed in some ways, which led to significant changes in the Java 5 language update (JSR-133). JSR-133 corrected and regularized the behavior of operations such as volatile reads and writes. The updated specification also spelled out how immutable objects work with multiple threads. (Basically, immutable objects are always thread-safe, provided that references aren’t allowed to “escape” while the constructor executes.) Previously, most interactions between threads had to use blocking synchronized operations; the changes enable nonblocking coordination between threads as well. As a result, Java 5 added new concurrent collection classes that support nonblocking operations — a major improvement over the earlier thread-safe, blocking-only approach.
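
For instance (a sketch of my own, with an illustrative word-count use; not from the article’s code), the Java 5 ConcurrentMap interface adds atomic operations that don’t require explicit locking:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class NonBlockingCounts {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
        counts.putIfAbsent("word", 0);          // atomic insert-if-missing
        counts.replace("word", 0, 1);           // atomic compare-and-replace
        System.out.println(counts.get("word")); // prints 1
    }
}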

The volatile keyword provides a slightly weaker form of safe interaction between threads. The synchronized keyword guarantees that you see the stores made by other threads when you acquire a lock, and that other threads will see your stores after you release it. The volatile keyword breaks this guarantee into two separate parts. If a thread writes to a volatile variable, all of its writes up to that point are flushed first. If a thread reads a volatile variable, it sees not only the value written to that variable but also all the other values that were written by the writing thread before it. So reading a volatile variable gives the same sort of memory guarantee as entering a synchronized block, and writing a volatile variable gives the same sort of memory guarantee as leaving a synchronized block. But there’s one big difference: reading or writing a volatile variable never blocks.
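
A minimal sketch of the safe-publication idiom this enables (my illustration, not the article’s code):

public class VolatilePublisher {
    private volatile boolean ready;    // volatile: visibility across threads
    private int payload;               // plain field, published via 'ready'

    public void publish(int value) {   // called by the writer thread
        payload = value;
        ready = true;      // volatile write: like leaving a synchronized block
    }

    public Integer tryRead() {         // called by a reader thread
        if (ready) {       // volatile read: like entering a synchronized block
            return payload;    // guaranteed to see the value written above
        }
        return null;       // not yet published
    }
}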

Abstractions for Java concurrency

Synchronization is useful, and many multithreaded applications have been developed in Java using only basic synchronized blocks. But coordinating threads can be messy, especially when you’re dealing with many threads and many locks. Ensuring that threads interact only in safe ways and avoiding potential deadlocks (in which two or more threads each wait for the other to release a lock before they can continue) becomes difficult. Abstractions that support concurrency without dealing directly with threads and locks give developers better ways to handle common use cases.
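
To make the deadlock risk concrete, here’s the classic lock-ordering mistake in sketch form (not from the article’s sample code):

public class DeadlockDemo {
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();

    public static void main(String[] args) {
        // Thread 1 takes lockA then lockB; thread 2 takes lockB then lockA.
        // If each grabs its first lock before the other's second, both block forever.
        new Thread(new Runnable() {
            public void run() {
                synchronized (lockA) {
                    pause();
                    synchronized (lockB) {
                        System.out.println("thread 1 done");
                    }
                }
            }
        }).start();
        new Thread(new Runnable() {
            public void run() {
                synchronized (lockB) {
                    pause();
                    synchronized (lockA) {
                        System.out.println("thread 2 done");
                    }
                }
            }
        }).start();
    }

    private static void pause() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) { /* ignore for demo */ }
    }
}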

The java.util.concurrent hierarchy includes collection variations that support concurrent access, wrapper classes for atomic operations, and synchronization primitives. Many of these classes are designed to support nonblocking access, which avoids deadlock problems and enables more-efficient threading. These classes make it easier to define and regulate interactions between threads, but they still carry some of the complexity of the basic thread model.

A pair of abstractions in the java.util.concurrent package supports a more decoupled approach to concurrency: the Future&lt;T&gt; interface, and the Executor and ExecutorService interfaces. These related interfaces are in turn the basis for many of the Scala and Akka extensions to Java concurrency support, so it’s worth examining the interfaces and their implementations in more detail.

A Future&lt;T&gt; is a holder for a value of type T, but the value is generally not available until some time after the Future is created. The value is the result of an asynchronous operation that may execute concurrently. The thread that receives a Future can call methods to:

  • Check whether the value is available
  • Wait for the value to become available
  • Retrieve the value when it’s available
  • Cancel the operation if the value is no longer needed

Specific implementations of Future are structured to support different ways of handling the asynchronous operation.
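
Here’s a sketch of those operations using the FutureTask implementation (my example; the article’s own usage appears in Listing 2):

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureDemo {
    public static void main(String[] args) throws Exception {
        // FutureTask is a Future<T> whose value comes from running a Callable<T>
        FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(500);    // simulate a slow asynchronous operation
                return 42;
            }
        });
        new Thread(future).start();                          // run it concurrently
        System.out.println("Available? " + future.isDone()); // check availability
        System.out.println("Value: " + future.get());        // wait for and retrieve value
        // future.cancel(true) would instead cancel the operation if unneeded
    }
}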


An Executor is an abstraction around something that executes tasks. That “something” will ultimately be a thread, but the details of how the thread handles the execution are hidden by the interface. The Executor on its own is of limited usefulness; the ExecutorService subinterface supplies extended methods for managing termination and for producing Futures for task results. All standard implementations of Executor also implement ExecutorService, so in practice you can ignore the root interface.

Threads are relatively heavyweight resources, and it makes more sense to reuse them rather than allocate and discard them. ExecutorService simplifies work sharing between threads, and also supports automatic reuse of threads, simplifying programming and improving performance. The ThreadPoolExecutor implementation of ExecutorService manages a thread pool to perform tasks.

Applying Java concurrency

Practical applications of concurrency generally involve tasks that require external interactions (with users, storage, or other systems) independent of the main processing logic. Such applications are hard to boil down to simple examples, so for concurrency demonstrations people often use simple computationally intensive tasks, such as mathematical calculations or sorting. I’ll use a similar example.

The task is to find the known word closest to an unknown input, where closest is defined in terms of Levenshtein distance: the minimum number of character insertions, deletions, or replacements required to convert the input into a known word. (For example, “kitten” is distance 3 from “sitting”: two replacements and one insertion.) I use code based on an example in the Wikipedia Levenshtein distance article to calculate the Levenshtein distance for each known word, returning the best match (or an inconclusive result if multiple known words share the same minimum distance).

Listing 1 shows the Java code for calculating Levenshtein distance. The calculation builds a matrix with rows and columns matching the sizes of the two texts being compared, plus one in each dimension. For efficiency, this implementation uses a pair of arrays sized to the target text to represent successive rows of the matrix, swapping the arrays on each pass, because only the previous row’s values are needed to compute the next row.

Listing 1. Levenshtein distance calculation in Java
/**
 * Calculate edit distance from targetText to known word.
 *
 * @param word known word
 * @param v0 int array of length targetText.length() + 1
 * @param v1 int array of length targetText.length() + 1
 * @return distance
 */
private int editDistance(String word, int[] v0, int[] v1) {
    
    // initialize v0 (prior row of distances) as edit distance for empty 'word'
    for (int i = 0; i < v0.length; i++) {
        v0[i] = i;
    }
    
    // calculate updated v0 (current row distances) from the previous row v0
    for (int i = 0; i < word.length(); i++) {
        
        // first element of v1 = delete (i+1) chars from target to match empty 'word'
        v1[0] = i + 1;
        
        // use formula to fill in the rest of the row
        for (int j = 0; j < targetText.length(); j++) {
            int cost = (word.charAt(i) == targetText.charAt(j)) ? 0 : 1;
            v1[j + 1] = minimum(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost);
        }
        
        // swap v1 (current row) and v0 (previous row) for next iteration
        int[] hold = v0;
        v0 = v1;
        v1 = hold;
    }
    
    // return final value representing best edit distance
    return v0[targetText.length()];
}

If you have a large number of known words to compare with an unknown input and you’re running on a multicore system, you can use concurrency to speed up the processing: break the set of known words into chunks and process each chunk as a separate task. By varying the number of words in each chunk, you can easily change the granularity of the task decomposition and see the effect on overall performance. Listing 2 shows the Java code for the chunked calculation, taken from the ThreadPoolDistance class in the sample code. Listing 2 uses a standard ExecutorService with the thread count set to the number of available processors.

Listing 2. Block distance calculation in multithreaded Java

private final ExecutorService threadPool;
private final String[] knownWords;
private final int blockSize;

public ThreadPoolDistance(String[] words, int block) {
    threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    knownWords = words;
    blockSize = block;
}

public DistancePair bestMatch(String target) {
    
    // build a list of tasks for matching to ranges of known words
    List<DistanceTask> tasks = new ArrayList<DistanceTask>();
    int size = 0;
    for (int base = 0; base < knownWords.length; base += size) {
        size = Math.min(blockSize, knownWords.length - base);
        tasks.add(new DistanceTask(target, base, size));
    }
    DistancePair best;
    try {
        
        // pass the list of tasks to the executor, getting back list of futures
        List<Future<DistancePair>> results = threadPool.invokeAll(tasks);
        
        // find the best result, waiting for each future to complete
        best = DistancePair.WORST_CASE;
        for (Future<DistancePair> future : results) {
            DistancePair result = future.get();
            best = DistancePair.best(best, result);
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
    return best;
}

/**
 * Shortest distance task implementation using Callable.
 */
public class DistanceTask implements Callable<DistancePair> {
    
    private final String targetText;
    private final int startOffset;
    private final int compareCount;
    
    public DistanceTask(String target, int offset, int count) {
        targetText = target;
        startOffset = offset;
        compareCount = count;
    }
    
    private int editDistance(String word, int[] v0, int[] v1) {
        ...
    }
    
    /* (non-Javadoc)
     * @see java.util.concurrent.Callable#call()
     */
    @Override
    public DistancePair call() throws Exception {
        
        // directly compare distances for comparison words in range
        int[] v0 = new int[targetText.length() + 1];
        int[] v1 = new int[targetText.length() + 1];
        int bestIndex = -1;
        int bestDistance = Integer.MAX_VALUE;
        boolean single = false;
        for (int i = 0; i < compareCount; i++) {
            int distance = editDistance(knownWords[i + startOffset], v0, v1);
            if (bestDistance > distance) {
                bestDistance = distance;
                bestIndex = i + startOffset;
                single = true;
            } else if (bestDistance == distance) {
                single = false;
            }
        }
        return single ? new DistancePair(bestDistance, knownWords[bestIndex]) :
            new DistancePair(bestDistance);
    }
}


The bestMatch() method in Listing 2 constructs a list of DistanceTask instances and then passes that list to the ExecutorService. This form of call to the ExecutorService takes a Collection&lt;? extends Callable&lt;T&gt;&gt; representing the tasks to be executed, and it returns a List&lt;Future&lt;T&gt;&gt; representing the results of the executions. The ExecutorService fills in these results asynchronously, using the values returned by calling each task’s call() method. In this case, the T type is DistancePair — a simple value object for a distance and the matching word, or only a distance if no unique match is found.

The original thread of execution in bestMatch() waits for each Future in turn to complete, accumulating the best result and returning it when done. Because multiple threads handle the execution of the DistanceTasks, the original thread waits for only a fraction of the results; the rest complete concurrently with the ones the original thread waits on.

Concurrent performance

To take full advantage of the processors available on your system, you must configure the ExecutorService with at least as many threads as there are processors. You must also pass at least as many tasks as there are processors to the ExecutorService for execution. In fact, for best performance you probably want substantially more tasks than processors; that way the processors stay busy with task after task, going idle only at the end. But because of the overhead involved — in creating the tasks and the futures, in switching threads among tasks, and eventually in returning the task results — you must keep the tasks large enough that the overhead stays proportionally small.

Figure 1 shows how measured performance varies with different numbers of tasks when the test code runs on my four-core AMD system using Oracle’s Java 7 for 64-bit Linux®. Each input word is compared in turn with 12,564 known words, and each task finds the best match within a range of the known words. The full set of 933 misspelled input words is run repeatedly, with pauses between passes to let the JVM settle, and the best time after 10 passes is used in the graph. As you can see from Figure 1, performance in input words per second looks stable across a reasonable range of block sizes (basically, from 256 through 1,024), dropping off only toward the extremes, where the tasks become either very small or very large. The final value, for block size 16,384, creates only one task and so shows single-threaded performance.

Figure 1. ThreadPoolDistance performance

Fork-Join

Java 7 introduced another implementation of ExecutorService: the ForkJoinPool class. ForkJoinPool is designed for efficient processing of tasks that can be repeatedly decomposed into subtasks, using the RecursiveAction class (when the task produces no result) or the RecursiveTask&lt;T&gt; class (when the task has a result of type T) for the tasks. RecursiveTask&lt;T&gt; provides a convenient way to merge the results of subtasks, as shown in Listing 3.

Listing 3. RecursiveTask example
private ForkJoinPool threadPool = new ForkJoinPool();
private final String[] knownWords;
private final int blockSize;

public ForkJoinDistance(String[] words, int block) {
    knownWords = words;
    blockSize = block;
}

public DistancePair bestMatch(String target) {
    return threadPool.invoke(new DistanceTask(target, 0, knownWords.length, knownWords));
}

/**
 * Shortest distance task implementation using RecursiveTask.
 */
public class DistanceTask extends RecursiveTask<DistancePair> {
    
    private final String compareText;
    private final int startOffset;
    private final int compareCount;
    private final String[] matchWords;
    
    public DistanceTask(String from, int offset, int count, String[] words) {
        compareText = from;
        startOffset = offset;
        compareCount = count;
        matchWords = words;
    }
    
    private int editDistance(int index, int[] v0, int[] v1) {
        ...
    }
    
    /* (non-Javadoc)
     * @see java.util.concurrent.RecursiveTask#compute()
     */
    @Override
    protected DistancePair compute() {
        if (compareCount > blockSize) {
            
            // split range in half and find best result from bests in each half of range
            int half = compareCount / 2;
            DistanceTask t1 = new DistanceTask(compareText, startOffset, half, matchWords);
            t1.fork();
            DistanceTask t2 = new DistanceTask(compareText, startOffset + half,
                compareCount - half, matchWords);
            DistancePair p2 = t2.compute();
            return DistancePair.best(p2, t1.join());
        }
        
        // directly compare distances for comparison words in range
        int[] v0 = new int[compareText.length() + 1];
        int[] v1 = new int[compareText.length() + 1];
        int bestIndex = -1;
        int bestDistance = Integer.MAX_VALUE;
        boolean single = false;
        for (int i = 0; i < compareCount; i++) {
            int distance = editDistance(i + startOffset, v0, v1);
            if (bestDistance > distance) {
                bestDistance = distance;
                bestIndex = i + startOffset;
                single = true;
            } else if (bestDistance == distance) {
                single = false;
            }
        }
        return single ? new DistancePair(bestDistance, knownWords[bestIndex]) :
            new DistancePair(bestDistance);
    }
}

Figure 2 compares the performance of the ForkJoin code from Listing 3 with that of the ThreadPool code from Listing 2. The ForkJoin code is much more stable across the full range of block sizes, dropping off significantly only when you go to a single block (which means execution is essentially single-threaded). The standard ThreadPool code shows better performance only at the 256 and 1,024 block sizes.

Figure 2. Comparison of ThreadPoolDistance performance with ForkJoinDistance performance

These results suggest that if you can tune task sizes in your application for best performance, a standard ThreadPool can do better than ForkJoin. But understand that the ThreadPool “sweet spot” depends on the particular task, the number of available processors, and other aspects of your system. In general, ForkJoin delivers excellent performance with minimal tuning, so it’s best to use it whenever possible.

Scala concurrency basics

Scala extends the Java programming language and runtime in many ways, including adding more and easier ways to handle concurrency. For starters, the Scala version of Future&lt;T&gt; is much more flexible than the Java version. You can create futures directly from blocks of code, and you can attach callbacks to futures for completion processing. Listing 4 shows some examples of Scala futures in use. The code first defines a futureInt() method to supply Future&lt;Int&gt; instances on demand, and then uses the futures in three different ways.

Listing 4. Scala Future sample code
import ExecutionContext.Implicits.global

val lastInteger = new AtomicInteger
def futureInt() = future {
  Thread sleep 2000
  lastInteger incrementAndGet
}

// use callbacks for completion of futures
val a1 = futureInt
val a2 = futureInt
a1.onSuccess {
  case i1 => {
    a2.onSuccess {
      case i2 => println("Sum of values is " + (i1 + i2))
    }
  }
}
Thread sleep 3000

// use for construct to extract values when futures complete
val b1 = futureInt
val b2 = futureInt
for (i1 <- b1; i2 <- b2) yield println("Sum of values is " + (i1 + i2))
Thread sleep 3000

// wait directly for completion of futures
val c1 = futureInt
val c2 = futureInt
println("Sum of values is " + (Await.result(c1, Duration.Inf) +
  Await.result(c2, Duration.Inf)))

The first example in Listing 4 attaches callback closures to a pair of futures, so that when both futures complete, the sum of the two result values is printed to the console. The callbacks are nested directly on the futures in the order in which they’re created, but they’d work the same way if the order were reversed. If a future is already complete when the callback is attached, the callback still runs, but there’s no guarantee that it runs immediately. The original execution thread pauses on the Thread sleep 3000 line to give the futures time to complete before the next example runs.

The second example shows the use of a Scala for-comprehension to extract the values from the futures asynchronously and use them directly in an expression. The for-comprehension is a Scala construct that you can use to express complex combinations of map, filter, flatMap, and foreach operations concisely. It’s generally used with various forms of collections, but Scala futures implement the same monadic methods that are used to access collection values. So you can use a future as a special type of collection — one that contains at most one value (and might not contain even that one value until some point in the future). In this case, the for statement says to take the results of the futures and use the result values in the expression. Behind the scenes, this technique generates the same sort of code as the first example, but writing it as linear code gives a simpler expression that’s easier to understand. As in the first example, the original execution thread pauses to give the futures time to complete before the next example runs.

The third example uses blocking waits to obtain the results of the futures. This works the same way as Java futures, although in the Scala case the blocking wait is made explicit with a special Await.result() method call that takes a maximum-wait-time parameter.

The code in Listing 4 obviously never passes the futures to an ExecutorService or any equivalent, so if you haven’t worked with Scala, you might wonder how the code behind the futures gets executed. The answer is on the first line of Listing 4: import ExecutionContext.Implicits.global. Scala APIs commonly use implicit values for parameters that are reused repeatedly within a block of code, and the future { } construct requires an ExecutionContext as an implicit parameter. This ExecutionContext is a Scala wrapper for a Java ExecutorService, used the same way to execute tasks with one or more managed threads.
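
If you’d rather not rely on the implicit global context, you can supply your own wrapper around a Java ExecutorService. Here’s a minimal sketch (my example, not the article’s code; the pool size of 4 is an arbitrary choice):

import java.util.concurrent.Executors
import scala.concurrent._

object CustomContext {
  // wrap a standard Java thread pool as a Scala ExecutionContext
  private val pool = Executors.newFixedThreadPool(4)
  implicit val context: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // the implicit context above is picked up by the future {} construct
  def compute() = future {
    "ran on " + Thread.currentThread.getName
  }
}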

Beyond these basic operations on futures, Scala also provides a way to convert any collection into one that uses parallel programming. After you convert the collection to parallel form, any standard Scala collection operation you perform on it (such as map, filter, or fold) is automatically done in parallel where possible. (You’ll see an example of this later in the article, in the Listing 7 code that uses Scala to find the best match to a word.)
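
For example (a minimal sketch of my own, not the article’s code), converting a list with .par parallelizes the standard operations applied to it:

val numbers = (1 to 1000000).toList

// .par returns a parallel version of the collection; the map and fold below
// are then split across the available cores automatically (fold requires an
// associative operation, as here)
val sumOfSquares = numbers.par.map(n => n.toLong * n).fold(0L)(_ + _)

println(sumOfSquares)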

Error handling

Futures in both Java and Scala must deal with the possibility of errors. In the Java case, as of Java 7, a future can throw an ExecutionException as the alternative to returning a result. Applications can define their own subclasses of ExecutionException for specific types of failures, or they can chain exceptions to pass on details, but that’s the limit of the flexibility.

Scala futures provide much more flexible error handling. A Scala future can complete in one of two ways: as a success, with a result value (assuming one is expected), or as a failure, with an associated Throwable. You can also handle future completion in several ways. In Listing 4, the onSuccess method is used to attach callbacks that handle successful completion. You can instead use onComplete to handle any form of completion (with the result or throwable wrapped in a Try to accommodate both cases), or onFailure to handle error results specifically. This flexibility extends to all the operations you can perform with Scala futures, so error handling can be integrated directly into your code.
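
A sketch of the completion-handling alternatives (illustrative names and values of my own, not the article’s code):

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Random, Success}

// may complete with a value, or fail with an ArithmeticException
val risky = future { 100 / Random.nextInt(2) }

// onComplete receives the outcome wrapped in a Try: Success or Failure
risky.onComplete {
  case Success(value) => println("Got " + value)
  case Failure(error) => println("Failed: " + error)
}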

Scala’s Future also has a closely related Promise class. A future is a holder for a result that may (or may not — there is no inherent guarantee that a future will ever complete) become available at some point. Once the future completes, the result is fixed. A promise is the other side of the same contract: a one-time-assignable holder for a result, in the form of either a result value or a throwable. You can get the future from a promise, and when the result is set on the promise, it is set on that future.
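
A minimal Promise sketch (mine, not the article’s code) showing the two sides of the contract:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

val promise = Promise[String]()      // the assignable side
val outcome = promise.future         // the readable side

outcome.foreach(result => println("Received: " + result))

promise.success("done")              // fixes the result, completing the future
// promise.failure(new Exception("failed")) would complete it with a Throwable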

Concurrency in Scala

Now that you’re familiar with some basic Scala concurrency concepts, it’s time to look at code for the Levenshtein distance problem. Listing 5 shows a more-or-less idiomatic Scala implementation of the distance calculation that essentially matches the Java code in Listing 1, but in a functional style.

Listing 5. Levenshtein distance calculation in Scala
val limit = targetText.length

/** Calculate edit distance from targetText to known word.
  *
  * @param word known word
  * @param v0 int array of length targetText.length + 1
  * @param v1 int array of length targetText.length + 1
  * @return distance
  */
def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {
  
  val length = word.length
  
  @tailrec
  def distanceByRow(rnum: Int, r0: Array[Int], r1: Array[Int]): Int = {
    if (rnum >= length) r0(limit)
    else {
      
      // first element of r1 = delete (i+1) chars from target to match empty 'word'
      r1(0) = rnum + 1
      
      // use formula to fill in the rest of the row
      for (j <- 0 until limit) {
        val cost = if (word(rnum) == targetText(j)) 0 else 1
        r1(j + 1) = min(r1(j) + 1, r0(j + 1) + 1, r0(j) + cost)
      }
      
      // recurse with arrays swapped for next row
      distanceByRow(rnum + 1, r1, r0)
    }
  }
  
  // initialize v0 (prior row of distances) as edit distance for empty 'word'
  for (i <- 0 to limit) v0(i) = i
  
  // recursively process rows matching characters in word being compared to find best
  distanceByRow(0, v0, v1)
}


The Listing 5 code uses a tail-recursive method, distanceByRow(), to calculate each row of values. The method first checks the row number and, if it matches the number of characters in the word being checked, returns the resulting distance. Otherwise, it calculates the new row values and then calls itself recursively to calculate the next row (swapping the two row arrays in the process, so that the new current-row values are passed correctly). Scala converts tail-recursive methods into the equivalent of a Java while loop, so the code retains the performance character of the Java version.

However, there is one major difference between this code and the Java code: the for-comprehension in Listing 5 uses a closure for the loop body. Current JVMs don’t always handle closures efficiently, so the closure adds considerable overhead to the innermost loop of the calculation. As a result, the Listing 5 code doesn’t run as fast as the Java version. Listing 6 shows a rewrite that replaces the for-comprehension with another embedded tail-recursive method. This version is considerably more verbose, but it performs comparably to the Java version.

Listing 6. Computational code reorganized to improve performance
val limit = targetText.length

/** Calculate edit distance from targetText to known word.
  *
  * @param word known word
  * @param v0 int array of length targetText.length + 1
  * @param v1 int array of length targetText.length + 1
  * @return distance
  */
def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {
  
  val length = word.length
  
  @tailrec
  def distanceByRow(row: Int, r0: Array[Int], r1: Array[Int]): Int = {
    if (row >= length) r0(limit)
    else {
      
      // first element of r1 = delete (i+1) chars from target to match empty 'word'
      r1(0) = row + 1
      
      // use formula recursively to fill in the rest of the row
      @tailrec
      def distanceByColumn(col: Int): Unit = {
        if (col < limit) {
          val cost = if (word(row) == targetText(col)) 0 else 1
          r1(col + 1) = min(r1(col) + 1, r0(col + 1) + 1, r0(col) + cost)
          distanceByColumn(col + 1)
        }
      }
      distanceByColumn(0)
      
      // recurse with arrays swapped for next row
      distanceByRow(row + 1, r1, r0)
    }
  }
  
  // initialize v0 (prior row of distances) as edit distance for empty 'word'
  @tailrec
  def initArray(index: Int): Unit = {
    if (index <= limit) {
      v0(index) = index
      initArray(index + 1)
    }
  }
  initArray(0)
  
  // recursively process rows matching characters in word being compared to find best
  distanceByRow(0, v0, v1)
}

Listing 7 shows Scala code to perform the same sort of blocked distance calculation as the Listing 2 Java code. The bestMatch() method finds the best match for the target text within a particular block of words handled by a Matcher class instance, using the tail-recursive best() method to scan through the words. The *Distance classes create multiple Matcher instances, one for each block of words, then coordinate the execution and combination of the matcher results.

Listing 7. Multithreaded single-block distance calculation in Scala
class Matcher(words: Array[String]) {

  def bestMatch(targetText: String) = {

    val limit = targetText.length
    val v0 = new Array[Int](limit + 1)
    val v1 = new Array[Int](limit + 1)
    
    def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {
      ...
    }

    /** Scan all known words in range to find best match.
      *
      * @param index next word index
      * @param bestDist minimum distance found so far
      * @param bestMatch unique word at minimum distance, or None if not unique
      * @return best match
      */
    @tailrec
    def best(index: Int, bestDist: Int, bestMatch: Option[String]): DistancePair =
      if (index < words.length) {
        val newDist = editDistance(words(index), v0, v1)
        val next = index + 1
        if (newDist < bestDist) best(next, newDist, Some(words(index)))
        else if (newDist == bestDist) best(next, bestDist, None)
        else best(next, bestDist, bestMatch)
      } else DistancePair(bestDist, bestMatch)

    best(0, Int.MaxValue, None)
  }
}

class ParallelCollectionDistance(words: Array[String], size: Int) extends TimingTestBase {

  val matchers = words.grouped(size).map(l => new Matcher(l)).toList
  
  def shutdown = {}
  
  def blockSize = size

  /** Find best result across all matchers, using parallel collection. */
  def bestMatch(target: String) = {
    matchers.par.map(m => m.bestMatch(target)).
      foldLeft(DistancePair.worstMatch)((a, m) => DistancePair.best(a, m))
  }
}

class DirectBlockingDistance(words: Array[String], size: Int) extends TimingTestBase {

  val matchers = words.grouped(size).map(l => new Matcher(l)).toList
  
  def shutdown = {}
  
  def blockSize = size

  /** Find best result across all matchers, using direct blocking waits. */
  def bestMatch(target: String) = {
    import ExecutionContext.Implicits.global
    val futures = matchers.map(m => future { m.bestMatch(target) })
    futures.foldLeft(DistancePair.worstMatch)((a, v) =>
      DistancePair.best(a, Await.result(v, Duration.Inf)))
  }
}

The two *Distance classes in Listing 7 show different ways to coordinate the execution and combination of the Matcher results. ParallelCollectionDistance uses the Scala parallel collections feature discussed earlier to hide the details of the concurrent execution, needing only a simple foldLeft to combine the results.

DirectBlockingDistance is more explicit, creating a list of futures and then using a foldLeft on that list, with a nested blocking wait for each individual result.

Performance, one more time

Both of the Listing 7 *Distance implementations are reasonable approaches to handling the Matcher results. (And they’re far from the only reasonable approaches; the sample code includes a couple of other implementations that I tried in my experimentation but don’t include in the article.) In this case, performance is a main concern, so Figure 3 shows how these two implementations perform relative to the Java ForkJoin code.

Figure 3. ForkJoinDistance performance compared to the Scala alternative

Figure 3 shows that the Java ForkJoin code performs better overall than either Scala implementation, although DirectBlockingDistance delivers better performance at the 1,024 block size. Over most of the block-size range, both Scala implementations provide better performance than the Listing 2 ThreadPool code.

These performance results are illustrative rather than definitive. If you run the timing tests on your own system, you’ll likely see differences in relative performance, especially if you use a different number of cores. And if I wanted the best performance for the distance task, I’d implement some optimizations: I could sort the known words by length and start the comparisons with words the same length as the input (because the edit distance is always at least as large as the difference in word lengths), or I could add an early exit from the distance calculation when the distance exceeds the best previous value. But as a relatively simple algorithm, this experiment gives a fair picture of how concurrent operations can improve performance, and of the impact of different approaches to sharing out the work.

Beyond performance, it’s interesting to compare the two Scala versions of the control code in Listing 7 with the Java code in Listing 2 and Listing 3. The Scala code is considerably shorter than the Java code and (assuming you know Scala!) clearer. Scala and Java interoperate nicely, as you can see in the full sample code for this article: the Scala code runs timed tests of both the Scala and the Java code, and the Java code in turn works directly with parts of the Scala code. Thanks to this easy interoperability, you can introduce Scala into an existing Java code base without a wholesale conversion. It often makes sense to use Scala initially for high-level control of Java code, so that you get the full benefit of Scala’s expressiveness without any significant performance impact from closures or conversions.

The simplicity of the Listing 7 ParallelCollectionDistance Scala code is particularly appealing. With this approach you can abstract concurrency completely out of your code, writing programs that look like single-threaded applications while still gaining the benefits of multiple processors. Fortunately for those who like the simplicity of this approach but aren’t willing or able to jump into Scala development, Java 8 brings similar features directly to Java programming.

In the future

Now that you know the basics of concurrent operations in Java and Scala, the next article in this series looks at how Java 8 improves concurrency support for Java (and, in the longer term, probably Scala, too). Many of the Java 8 changes will look familiar — Java 8 incorporates concepts much like those used in Scala’s concurrency features — so you’ll soon be able to apply some of these Scala techniques in plain Java code. Read the next installment to learn how.