Write efficient concurrent programs using coroutines
Coroutines are a feature of Kotlin because most programming languages do not have the concept of coroutines.
So what is a coroutine? It’s kind of similar to threads, and you can simply think of it as a lightweight thread. Remember, the threads we studied before are very heavyweight, and they rely on the operating system’s scheduling to switch between different threads. However, using coroutines can realize the switch between different coroutines only at the level of programming language, thus greatly improving the efficiency of concurrent programming.
For a more specific example, let’s say we have foo() and bar() :
fun foo(a) {
a()
b()
c()
}
fun bar(a) {
x()
y()
z()
}
Copy the code
In theory, x(), y(), and z() can only be executed after a(), b(), and c() have been executed. If coroutines are used, foo() is called in coroutine A, and bar() is called in coroutine B. Although they are still running in the same thread, the execution of foo() is suspended at any time and bar() is executed instead. The execution of the bar() method can also be suspended at any time and the execution of the foo() method continues, and the final output becomes uncertain.
As you can see, coroutines allow us to simulate the effects of multithreaded programming in a single-threaded mode, where the suspension and recovery of code execution are completely controlled by the programming language, independent of the operating system. This feature is a huge improvement in the efficiency of highly concurrent programs. It’s unthinkable to have 100,000 threads running. It is perfectly possible to open 100,000 coroutines, which we will verify in a moment.
Now that you understand some of the basic concepts of coroutines, let’s move on to Kotlin’s use of coroutines.
Basic use of coroutines
Kotlin does not include coroutines in the standard library API, but instead provides them as dependency libraries. So if we want to use coroutine functionality, we need to add the following dependencies to our app/build.gradle file:
implementation "Org. Jetbrains. Kotlinx: kotlinx coroutines -- core: 1.3.8"
implementation "Org. Jetbrains. Kotlinx: kotlinx coroutines - android: 1.3.0"
Copy the code
Create a coroutinestest.kt file, define a main() function, and start our coroutine tour.
So the first question we’re going to face is, how do you start a coroutine? The easiest way to do this is to use the global. launch function, as shown below:
fun main(a) {
GlobalScope.launch {
println("codes run in coroutine scope")}}Copy the code
The globalscope.launch function creates the scope of a coroutine so that the block of code (a Lambda expression) passed to the launch function is run in the coroutine, where we simply print a log line inside the block. So now run main(), will the log print successfully? If you try it, you’ll find no log output.
This is because the global. launch function creates a top-level coroutine each time the application finishes running. The log did not print because the application ended before the code in the code block could be run.
To solve this problem, we can simply delay the program for a while before ending, as shown below:
fun main(a) {
GlobalScope.launch {
println("codes run in coroutine scope")
}
Thread.sleep(1000)}Copy the code
Let’s block the main Thread for 1 second using thread.sleep (). Now run the program again and you will see that the log will print out, as shown in the figure below.
However, there is a problem with this method of writing, if the code block does not finish within 1 second, then it will be forced to interrupt. Look at the following code:
fun main(a) {
GlobalScope.launch {
println("codes run in coroutine scope")
delay(1500)
println("codes run in coroutine scope finished")
}
Thread.sleep(1000)}Copy the code
We add a delay() function to the block of code and print a log line after that. The delay() function allows the current coroutine to delay running for a specified amount of time, but it is different from the thread.sleep () method. The delay() function is a non-blocking suspend function that suspends the current coroutine and does not affect the execution of other coroutines. The thread.sleep () method blocks the current Thread, so all coroutines running under that Thread are blocked. Note that the delay() function can only be called in the scope of a coroutine or in other suspended functions.
Here we let the coroutine hang for 1.5 seconds, but the main thread blocks for only 1 second. What happens? Rerunning the program, you will find that the new log in the block of code is not printed because the application is over before it has time to run.
So is there any way to stop the application after all the code in the coroutine has run? Of course there is, and you can do this with the runBlocking function:
fun main(a) {
runBlocking {
println("codes run in coroutine scope")
delay(1500)
println("codes run in coroutine scope finished")}}Copy the code
The runBlocking function also creates the scope of a coroutine, but it guarantees that the current thread will be blocked until all code and subcoroutines in the scope of the coroutine have been executed. Note that the runBlocking function should normally only be used in test environments, as using it in a formal environment can cause performance problems.
Now rerun the program, and the result looks like the figure below.
As you can see, both logs will print properly.
While we can now make code run in coroutines, we don’t seem to see any particular benefit. This is because all of the code currently runs in the same coroutine, and when it comes to highly concurrent applications, coroutines have an advantage over threads.
So how do you create multiple coroutines? Quite simply, use the launch function, as shown below:
fun main(a) {
runBlocking {
launch {
println("launch1")
delay(1000)
println("launch1 finished")
}
launch {
println("launch2")
delay(1000)
println("launch2 finished")}}}Copy the code
Note that the launch function here is different from the globalScope.launch function we just used. First, it must be called within the scope of the coroutine, and second, it creates subcoroutines within the scope of the current coroutine. The characteristic of subcoroutines is that when a coroutine in an outer scope ends, all subcoroutines in that scope also end. In contrast, the globalScope.launch function always creates a top-level coroutine, which is similar to a thread because threads are not hierarchical and are always top-level.
Here we call the launch function twice, creating two subcoroutines. Rerun the program and the result is as shown in the figure below.
As you can see, the logs in the two subcoroutines are printed alternately, indicating that they are indeed running concurrently like multithreading. However, the two subcoroutines actually run in the same thread, and it is up to the programming language to decide how to schedule among the multiple coroutines, which ones to run and which ones to suspend. The scheduling process requires no operating system involvement at all, which makes coroutines surprisingly efficient at concurrency.
So how high will it be? Let’s do an experiment to find out. The code looks like this:
fun main(a) {
val start = System.currentTimeMillis()
runBlocking {
repeat(100000) {
launch {
println(".")}}}val end = System.currentTimeMillis()
println(end - start)
}
Copy the code
Here we use the repeat function to loop and create 100,000 coroutines, but we do nothing meaningful in the coroutine, just symbolically print a dot and record how long it takes to run the entire operation. Now rerun the program and you will see something like the figure below.
As you can see, it only took 469 milliseconds, which is enough to show how efficient the coroutine is. If you start 100,000 threads, you might have an OOM exception.
However, as the logic in the launch function becomes more complex, you may need to extract some of the code into a separate function. This raises a question: The code we write in launch has coroutine scope, but extracting it into a separate function doesn’t, so how do we call a suspended function like delay()?
For this purpose, Kotlin provides a suspend keyword that allows you to declare any function as a suspended function, and suspended functions can call each other, as shown below:
suspend fun printDot(a) {
println(".")
delay(1000)}Copy the code
This allows you to call delay() in the printDot() function.
However, the suspend keyword only declares a function to be suspended and does not give it coroutine scope. For example, if you try to call launch inside printDot(), it will fail because launch must be called inside the coroutine scope.
This problem can be solved by using the coroutineScope function. The coroutineScope function is also a suspended function, so it can be called in any other suspended function. It inherits the scope of an external coroutine and creates a subcoroutine. With this feature, we can provide coroutine scope for any suspended function. An example is written as follows:
suspend fun printDot(a) = coroutineScope {
launch {
println(".")
delay(1000)}}Copy the code
As you can see, we can now call the launch function within the suspended function printDot().
In addition, coroutineScope is similar to runBlocking in that it guarantees that all the code and subcoroutines in its scope will be suspended until all of them have been executed. Let’s look at the following example code:
fun main(a) {
runBlocking {
coroutineScope {
launch {
for (i in 1.10.) {
println(i)
delay(1000)
}
}
}
println("coroutineScope finished")
}
println("runBlocking finished")}Copy the code
The runBlocking function is used to create a coroutineScope, and the coroutineScope function is called to create a subcoroutine. In the scope of coroutineScope, we create a subcoroutine by calling the launch function and printing the numbers 1 through 10, one second apart, through the for loop. Finally, at the end of the runBlocking and coroutineScope functions, another log line is printed. Now rerun the program and you will see something like the figure below.
As you can see, the console prints the numbers 1 through 10 at 1-second intervals before printing the log ending with coroutineScope and finally printing the log ending with runBlocking.
CoroutineScope does suspend the external coroutine. The code behind the coroutineScope function runs only after all the code and subcoroutines in its scope have been executed.
CoroutineScope blocks the current coroutine and does not affect any other coroutine or any thread, so it does not cause any performance problems, although it seems to work a bit like runBlocking. RunBlocking is not recommended for real projects because it suspends external threads and can cause the interface to freeze if you call it in the main thread.
Ok, now that we have learned the basic use of coroutines, you have been successfully introduced. So let’s start learning more about coroutines.
More scope builders
In the previous section, we looked at the scope builders globalscope.launch, runBlocking, launch, and coroutineScope, all of which can be used to create a new coroutineScope. However, the globalscope. launch and runBlocking functions can be called anywhere, the coroutineScope function can be called in coroutineScope or suspended function, and the launch function can only be called in coroutineScope.
As mentioned earlier, runBlocking is recommended only in test environments because it blocks threads. Globalscope.launch is not recommended unless you are very specific about creating a top-level coroutine.
Why is it not recommended to use top-level coroutines? Mainly because it is too expensive to administer. For example, if we make a network request using a coroutine in an Activity, the user should cancel the network request, or at least not call back, because network requests are time consuming and the user closes the Activity before the server can respond. Since the Activity no longer exists, the callback is meaningless.
So how do you cancel a coroutine? Both the GlobalScope.launch function and the launch function return a Job object. To fetch the elimination coroutine, just call the Job object’s cancel() method, as shown below:
val job = GlobalScope.lanch {
// Handle the concrete logic
}
job.cancel()
Copy the code
But if we created a top-level coroutine every time, and then called the cancel() method of all the coroutines we created one by one when the Activity closed, wouldn’t that code be unmaintainable?
As a result, coroutine scope builders such as GlobalScope.launch are rarely used in real projects. Let me show you how to write it in real projects.
val job = Job()
val scope = CoroutineScope(job)
scope.lanch {
// Handle the concrete logic
}
job.cancel()
Copy the code
As you can see, we first create a Job object and pass it into the CoroutineScope() function. Note that CoroutineScope() is a function, although it’s named more like a class. The CoroutineScope() function returns a CoroutineScope object. This syntax structure is designed more as if we had created an instance of CoroutineScope, which Kotlin probably did intentionally. Once you have the CoroutineScope object, you can always call its launch function to create a coroutine.
All coroutines created by calling CoroutineScope’s launch function are now associated with the scope of the Job object. By calling the cancel() method once, all coroutines in the same scope can be canceled, greatly reducing the cost of coroutine management.
However, the CoroutineScope() function is more suitable for real projects, and if you are writing test learning code in the main() function, the runBlocking function is most convenient.
Coroutines are a lot of stuff, but we’re going to keep going. You already know that calling the launch function creates a new coroutine, but the launch function can only be used to execute a piece of logic, not to get the result of the execution, because its return value is always a Job object. Is there any way to create a coroutine and get the result of its execution? Of course, you can do that using async.
Async must be called within the scope of the coroutine. It creates a new subcoroutine and returns a Deferred object. To obtain the result of the async block, we need to call the await() method on the Deferred object.
fun main(a) {
runBlocking {
val result = async {
5 + 5
}.await()
println(result)
}
}
Copy the code
Here we perform a simple mathematical operation in the async function block, call the await() method to get the result of the operation, and print the result. Rerun the code and you get something like the figure below.
But the mystery of async functions doesn’t stop there. In fact, the code in the block starts executing immediately after the async function is called. When calling the await() method, if the code in the block has not finished executing, the await() method blocks the current coroutine until the result of the async function is available.
To verify this, let’s write the following code to verify:
fun main(a) {
runBlocking {
val start = System.currentTimeMillis()
val result1 = async {
delay(1000)
5 + 5
}.await()
val result2 = async {
delay(1000)
4 + 6
}.await()
println("result is ${result1 + result2}.")
val end = System.currentTimeMillis()
println("cost ${end - start} ms.")}}Copy the code
Two async functions are used in succession to perform the task, and the delay() method is called in the code block for a delay of 1 second. Following the theory that the await() method blocks the current coroutine until the code in the async block has finished executing, we record the elapsed time of the code for verification purposes. Now rerun the program, and the result looks like the figure below.
As you can see, the running time of the whole code is 2022 milliseconds, indicating that the two async functions here are indeed in a serial relationship, and the first is executed before the second is executed.
But this is obviously very inefficient, because two async functions can be executed at the same time to improve efficiency. Now modify the above code as follows:
fun main(a) {
runBlocking {
val start = System.currentTimeMillis()
val deferred1 = async {
delay(1000)
5 + 5
}
val deferred2 = async {
delay(1000)
4 + 6
}
println("result is ${deferred1.await() + deferred2.await()}.")
val end = System.currentTimeMillis()
println("cost ${end - start} ms.")}}Copy the code
Instead of using await() method to retrieve the result immediately after each async call, we now call await() method only when we need to use the result of async’s execution, so the two async functions become parallel. Rerun the program and the result is as shown in the figure below.
As you can see, the entire piece of code now takes 1021 milliseconds to run, which is an obvious improvement in performance.
Finally, we’ll look at a special scope builder: the withContext() function. The withContext() function is a suspended function. It can be thought of as a simplified version of async.
fun main(a) {
runBlocking {
val result = withContext(Dispatchers.Default) {
5 + 5
}
println(result)
}
}
Copy the code
Let me explain this code. After the withContext() function is called, the code in the code block is executed immediately, while the external coroutine is suspended. After all the code in the block has been executed, the result of the last line is returned as the return value of withContext(), so it is basically equivalent to val result = async{5 + 5}.await(). The only difference is that the withContext() function forces us to specify a thread argument, which I’m going to talk a little bit about.
As you already know, coroutines are lightweight threads, so many traditional programming tasks that require multiple threads to execute concurrently now require multiple coroutines to execute in one thread. This does not mean that we never need to start threads. For example, Android requires that network requests be made in subthreads. Even if you start a coroutine to perform the network request, if it is a coroutine in the main thread, the program will still fail. At this point we should specify a specific thread to run the coroutine using the thread argument.
There are three thread parameters: Dispatchers.Default, Dispatchers.IO, and Dispatchers. Dispatchers.Default means that a low concurrency strategy is used by Default. You can use Dispatchers when the code you are executing is computatively intensive and too much concurrency may affect the performance of the task. Dispatchers.IO means that a higher-concurrency threading strategy is used. Dispatchers.IO is used when the code you are executing is mostly blocked and waiting, such as when executing network requests, in order to support a higher amount of concurrency. Dispatchers.Main means that the child thread will not be started and the code will be executed in the Main Android thread, but this value can only be used in Android projects. Pure Kotlin programs will get an error using this type of thread parameter.
In fact, in the coroutineScope builder we just studied, all of the functions except coroutineScope can specify such a thread argument, except that withContext() is mandatory and the others are optional.
By now, you’ve covered some of the most common uses of coroutines, and learned that the main purpose of coroutines is to dramatically improve the performance of concurrent programming. But coroutines in Kotlin can actually optimize the way traditional callbacks are written to make the code cleaner, so let’s start with that.
Use coroutines to simplify the writing of callbacks
Earlier we learned about the callback mechanism of a programming language and used it to get a data response to an asynchronous network request. I don’t know if you have noticed that the callback mechanism basically relies on anonymous classes to implement, but anonymous classes are often more complicated to write, such as the following code:
HttpUtil.sendHttpRequest(address, object : HttpCallbackListener {
override fun onFinish(response: String) {
// Get the specific content returned by the server
}
override fun onError(e: Exception) {
// This is where the exception is handled}})Copy the code
Such anonymous class implementations need to be written as many times as the network requests are made in as many places. And that got us thinking, is there a simpler way to write it?
In the past, there might have been no simpler way to write it. Now, Kotlin’s coroutine makes it possible to simplify the traditional callback mechanism by simply using the suspendCoroutine function. Let’s take a look at it.
The suspendCoroutine function must be called within the coroutine scope or suspend function. It takes a Lambda expression argument, suspends the current coroutine immediately, and executes the code in the Lambda expression in an ordinary thread. A Continuation argument is passed to the argument list of the Lambda expression, and a call to its resume() method resumeWithException() allows the coroutine to resume execution.
Now that we know what suspendCoroutine does, we can use this function to optimize the traditional callback method. First, define a request() function that looks like this:
suspend fun request(address: String): String {
return suspendCoroutine { continuation ->
HttpUtil.sendHttpRequest(address, object : HttpCallbackListener {
override fun onFinish(response: String) {
continuation.resume(response)
}
override fun onError(e: Exception) {
continuation.resumeWithException(e)
}
})
}
}
Copy the code
As you can see, the request() function is a pending function and takes an address argument. Inside the request() function, we call the suspendCoroutine function we just introduced, so that the current coroutine is suspended immediately, and the code in the Lambda expression is executed in a normal thread. We then invoke the httputil.sendHttprequest () method in a Lambda expression to initiate the network request and listen for the result of the request in a traditional callback manner. If the request succeeds, the resume() method of the Continuation is called to resume the pending coroutine, passing in the data of the server response, which becomes the return value of the suspendCoroutine function. If the request fails, the Continuation’s resumeWithException() is called to resume the pending coroutine, passing in the specific reason for the exception.
You might say, isn’t this still the traditional way of writing callbacks? How does the code get simpler? This is because no matter how many network requests we make later, there is no need to repeat the callback implementation. For example, to obtain the response data of Baidu home page, you can write like this:
suspend fun getBaiduResponse(a) {
try {
val response = request("https://www.baidu.com/")
// The server responds to the data processing
} catch (e: Exception) {
// Handle the exception
}
Copy the code
So, does the code feel a lot cleaner? Because getBaiduResponse() is a suspended function, when it calls the request() function, the current coroutine is suspended immediately and waits until the network request succeeds or fails before the current coroutine resumes running. In this way, we can get the response data for an asynchronous network request without using a callback, and if the request fails, it goes directly into a catch statement.
GetBaiduResponse () is declared as a suspended function, so it can only be called in the scope of coroutines or other suspended functions. This is true because the suspendCoroutine function itself is intended to be used in conjunction with coroutines. However, with proper project architecture design, we can easily apply various coroutine code to a common project, as you will learn in the next part of the actual project.
In fact, the suspendCoroutine function can be used to simplify the writing of almost any callback, such as using Retrofit to initiate a network request.
val appService = ServiceCreator.create<AppService>()
appService.getAppData().enqueue(object : Callback<List<App>> {
override fun onResponse(call: Call<List<App>>, response: Response<List<App> >) {
// Get the data returned by the server
}
override fun onFailure(call: Call<List<App>>, t: Throwable) {
// This is where the exception is handled}})Copy the code
Do you think the way to write the callback here is also quite tedious? Don’t worry, we can simplify this a lot in no time using the suspendCoroutine function.
Since different Service interfaces return different data types, this time we can’t program for specific types as we did before, but instead use a generic approach. Define an await() function as follows:
suspend fun <T> Call<T>.await(a): T {
return suspendCoroutine { continuation ->
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
val body = response.body()
if(body ! =null) continuation.resume(body)
else continuation.resumeWithException(
RuntimeException("response body is null"))}override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
Copy the code
This code is a little more complicated than the request() function. We declare the generic T for the await() function, and define the await() function as an extension of Call. All Retrofit network request interfaces that return a Call type can then Call the await() function directly.
Next, the suspendCoroutine function is used in the await() function to suspend the current coroutine, and because of the extension function, we now have the context for the Call object, so we can Call the enqueue() method directly and have Retrofit initiate the network request. Next, do the same for the data that the Retrofit responds to or if the network request fails. It is also important to note that in the onResponse() callback, the object resolved by calling the body() method may be empty. If it is null, the idea here is to manually throw an exception, or you can handle it more appropriately according to your own logic.
With the await() function, it becomes extremely simple to call all of Retrofit’s Service interfaces. For example, the same function can be implemented as follows:
suspend fun getAppData(a) {
try {
val appList = ServiceCreator.create<AppService>().getAppData().await()
// The server responds to the data processing
} catch (e: Exception) {
// Handle the exception}}Copy the code
Without the tedious anonymous class implementation, you can simply call the await() function to get Retrofit to initiate network requests and get the data directly from the server response. Now, of course, you might think it would be a bit cumbersome to do a try catch every time you make a network request, but we can choose not to do that here. In the absence of processing, if an exception occurs, it is thrown up one layer at a time until it is handled by a function at a certain level. Therefore, we can also make a try and catch only once in a single entry function, thus making the code much simpler.
Now that you’ve learned enough about Kotlin’s coroutines in theory, the next step is to apply it to a real Android project.