As multi-core processors and parallel workloads have become common, asynchronous programming has drawn more and more attention. The Scala standard library provides Future, which lets you apply map, filter, and other collection-style operations to an asynchronous result before the underlying computation has actually finished.
Instead of blocking and waiting for the results of each step, we use futures to quickly construct an asynchronous pipeline of operations on a series of immutable results.
We already met Future in the author's earlier hands-on demos of Akka and Play. This article covers in detail how to use it correctly.
Bumpy thread synchronization control
Java associates each object with a logical monitor to control multithreaded access to data. Under this model, we decide which data may be shared between threads and guard it with the synchronized keyword.
Building a robust multithreaded application on the lock model is very difficult in practice. Every piece of shared data must be locked, and we must make sure the locking cannot deadlock. Even then, the set of locks is not fixed at compile time: the program can still create new locks at will at run time.
Later, Java added the java.util.concurrent package, which offers higher-level synchronization abstractions and is at least less error-prone than hand-building fragile synchronization mechanisms from the low-level synchronization primitives. However, these utilities are still based on shared data and locks, so they do not remove the inherent difficulties of the model.
Create the first Future in the Scala program
Scala's Future considerably reduces the programmer's burden of managing shared data and locks. If a function returns a Future, its result is produced by another computation that runs asynchronously, and the ExecutionContext provided by Scala decides which thread runs that computation.
Therefore, before using a Future for asynchronous programming, you first need to import the execution context:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
The execution context import is effectively mandatory: without an implicit ExecutionContext in scope, the program fails to compile. We create our first Future using the apply method of the Future companion object:
val future = Future {
// In this plan, the executing thread will first sleep for 3 seconds and then return an Int.
Thread.sleep(3000)
200
}
There are two ways to determine whether this asynchronous computation has produced a result:
- Call future.isCompleted. It returns false if the asynchronous computation has not yet completed.
- Call future.value. If the computation has completed successfully, it returns Some(Success(value)); if it has not completed yet, it returns None.
Why is the result of value wrapped twice? First, the asynchronous computation may or may not have completed, so the outer layer is an Option, which is None while the computation is still running.
Second, a completed computation has two possible outcomes. If no error occurred, the result is wrapped in Success and returned. Otherwise, value returns a Failure that holds the exception.
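The double wrapping can be observed directly. The following is a minimal sketch rather than part of the original listing: the object name FutureValueDemo and the method states are hypothetical, and a Promise (introduced later in this article) is used only to obtain a Future that never completes.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; the names are illustrative only.
object FutureValueDemo {
  // Returns Future.value for a pending, a successful, and a failed future.
  def states(): (Option[Try[Int]], Option[Try[Int]], Option[Try[Int]]) = {
    val pending: Future[Int] = Promise[Int]().future // never completed
    val ok: Future[Int] = Future { 200 }
    val bad: Future[Int] = Future { throw new IllegalStateException("boom") }
    // Block until both computations have finished, whatever their outcome.
    Await.ready(ok, 5.seconds)
    Await.ready(bad, 5.seconds)
    (pending.value, ok.value, bad.value)
  }
}
```

Calling FutureValueDemo.states() should give None for the pending Future, Some(Success(200)) for the successful one, and a Some(Failure(...)) holding the IllegalStateException for the failed one.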
Try type
Success and Failure are the two cases of the Try type and represent the two possible outcomes of an asynchronous operation. Try gives you a try/catch-like way to handle the case where a Failure is returned.
In asynchronous programming, a plain try/catch statement is no longer effective: the body of a Future usually runs on another thread, so the exception cannot be caught on the original thread. This is where the capitalized Try type comes in: if an asynchronous computation ends in a Failure, something unexpected happened during the computation.
Chaining Futures
A Future can be connected to a further asynchronous computation with map, filter, and similar methods. For example:
// The imports are not repeated in every later listing.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val future1 : Future[Int] = Future {
// In this plan, the executing thread will first sleep for 3 seconds and then return an Int.
Thread.sleep(3000)
println("Executive future1")
4
}
val future2: Future[Int] = future1.map(p => {
// Perform this calculation after two seconds.
Thread.sleep(2000)
println("Executive future2")
p + 5
})
The first asynchronous computation completes after 3 seconds and returns an Int. Once it has finished successfully, the next step adds 5 to the value it returned; this step is named future2. Its result is, again, a Future.
From the main thread's point of view, the result 9 becomes available after about 5 seconds.
Use a for expression to transform a Future
Scala's for expression is much more powerful than Java's for loop, and it can also be used to compose Future computations.
Based on the asynchronous operations future1 and future2 above, we create a third operation, future3, to add the results of the previous two operations. The code listing is as follows:
// The imports are not repeated in every later listing.
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
// Observe the start time
println(new java.util.Date())
val future1 : Future[Int] = Future {
// In this plan, the executing thread will first sleep for 3 seconds and then return an Int.
Thread.sleep(3000)
println("Executive future1")
4
}
val future2 : Future[Int] = Future {
Thread.sleep(2000)
println("Executive future2")
5
}
val future3: Future[Int] = for {
x <- future1
y <- future2
} yield {
x + y
}
// We use "Await" to wait for the result to complete, with no limit on the wait time.
println(Await.result(future3,Duration.Inf))
// Observe the end time
println(new java.util.Date())
Under the hood, the for expression is translated into nested flatMap and map calls: future1.flatMap(x => future2.map(y => x + y)). From start to finish the main program takes about 3 seconds (rather than 5), because future1 and future2 were both started before the for expression and run concurrently.
We can draw a simple Petri net and work out the shortest time needed to complete it (see critical paths in discrete mathematics for details).
Note that if you use a for expression to combine Futures, you must create the Futures before the for expression. If they are created inside it, each one starts only after the previous one has completed, so they run one after another.
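The difference between creating the Futures before or inside the for expression can be sketched as follows. This is an illustrative example under assumptions: the object ForComprehensionDemo and the helper slow are hypothetical names, and the sleep times are shortened to a few hundred milliseconds.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; the names are illustrative only.
object ForComprehensionDemo {
  // Hypothetical helper: a future that sleeps for `millis` and then yields `value`.
  def slow(value: Int, millis: Long): Future[Int] = Future {
    Thread.sleep(millis)
    value
  }

  // Both futures are started before the for expression, so their sleeps overlap.
  def concurrent(): Int = {
    val f1 = slow(4, 300)
    val f2 = slow(5, 200)
    val sum = for { x <- f1; y <- f2 } yield x + y
    Await.result(sum, 5.seconds)
  }

  // The futures are created inside the for expression:
  // slow(5, 200) is only started after slow(4, 300) has completed.
  def sequential(): Int = {
    val sum = for { x <- slow(4, 300); y <- slow(5, 200) } yield x + y
    Await.result(sum, 5.seconds)
  }

  // Hand-written equivalent of the for expression in concurrent().
  def desugared(): Int = {
    val f1 = slow(4, 300)
    val f2 = slow(5, 200)
    Await.result(f1.flatMap(x => f2.map(y => x + y)), 5.seconds)
  }
}
```

All three methods return 9. Only the elapsed time differs: concurrent() and desugared() should take roughly as long as the slower Future, while sequential() should take roughly the sum of both sleeps.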
Create Success, Failure
The Future companion object provides factory methods that create already-completed Futures: successful, failed, and fromTry. These methods do not require an execution context.
Use the successful method to create a future that has been completed:
val future: Future[Int] = Future.successful({
println("Return a completed Success[T]")
100
})
// Some(Success(100))
println(future.value)
Use failed to create a future that is finished but has an exception:
val future: Future[Nothing] = Future.failed({
println("This method returns a Failure[T]")
new Exception("Oops!")
})
// Some(Failure(java.lang.Exception: Oops!))
println(future.value)
If you have a Try and do not know in advance whether it is a Success or a Failure, call fromTry:
val future: Future[Int] = Future.fromTry({
println("May return Success or Failure")
// Success(100)
Failure(new Exception("Oops!"))
})
println(future.value)
Two ways to wait
Await: synchronous wait
Await, mentioned earlier in this article, is a synchronous wait mechanism: the main thread blocks on a Future for at most a given amount of time.
We import another package, scala.concurrent.duration._, which lets us write the maximum wait time as, for example, 2.seconds (the author explained how this works in the section on implicit conversions).
Await has two main methods. The first is result: the main thread blocks and waits until it obtains the Future's return value.
val intFuture = Future {
println("Working on...")
println("The thread that performs this computation is:" + Thread.currentThread().getName)
Thread.sleep(1000)
30
}
// The main thread waits up to 3 seconds for the result and then assigns it.
val int: Int = Await.result(intFuture, 3.seconds)
println(int)
The result method is the usual choice when you need the Future's return value before you can proceed. If you only care whether the Future has completed, call ready instead. Here the main thread waits up to 3 seconds while the Future is still working.
Await.ready(intFuture, 3.seconds)
In addition, as Thread.currentThread().getName shows, the Future is executed by another thread, for example ForkJoinPool-x-worker-xx (the exact thread name depends on the Scala version).
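What happens when the wait time runs out can be sketched as follows. This is a minimal example under assumptions: the object AwaitTimeoutDemo and its methods are hypothetical names, and the durations are chosen only so that the timeout is reached reliably.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; the names are illustrative only.
object AwaitTimeoutDemo {
  // Returns true if Await.result gave up before the future finished.
  def timesOut(): Boolean = {
    val slow = Future { Thread.sleep(1000); 30 }
    try {
      Await.result(slow, 100.millis)
      false
    } catch {
      case _: TimeoutException => true
    }
  }

  // Await.ready returns the completed future itself rather than its value,
  // so the value still has to be unwrapped from Option[Try[Int]].
  def readyThenValue(): Int = {
    val quick = Future { 30 }
    val done = Await.ready(quick, 3.seconds)
    done.value.get.get
  }
}
```

Await.result throws a TimeoutException when the Future is not finished within the given duration, so a finite wait time usually goes together with a handler for that exception.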
onComplete: asynchronous wait
Advice: if you are already working with Futures, try not to block on them with Await. Scala lets you register callbacks instead, so that you can obtain the value a Future will eventually produce through a side effect of a function.
val intFuture = Future {
println("Working on...")
println("The thread that performs this computation is:" + Thread.currentThread().getName)
Thread.sleep(1000)
30
}
// Await.ready(intFuture, 3.seconds)
var intValue : Int = 0
intFuture onComplete {
case Success(value) =>
println(value)
// Hand the Future's value to the outside through a side effect of the callback.
intValue = value
case _ => println("There was an unexpected error.")
}
// If the main thread does not block for a while, the program will terminate before the callback runs.
Thread.sleep(3000)
This method does not block the main thread. To see the program's output, we need to call Thread.sleep explicitly so that the main thread stays alive for a while; otherwise the program ends immediately. The return type of onComplete is Unit.
Use andThen to enforce the order in which futures are executed
A Future can have more than one onComplete callback. However, the execution context does not guarantee which callback fires first. The andThen method, when chained, guarantees the order in which the callbacks are executed.
import scala.concurrent.ExecutionContext.Implicits.global
val intFuture = Future {
Thread.sleep(2000)
println(Thread.currentThread().getName)
200
}
// The two onComplete callbacks below may run in either order.
intFuture onComplete {
case Success(int) => println(s"this future returned $int")
case _ => println("something wrong has happened.")
}
intFuture onComplete {
case Success(int) => println(s"completed with the value of $int")
case _ => println("something wrong has happened.")
}
Thread.sleep(3000)
The console may print "this future returned 200" and "completed with the value of 200" in either order.
import scala.concurrent.ExecutionContext.Implicits.global
val intFuture = Future {
Thread.sleep(2000)
println(Thread.currentThread().getName)
200
}
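intFuture andThen {
case Success(int) => println(s"this future returned $int")
case _ => println("something wrong has happened.")
} andThen {
case Success(int) => println(s"completed with the value of $int")
case _ => println("something wrong has happened.")
}
Thread.sleep(3000)
The andThen method returns a new Future that carries the same result as the original and completes only after its own callback has run. Chaining andThen calls, as above, therefore runs the callbacks in the order they are written. Callbacks registered separately on the same Future, whether with onComplete or with unchained andThen calls, still run in no guaranteed order.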
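The ordering guarantee of chained andThen calls can be checked by recording the order in which the callbacks run. This is only a sketch: the object AndThenOrderDemo and the helper record are hypothetical names introduced for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; the names are illustrative only.
object AndThenOrderDemo {
  def run(): (Vector[String], Int) = {
    val lock = new Object
    var log = Vector.empty[String]
    // Callbacks run on worker threads, so access to the log is synchronized.
    def record(step: String): Unit = lock.synchronized { log = log :+ step }

    // Each andThen returns a future that completes after its callback has run,
    // so the second callback cannot start before the first one has finished.
    val chained: Future[Int] = Future { 200 }
      .andThen { case _ => record("first") }
      .andThen { case _ => record("second") }

    val value = Await.result(chained, 5.seconds)
    (lock.synchronized(log), value)
  }
}
```

AndThenOrderDemo.run() returns (Vector("first", "second"), 200): the callbacks run in the order they were chained, and the final Future still carries the original value.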
Promise
When we do not know in advance when a value will be computed, we can use a Promise to make a "promise": at some point in the future, there will be a value.
val promisedInt: Promise[Int] = Promise[Int]()
However, the calculation of the Int value is actually delegated to another Future. The entrusted Future “fulfills” the promise by calling the promise’s success method after calculating the result.
val intFuture = Future {
println("Working on...")
println("The thread that performs this computation is:" + Thread.currentThread().getName)
Thread.sleep(1000)
// Once this is done, the promise is bound to the current future.
promisedInt.success(300)
}
In addition to success, a Promise also provides failure, complete, and other methods to cover the exceptional cases. Whichever method is called, a Promise can be completed only once.
promisedInt.success(300)
// promisedInt.failure(new Exception("possible error"))
// promisedInt.complete(Success(1))
The promise’s future then enters the ready state, and we “cash” its return value using the onComplete callback we just described.
promisedInt.future onComplete {
case Success(value) => println(value)
case _ => println("There was an unexpected error.")
}
promisedInt acts as a proxy here. The caller may not care which Future computes and supplies the promised value: it could be intFuture or some other Future such as intFuture2. Therefore, we only need to register the callback on the proxy (promisedInt.future), not on the other Futures. For ease of understanding, here is the complete code listing:
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
val promisedInt: Promise[Int] = Promise[Int]()
val intFuture = Future {
println("Working on...")
println("The thread that performs this computation is:" + Thread.currentThread().getName)
Thread.sleep(1000)
// promisedInt is actually fulfilled by intFuture. A promise can be completed only once,
// so the alternatives below stay commented out.
promisedInt.success(300)
// promisedInt.failure(new Exception("Possible error."))
// promisedInt.complete(Success(1))
}
// The main function only cares about whether promisedInt can provide a value.
promisedInt.future onComplete {
case Success(value) => println(value)
case _ => println("There was an unexpected error.")
}
// If the main thread does not block for a while, the program will terminate prematurely.
Thread.sleep(3000)
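The rule that a Promise can be completed only once can be demonstrated without any threads. In this minimal sketch the object PromiseOnceDemo and the method completeTwice are hypothetical names; trySuccess is the non-throwing counterpart of success.

```scala
import scala.concurrent.Promise
import scala.util.Try

// Hypothetical demo object; the names are illustrative only.
object PromiseOnceDemo {
  def completeTwice(): (Boolean, Boolean, Option[Try[Int]]) = {
    val promise = Promise[Int]()
    promise.success(300) // the first completion wins

    // trySuccess reports whether this call completed the promise.
    val secondAccepted = promise.trySuccess(1)

    // success throws if the promise has already been completed.
    val secondThrew =
      try { promise.success(1); false }
      catch { case _: IllegalStateException => true }

    (secondAccepted, secondThrew, promise.future.value)
  }
}
```

completeTwice() returns (false, true, Some(Success(300))): the later attempts are rejected and the Future keeps the first value. When several computations race to fulfil the same Promise, the try* variants (trySuccess, tryFailure, tryComplete) avoid the exception.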
Filter the return value of the Future
Scala provides two ways to check or filter the return value of a Future. The filter method checks the result of a Future and keeps it if it satisfies the predicate. The following example uses filter to require that the return value is >= 30. Notice that filter gives you another Future.
import scala.concurrent.ExecutionContext.Implicits.global
val eventualInt = Future {
Thread.sleep(3000)
print(s"${Thread.currentThread().getName} : return result.")
12
}
// Check whether the return value is >= 30.
val checkRes: Future[Int] = eventualInt filter(_ >= 30)
// Block and wait.
while (!checkRes.isCompleted) {
Thread.sleep(1000)
println("waiting..")
}
// Register the callback.
checkRes onComplete {
case Success(res) =>
println(s"result : $res")
case Failure(cause) =>
println(s"failed because of $cause")
}
// Give the callback time to run before the program exits.
Thread.sleep(1000)
If the value does not satisfy the predicate, the resulting Future fails with java.util.NoSuchElementException: Future.filter predicate is not satisfied. You can handle it in case Failure(cause) =>.
The Future's collect method lets you apply an intermediate transformation to the result using a partial function, which can be written concisely with case clauses.
val eventualInt = Future {
Thread.sleep(3000)
print(s"${Thread.currentThread().getName} : return result.")
22
}
// Apply a different transformation depending on the range the value falls in.
val transformRes: Future[Int] = eventualInt collect {
case res : Int if res > 30 => res + 30
case res : Int if res > 20 => res + 20
}
while (!transformRes.isCompleted) {
Thread.sleep(1000)
println("waiting...")
}
transformRes onComplete {
case Success(int) => println(s"value of $int")
case Failure(cause) => println(s"failed because of $cause")
}
// Give the callback time to run before the program exits.
Thread.sleep(1000)
Handle failed Futures
failed method
Scala provides several ways to handle failed Futures, including failed, fallbackTo, recover, and recoverWith. For example, if a Future fails during execution, the failed method returns a successful Future[Throwable] that holds the exception.
import scala.concurrent.ExecutionContext.Implicits.global
val intFuture = Future {10 / 0}
intFuture onComplete {
case Success(int) => println(int)
case Failure(cause) => println(s"failed because of $cause")
}
val eventualThrowable: Future[Throwable] = intFuture.failed
// Wait briefly so that the Future has time to complete.
Thread.sleep(1000)
// Some(Success(java.lang.ArithmeticException: / by zero))
println(eventualThrowable.value)
If the Future completes normally, the Future returned by failed fails with a NoSuchElementException instead.
fallbackTo method
The fallbackTo method provides a safety net: it lets you supply a second Future whose result is used if the original Future fails.
val intFuture = Future {10 / 0}
intFuture onComplete {
case Success(int) => println("intFuture" + int)
case Failure(cause) => println(s"failed because of $cause")
}
val maybeFailed: Future[Int] = intFuture fallbackTo Future {100}
maybeFailed onComplete {
case Success(int) => println("maybeFailed" + int)
case _ => println("This future's throwable will be ignored.")
}
Thread.sleep(2000)
println(maybeFailed.value)
The fallback Future (Future {100} above) always runs, whether or not intFuture succeeds (I tested this myself), so do not put code with side effects in it. When intFuture succeeds, the fallback's result is ignored and maybeFailed holds the result of intFuture. The fallback's result is used only if intFuture fails.
If the fallback also fails, its exception is ignored and maybeFailed fails with the original exception from intFuture.
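The three cases described above can be reproduced with already-completed Futures. This is a minimal sketch; the object FallbackToDemo, the method outcomes, and the exception messages are hypothetical and serve only as illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; the names are illustrative only.
object FallbackToDemo {
  def outcomes(): (Int, Int, String) = {
    val original = new ArithmeticException("original")

    // Case 1: the original succeeds, so the fallback value is ignored.
    val kept = Future.successful(1).fallbackTo(Future.successful(100))

    // Case 2: the original fails, so the fallback value is used.
    val replaced = Future.failed[Int](original).fallbackTo(Future.successful(100))

    // Case 3: both fail, so the result carries the ORIGINAL exception.
    val bothFailed = Future.failed[Int](original)
      .fallbackTo(Future.failed[Int](new RuntimeException("fallback")))

    (
      Await.result(kept, 5.seconds),
      Await.result(replaced, 5.seconds),
      Await.result(bothFailed.failed, 5.seconds).getMessage
    )
  }
}
```

outcomes() returns (1, 100, "original"), matching the three rules: success wins, the fallback replaces a failure, and a double failure reports the first exception.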
recover method
The recover method follows similar logic to fallbackTo: if the Future fails, it lets you choose a replacement value based on the type of the exception. If the original Future succeeds, the replacement value is ignored. Likewise, if the partial function passed to recover does not handle the exception that occurred, the original Future's exception is passed through unchanged.
import scala.concurrent.ExecutionContext.Implicits.global
val intFuture = Future {
10 / 0
}
val eventualInt: Future[Int] = intFuture recover {
case ex: ArithmeticException => 100
case ex: Exception => 200
}
intFuture onComplete {
case Success(int) => println(int)
case Failure(cause) => println(s"failed because of $cause")
}
Thread.sleep(3000)
println(eventualInt.value)
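recoverWith, listed earlier among the failure-handling methods, works like recover except that the partial function returns a new Future instead of a plain value. The following is only a sketch: the object RecoverWithDemo, the helper divide, and the replacement value 100 are assumptions made for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; the names are illustrative only.
object RecoverWithDemo {
  // Hypothetical helper: throws ArithmeticException when b is 0.
  def divide(a: Int, b: Int): Int = a / b

  def recovered(): Int = {
    val failing: Future[Int] = Future { divide(10, 0) }
    // On ArithmeticException, switch to another asynchronous computation.
    val viaBackup: Future[Int] = failing.recoverWith {
      case _: ArithmeticException => Future { 100 }
    }
    Await.result(viaBackup, 5.seconds)
  }
}
```

recovered() returns 100. recoverWith is the natural choice when the replacement value itself has to be computed asynchronously, for example by a second service call.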
transform: Die or Live
The Future's transform method takes two functions to transform the Future, one applied on success and one applied on failure. Note that before Scala 2.12, transform could only be called with two functions (2.12 added an overload that takes a single Try => Try function), so I did not use a case-based function here.
The transform method returns a new Future. The original Future is left untouched, so its result remains available independently of the new one.
import scala.concurrent.ExecutionContext.Implicits.global
val intFuture = Future {
10 / 0
}
val eventualInt: Future[Int] = intFuture.transform(
result => {
println("this is a valid result.")
10 * result
}, ex => {
println("something wrong has happened.")
// ex.getCause would be null here, so wrap the original exception instead.
new IllegalStateException("intFuture failed", ex)
}
)
eventualInt onComplete {
case Success(result) =>
println(s"final result is : $result")
case Failure(cause) =>
println(s"see cause : $cause")
}
Thread.sleep(2000)
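On Scala 2.12 and later, the same idea can be written with the single-function overload of transform, which maps a Try to a Try and can therefore turn a failure into a success. This is a sketch under that version assumption; the object TransformTryDemo and the helpers divide and describe are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical demo object; requires Scala 2.12 or later.
object TransformTryDemo {
  // Hypothetical helper: throws ArithmeticException when b is 0.
  def divide(a: Int, b: Int): Int = a / b

  // Maps both outcomes of the future to a successful description.
  def describe(f: Future[Int]): Future[String] = f.transform[String] {
    case Success(v) => Success(s"ok: $v")
    case Failure(e) => Success(s"recovered from ${e.getClass.getSimpleName}")
  }

  def results(): (String, String) = (
    Await.result(describe(Future(divide(10, 2))), 5.seconds),
    Await.result(describe(Future(divide(10, 0))), 5.seconds)
  )
}
```

results() returns ("ok: 5", "recovered from ArithmeticException"). Unlike the two-function form, the failure branch here is free to produce a Success.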
Combining Futures
zip method
The zip method combines the results of two Futures into a pair (x, y). If either Future fails, the resulting Future fails too. If both fail, the result carries the exception of the first Future, that is, the one zip was called on.
val futureX = Future {
Thread.sleep(2000)
200
}
val futureY = Future {
Thread.sleep(3000)
300
}
// Zip 200 and 300 into the pair (200, 300).
val zipFuture: Future[(Int, Int)] = futureX zip futureY
zipFuture onComplete {
case Success(res) =>
println(res)
case Failure(cause) =>
println(cause)
}
Thread.sleep(4000)
fold method
The Future companion object provides a fold method that accumulates the results of multiple Futures. As with zip, if any Future fails, the resulting Future fails as well. If several Futures fail, the result carries only the exception of the first failed one.
import scala.concurrent.ExecutionContext.Implicits.global
val futureX = Future {
println(Thread.currentThread().getName)
Thread.sleep(1000)
200
}
val futureY = Future {
println(Thread.currentThread().getName)
Thread.sleep(1500)
300
}
// Load multiple Futures into a collection. Each future is assigned to a different worker thread for processing.
val eventualInts = List(futureX,futureY)
// Curried function. fold starts from 0 and accumulates the results.
// Note: Future.fold is deprecated since Scala 2.12 in favor of Future.foldLeft.
val finalResult: Future[Int] = Future.fold(eventualInts)(0) {
(int1, int2) => {
int1 + int2
}
}
finalResult onComplete {
case Success(sum) => println(sum)
case _ => println("failed.")
}
// Keep the main thread alive until the callback has run.
Thread.sleep(2000)
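On Scala 2.12 and later, the same accumulation can be written with Future.foldLeft, or with Future.sequence followed by an ordinary collection operation. The sketch below assumes one of those versions; the object FoldLeftDemo and the method sums are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; requires Scala 2.12 or later.
object FoldLeftDemo {
  def sums(): (Int, Int) = {
    val futures: List[Future[Int]] = List(Future { 200 }, Future { 300 })

    // Accumulate the results from 0, in the order of the list.
    val viaFoldLeft: Future[Int] = Future.foldLeft(futures)(0)(_ + _)

    // Alternative: turn List[Future[Int]] into Future[List[Int]], then sum.
    val viaSequence: Future[Int] = Future.sequence(futures).map(_.sum)

    (Await.result(viaFoldLeft, 5.seconds), Await.result(viaSequence, 5.seconds))
  }
}
```

sums() returns (500, 500). Future.sequence is convenient when the individual results are needed as a collection and not only as an accumulated value.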
Dependencies and uses covered in this chapter
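Here are the imports related to Future used in this chapter:
import scala.concurrent.{Await, Future, Promise}
// => Future, Promise, and Await themselves.
import scala.concurrent.ExecutionContext.Implicits.global
// => The execution context on which Scala Futures depend.
import scala.concurrent.duration._
// => Time representation for Await.
import scala.util.{Failure, Success}
// => The two cases of Try matched in the callbacks.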
References
- [CSDN] Scala: Understanding and using Future
- [IBM] Scala: Asynchronous event handling in Scala
- Shortest path and critical path in discrete mathematics
- Read async/await in Scala
- [CSDN] Scala Novice Guide – Part 9 (Promise and Future Practices)