Preface
In the previous article we mainly covered Flow, the asynchronous cold stream. In this article we’ll talk about its hot counterpart, Channel.
So what is a Channel?
1. Channel
1.1 Getting to know Channel
A Channel is a concurrency-safe queue that connects coroutines and lets them communicate with each other.
With that in mind, let’s try a small demo:
// Imports used by the snippets in this section (each test method lives inside a test class)
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.junit.Test

@Test
fun `test know channel`() = runBlocking<Unit> {
    val channel = Channel<Int>()
    // producer
    val producer = GlobalScope.launch {
        var i = 0
        while (true) {
            delay(1000)
            channel.send(++i)
            println("send $i")
        }
    }
    // consumer
    val consumer = GlobalScope.launch {
        while (true) {
            val element = channel.receive()
            println("receive $element")
        }
    }
    joinAll(producer, consumer)
}
This is very simple: just two coroutines, one acting as the producer and one as the consumer.
Let’s see how it runs:
receive 1
send 1
send 2
receive 2
... (output omitted)
send 999
receive 999
That was easy enough, so let’s move on to the next topic!
1.2 Channel capacity
A Channel is essentially a queue, and a queue has a buffer. Once the buffer is full and nobody calls receive to take an element out, send has to suspend. The default Channel<Int>() used here has no buffer at all, so every send waits for a matching receive. If we deliberately slow down the receiving end, we find that send stays suspended and continues only after receive has been called.
That’s a lot of theory, so let’s try a demo:
@Test
fun `test know channel2`() = runBlocking<Unit> {
    val channel = Channel<Int>()
    // producer
    val producer = GlobalScope.launch {
        var i = 0
        while (true) {
            delay(1000)
            channel.send(++i)
            println("send $i")
        }
    }
    // consumer
    val consumer = GlobalScope.launch {
        while (true) {
            delay(2000)
            val element = channel.receive()
            println("receive $element")
        }
    }
    joinAll(producer, consumer)
}
Here the consumer takes longer per element than the producer does.
Let’s see how it runs:
receive 1
send 1
receive 2 // send 2 waits 2 seconds
send 2
receive 3 // send 3 waits 2 seconds
The output confirms that send has to suspend once the buffer is full and nobody calls receive to take the element out.
In plain English: when the consumer needs more time to process an element than the producer needs to produce one, and the buffer is full, the producer gets “lazy” and waits until the consumer has made room.
That should be easy enough to understand. Now, on to the next topic.
1.3 Iterating over a Channel
A Channel behaves much like a sequence, so when reading from it we can obtain its iterator directly, or simply use a for loop.
@Test
fun `test iterate channel`() = runBlocking<Unit> {
    val channel = Channel<Int>(Channel.UNLIMITED)
    // producer
    val producer = GlobalScope.launch {
        for (x in 1..5) {
            channel.send(x * x)
            println("send ${x * x}")
        }
    }
    // consumer
    val consumer = GlobalScope.launch {
        /*
        val iterator = channel.iterator()
        while (iterator.hasNext()) {
            val element = iterator.next()
            println("receive $element")
            delay(2000)
        }
        */
        // Either way works
        // Note: the channel is never closed here, so the loop keeps waiting after the last element
        for (element in channel) {
            println("receive $element")
            delay(2000)
        }
    }
    joinAll(producer, consumer)
}
The comments explain everything.
Let’s see how it runs:
send 1
send 4
send 9
send 16
send 25 // the five sends complete almost instantly
receive 1 // each following element is received two seconds after the previous one
receive 4
receive 9
receive 16
receive 25
As we can see, this behaves completely differently from 1.2! Here the producer finishes all of its work up front, without waiting for the consumer to process anything.
As mentioned above, the producer is only “lazy” under two conditions: first, the consumer takes longer to process an element than the producer takes to produce one; second, the buffer must be full.
In 1.2 the channel was created with val channel = Channel<Int>(), whereas here it is created with val channel = Channel<Int>(Channel.UNLIMITED). An unlimited buffer is never full, so send never has to suspend.
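The contrast between 1.2 and 1.3 comes down to the capacity passed when the Channel is created. The following is a minimal sketch, not part of the original demos: it probes a channel with the non-suspending trySend and counts how many elements are accepted before a sender would have to suspend. The helper name acceptedWithoutSuspending and its limit parameter are made up for illustration, and trySend assumes kotlinx.coroutines 1.5 or newer.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many elements a fresh channel of the given
// capacity accepts (with nobody receiving) before trySend starts to fail.
// `limit` only stops the probe for unbounded channels.
fun acceptedWithoutSuspending(capacity: Int, limit: Int = 10): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    // trySend never suspends; it reports failure where send() would have suspended
    while (accepted < limit && channel.trySend(accepted).isSuccess) accepted++
    return accepted
}
```

Under these assumptions, acceptedWithoutSuspending(Channel.RENDEZVOUS) gives 0 (the default channel of 1.1 and 1.2, where every send waits for a receive), a capacity of 3 gives 3, and Channel.UNLIMITED keeps accepting until the limit stops the probe.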
1.4 produce and actor
- produce and actor are convenient ways to build producers and consumers.
- The produce builder starts a producer coroutine and returns a ReceiveChannel, which other coroutines can use to receive data. Conversely, the actor builder starts a consumer coroutine and returns a SendChannel.
With the concept out of the way, it’s time to get started.
1.4.1 Using produce
@Test
fun `test fast producer channel`() = runBlocking<Unit> {
    // producer
    val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
        repeat(100) {
            delay(1000)
            send(it)
        }
    }
    // consumer
    val consumer = GlobalScope.launch {
        for (i in receiveChannel) {
            println("received: $i")
        }
    }
    consumer.join()
}
Let’s see how it works
received: 0 // one line is printed every second
received: 1
received: 2
received: 3
... (remaining output omitted)
GlobalScope.produce starts the producer coroutine and returns a ReceiveChannel, which the consumer uses to receive the data generated by the producer. On to the next one!
1.4.2 Using actor
@Test
fun `test fast consumer channel`() = runBlocking<Unit> {
    // consumer
    val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
        while (true) {
            val element = receive()
            println(element)
        }
    }
    // producer
    val producer = GlobalScope.launch {
        for (i in 0..3) {
            sendChannel.send(i)
        }
    }
    producer.join()
}
Let’s see how it works
0
1
2
3
Here GlobalScope.actor starts the consumer coroutine and returns a SendChannel, and the producer sends data to that consumer with sendChannel.send(i)!
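Both builders tie the channel to the coroutine they start. As a small sketch of that idea (the names squares and collectSquares are hypothetical, and the runBlocking scope is used instead of GlobalScope purely to keep the example self-contained), a finite producer can simply return from its block, and the consumer’s for loop then ends on its own. produce is still marked experimental, hence the opt-in.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical producer: sends the squares of 1..n and then just returns.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(n: Int): ReceiveChannel<Int> = produce {
    for (x in 1..n) send(x * x)
}

// Collects everything the producer sends. The loop terminates because the
// channel returned by produce is closed once the producer block completes.
fun collectSquares(n: Int): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    for (value in squares(n)) received += value
    received
}
```

Calling collectSquares(4) returns the list 1, 4, 9, 16 without anyone calling close explicitly.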
Move on to the next one!
1.5 Closing a Channel
- The Channel returned by produce and actor is closed when the corresponding coroutine finishes executing, which is why Channel is called a hot data stream.
- When we call close on a Channel, it immediately stops accepting new elements, that is, its isClosedForSend immediately returns true.
  - Because of the Channel’s buffer, some elements may not have been processed yet at that point, so isClosedForReceive returns true only after all elements have been read.
- The lifecycle of a Channel is best maintained by the leading party, so it is recommended that the leading party closes it.
  - One producer may serve several consumers. It is like a teacher lecturing to several students: the teacher, not the students, gives the signal that class is over!
As always, now that the concept is covered, on to the demo:
@Test
fun `test close channel`() = runBlocking<Unit> {
    val channel = Channel<Int>(3)
    // producer
    val producer = GlobalScope.launch {
        List(3) {
            channel.send(it)
            println("send $it")
        }
        // The producer leads the lifecycle, so it closes the channel
        channel.close()
        println("""close channel.
            | - ClosedForSend: ${channel.isClosedForSend}
            | - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
    }
    // consumer
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive $element")
            delay(1000)
        }
        println("""After Consuming.
            | - ClosedForSend: ${channel.isClosedForSend}
            | - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
    }
    joinAll(producer, consumer)
}
Here only the producer controls the lifecycle (it calls close); everything else just prints the state.
Let’s see how it works
send 0
receive 0
send 1
send 2
close channel.
- ClosedForSend: true
- ClosedForReceive: false
receive 1
receive 2
After Consuming.
- ClosedForSend: true
- ClosedForReceive: true
The output shows that:
- once the producer has closed the channel, ClosedForSend is true;
- once the consumer has read every element, ClosedForReceive is true.
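The two halves of close can also be checked without any timing involved. The sketch below is an illustration rather than the author’s code (closeThenDrain is a made-up name, and trySend assumes kotlinx.coroutines 1.5 or newer): after close, a further send attempt is rejected, yet the elements already sitting in the buffer can still be read.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Returns (was a send attempted after close rejected?, elements still readable after close).
fun closeThenDrain(): Pair<Boolean, List<Int>> = runBlocking {
    val channel = Channel<Int>(3)
    channel.send(1) // fits into the buffer, so it does not suspend
    channel.send(2)
    channel.close()
    // The channel is closed for sending right away: trySend reports "closed"
    val rejected = channel.trySend(3).isClosed
    // Buffered elements are still delivered; the loop ends once the buffer is empty
    val drained = mutableListOf<Int>()
    for (element in channel) drained += element
    rejected to drained
}
```

closeThenDrain() returns true together with the list 1, 2: the rejected element 3 never enters the channel.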
1.6 BroadcastChannel
As mentioned above, a Channel may have one producer and many consumers. As far as the data itself is concerned, even with several receivers each element is read by only one of them. Broadcast is different: the receivers do not compete, and every receiver gets every element.
Let’s see how broadcast is used:
@Test
fun `test broadcast`() = runBlocking<Unit> {
    // val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    val channel = Channel<Int>() // the default capacity is used
    // Turn it into a broadcast channel; the argument 3 is the broadcast buffer capacity
    val broadcastChannel = channel.broadcast(3)
    val producer = GlobalScope.launch {
        List(3) {
            delay(100)
            broadcastChannel.send(it)
        }
        // The lifecycle is managed by the leading party (the producer)
        broadcastChannel.close()
    }
    // Create three consumers
    List(3) { index ->
        GlobalScope.launch {
            val receiveChannel = broadcastChannel.openSubscription()
            for (i in receiveChannel) {
                println("[#$index] received: $i")
            }
        }
    }.joinAll()
}
The comments explain everything.
Let’s see how it works
[#0] received: 0
[#1] received: 0
[#2] received: 0
[#0] received: 1
[#1] received: 1
[#2] received: 1
[#0] received: 2
[#2] received: 2
[#1] received: 2
From this output we can see that multiple consumers receive the same elements from the same producer, and they do not compete with each other!
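Recent kotlinx.coroutines releases mark BroadcastChannel as obsolete and point to SharedFlow instead. The sketch below therefore swaps in a different technique from the demo above: it uses MutableSharedFlow with a replay cache, so that subscribers which start late still see every value. The function name broadcastWithSharedFlow is hypothetical, and the replay size of 3 is chosen only to match the three values emitted.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Every subscriber receives all three values; they do not compete for elements.
fun broadcastWithSharedFlow(subscribers: Int = 3): List<List<Int>> = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 3)
    // No subscribers yet: the values are kept in the replay cache
    repeat(3) { shared.emit(it) }
    List(subscribers) {
        // A SharedFlow never completes by itself, so each subscriber stops after 3 values
        async { shared.take(3).toList() }
    }.awaitAll()
}
```

Each of the three subscribers ends up with the list 0, 1, 2, which mirrors the output of the broadcast demo.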
2. select: multiplexing
What is multiplexing?
In a data communication system or computer network, the bandwidth or capacity of the transmission medium is often greater than what a single signal needs. To use the communication line effectively, we want one channel to carry several signals at the same time. This is called multiplexing.
2.1 Multiplexing multiple await calls
Two APIs fetch data from the network and from the local cache, respectively, and whichever returns first is displayed.
2.1.1 Hands-on
The server side
public class UserServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String user = request.getParameter("user");
        if (user != null) {
            System.out.println(user);
        }
        System.out.println("doGet");
        PrintWriter out = response.getWriter();
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("name", "jason");
        jsonObject.addProperty("address", "California");
        out.write(jsonObject.toString());
        System.out.println(jsonObject.toString());
        out.close();
    }
}
The server uses the classic HttpServlet + Tomcat setup, without Spring Boot. The code is very simple, so I won’t go into it.
The client side
private val cachePath = "E://coroutine.cache" // file content: {"name":"HQK","address":"chengdu"}
private val gson = Gson()

data class Response<T>(val value: T, val isLocal: Boolean)

// Load user information from the local cache
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
    delay(10000) // intentionally suspend for 10 seconds
    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}

// Load user information over the network
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    userServiceApi.getUser(name)
}

class CoroutineTest02 {
    @Test
    fun `test select await`() = runBlocking<Unit> {
        GlobalScope.launch {
            val localRequest = getUserFromLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            userResponse.value?.let { println(it) }
        }.join()
    }
}

// Define the user data class
data class User(val name: String, val address: String)

// Retrofit network data requests
val userServiceApi: UserServiceApi by lazy {
    val retrofit = retrofit2.Retrofit.Builder()
        .client(OkHttpClient.Builder().addInterceptor {
            it.proceed(it.request()).apply {
                Log.d("hqk", "request:${code()}")
                //Log.d("hqk", "boy:${body()?.string()}")
            }
        }.build())
        .baseUrl("http://10.0.0.130:8080/kotlinstudyserver/")
        .addConverterFactory(GsonConverterFactory.create())
        .build()
    retrofit.create(UserServiceApi::class.java)
}

interface UserServiceApi {
    // Get user information
    @GET("user")
    suspend fun getUser(@Query("name") name: String): User
}
The test method starts both the local and the network request, and select { } registers onAwait on each of them. Whichever completes first is wrapped in a Response and returned as userResponse.
Let’s see how it runs:
User(name=jason, address=California)
Because the local loader is suspended for 10 seconds, the network request finishes sooner than the local load, so in this case the network data is displayed.
Now let’s comment out the 10-second delay in the local loader and run it again:
User(name=HQK, address=HQK)
Obviously, this time the local data is loaded, not the network data.
When several await calls are multiplexed, whichever returns first is the one displayed.
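The demo above needs a server and a cache file to run. As a self-contained sketch of the same idea, the two sources can be simulated with delay; the function name fastestSource and the delay parameters are made up for illustration and are not part of the original project.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Simulates a local and a remote loader and returns the name of whichever answers first.
fun fastestSource(localDelayMs: Long, remoteDelayMs: Long): String = runBlocking {
    val local = async { delay(localDelayMs); "local" }
    val remote = async { delay(remoteDelayMs); "remote" }
    val winner = select<String> {
        local.onAwait { it }
        remote.onAwait { it }
    }
    // The slower request is no longer needed; cancelling it lets runBlocking return promptly
    local.cancel()
    remote.cancel()
    winner
}
```

fastestSource(500, 50) returns "remote", and swapping the two delays returns "local". Note that select only picks the winner; it does not cancel the loser, which is why the sketch cancels both Deferred values itself.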
2.2 Multiplexing multiple Channels
This is similar to await: we receive the message from whichever Channel delivers first.
@Test
fun `test select channel`() = runBlocking<Unit> {
    val channels = listOf(Channel<Int>(), Channel<Int>())
    GlobalScope.launch {
        delay(100)
        channels[0].send(200)
    }
    GlobalScope.launch {
        delay(50)
        channels[1].send(100)
    }
    val result = select<Int?> {
        channels.forEach { channel ->
            channel.onReceive { it }
        }
    }
    println(result)
}
Let’s take a look at the results:
100
Here the two channels are put into a list with listOf, and two coroutines are started that suspend for different lengths of time before sending. select receives from whichever channel delivers first, so the message from the coroutine with the shorter delay (100) wins!
2.3 SelectClause
How do we know which events can be selected? Every event that can be selected is of type SelectClauseN:
- SelectClause0: the event has no return value. For example, join returns nothing, so onJoin is of type SelectClause0; the block passed to onJoin takes no parameters.
- SelectClause1: the event has a return value, as with onAwait and onReceive above (so I won’t repeat that example below).
- SelectClause2: the event has a return value and also needs an extra parameter. For example, Channel.onSend takes two arguments: the first is the value to send, and the second is the callback invoked when the send succeeds.
To check whether a suspending function supports select, just see whether a corresponding SelectClauseN member exists (such as onJoin for join).
That’s enough theory; let’s see it in practice:
2.3.1 Example 1 (SelectClause0)
@Test
fun `test SelectClause0`() = runBlocking<Unit> {
    val job1 = GlobalScope.launch {
        delay(100)
        println("job 1")
    }
    val job2 = GlobalScope.launch {
        delay(10)
        println("job 2")
    }
    select<Unit> {
        job1.onJoin { println("job 1 onJoin") }
        job2.onJoin { println("job 2 onJoin") }
    }
    delay(1000)
}
Let’s see how it works:
job 2
job 2 onJoin
job 1
join returns no value, so the block passed to onJoin takes no parameters: this is the SelectClause0 type described above. job2 finishes first, so only its onJoin block runs.
2.3.2 Example 2 (SelectClause2)
@Test
fun `test SelectClause2`() = runBlocking<Unit> {
    val channels = listOf(Channel<Int>(), Channel<Int>())
    println(channels)
    // Receivers: the one on channels[1] becomes ready first
    GlobalScope.launch {
        delay(100)
        println(channels[0].receive())
    }
    GlobalScope.launch {
        delay(10)
        println(channels[1].receive())
    }
    // Both onSend clauses are registered directly inside select (clauses must not be
    // registered from nested launch blocks); the channel that gets a receiver first wins
    launch(Dispatchers.IO) {
        select<Unit?> {
            channels[0].onSend(100) { sentChannel ->
                println("sent 0 on $sentChannel")
            }
            channels[1].onSend(200) { sentChannel ->
                println("sent 1 on $sentChannel")
            }
        }
    }
    delay(1000)
}
Let’s see how it works
[RendezvousChannel@2a084b4c{EmptyQueue}, RendezvousChannel@42b93f6b{EmptyQueue}]
200
sent 1 on RendezvousChannel@42b93f6b{EmptyQueue} // the callback ran after a successful send
Here we see how channel.onSend is used: the first argument is the value to send, and the second is the callback, that is, the code in the curly braces runs once the send has succeeded.
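A SelectClause2 event can also be tried without timing or extra coroutines. In the sketch below (the name sendToWhicheverHasRoom is hypothetical) two buffered channels are used: one whose single buffer slot is already taken and one that is empty. select completes with the onSend clause that can proceed.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Offers the value 1 to two channels at once and reports which send went through.
fun sendToWhicheverHasRoom(): String = runBlocking {
    val full = Channel<Int>(1).apply { trySend(0) } // its only buffer slot is already occupied
    val free = Channel<Int>(1)
    select<String> {
        // First argument: the value to send; second: the callback run if this send is selected
        full.onSend(1) { "full" }
        free.onSend(1) { "free" }
    }
}
```

The call returns "free": the send to the full channel would have to suspend, while the send to the empty one can complete immediately.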
2.4 Use Flow to achieve multiplexing
In most cases, we can achieve the effect of multiplexing by constructing the appropriate Flow.
private val cachePath = "E://coroutine.cache" // file content: {"name":"HQK","address":"chengdu"}
private val gson = Gson()

data class Response<T>(val value: T, val isLocal: Boolean)

// Load user information from the local cache
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
    // delay(10000) // intentional delay, commented out this time
    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}

// Load user information over the network
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    userServiceApi.getUser(name)
}

class CoroutineTest02 {
    @Test
    fun `test select flow`() = runBlocking<Unit> {
        // function -> coroutine -> Flow -> merged Flow
        val name = "guest"
        coroutineScope {
            // Put references to both loaders into a list
            listOf(::getUserFromLocal, ::getUserFromRemote)
                // Call each loader; function is the loader reference
                .map { function ->
                    function.call(name) // start the async call and pass its Deferred to the next map
                }.map { deferred -> // deferred is the value returned by the loader
                    flow { emit(deferred.await()) } // emit the result through a flow once it is available
                }.merge() // merge the flows into one
                .collect { user -> println(user) } // receive every emitted value
        }
    }
}
The comments explain everything.
Let’s see how it runs:
User(name=HQK, address=chengdu)
User(name=jason, address=California)
Here both the local and the network result are received!
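The same pattern can be tried without the server and the cache file. The sketch below simulates the two loaders with delay; the names source, allInArrivalOrder and firstToArrive are made up for illustration. It also shows one possible way to get select-like behavior from the merged flow: taking only the first element.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a loader: emits its name after the given delay.
fun source(name: String, delayMs: Long): Flow<String> = flow {
    delay(delayMs)
    emit(name)
}

// merge() collects both flows concurrently, so results come out in arrival order.
fun allInArrivalOrder(): List<String> = runBlocking {
    listOf(source("local", 300), source("remote", 30)).merge().toList()
}

// first() returns the earliest result and cancels the remaining collection.
fun firstToArrive(): String = runBlocking {
    listOf(source("local", 300), source("remote", 30)).merge().first()
}
```

With these delays, allInArrivalOrder() yields remote followed by local, and firstToArrive() yields remote.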
3. Concurrency safety
3.1 Unsafe concurrent access
Thread safety is always a concern when we use threads to solve concurrency problems. The implementation of Kotlin coroutines on the Java platform still schedules work concurrently on threads, so thread safety deserves attention there too.
For example:
@Test
fun `test not safe concurrent`() = runBlocking<Unit> {
    var count = 0
    List(1000) {
        GlobalScope.launch { count++ }
    }.joinAll()
    println(count)
}
Here we start 1000 coroutines, and each of them increments count by one, so ideally the result should be 1000.
Let’s see how that works out:
973 // the value differs from run to run
The actual value is not the ideal one, so we need to pay attention to concurrency!
3.2 Concurrency tools for coroutines
In addition to the tools we use to solve concurrency problems with threads, the coroutine framework provides its own concurrency-safe tools, including:
- Channel: a concurrency-safe message channel, which we are already familiar with.
- Mutex: a lightweight lock whose lock and unlock are semantically similar to those of a thread lock. It is lightweight because, when the lock cannot be acquired, it does not block the thread but suspends the coroutine until the lock is released.
- Semaphore: a lightweight semaphore. It can hold several permits, and a coroutine may carry out its concurrent operation once it has acquired one.
  - When the Semaphore is created with 1 permit, it behaves like a Mutex.
That’s enough talk; let’s give it a go!
3.2.1 Example 1 (using AtomicXXX)
@Test
fun `test safe concurrent`() = runBlocking<Unit> {
    val count = AtomicInteger(0)
    List(1000) {
        GlobalScope.launch { count.incrementAndGet() }
    }.joinAll()
    println(count.get())
}
This is the general-purpose Java solution: atomic classes.
The result is 1000, so I won’t paste the output.
3.2.2 Example 2 (Using Mutex)
@Test
fun `test safe concurrent tools`() = runBlocking<Unit> {
    var count = 0
    val mutex = Mutex()
    List(1000) {
        GlobalScope.launch {
            mutex.withLock {
                count++
            }
        }
    }.joinAll()
    println(count)
}
The Mutex is created before the coroutines start, and each coroutine wraps its increment in mutex.withLock.
On to the next one!
3.2.3 Example 3 (Using Semaphore)
@Test
fun `test safe concurrent tools2`() = runBlocking<Unit> {
    var count = 0
    val semaphore = Semaphore(1)
    List(1000) {
        GlobalScope.launch {
            semaphore.withPermit {
                count++
            }
        }
    }.joinAll()
    println(count)
}
Here Semaphore(1) creates a semaphore with a single permit, and semaphore.withPermit guards the increment, which solves the concurrency problem.
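With more than one permit, a Semaphore no longer gives mutual exclusion but caps how many coroutines are inside the guarded block at once. The sketch below illustrates this under assumed names (maxObservedConcurrency, tasks); it tracks the peak number of coroutines running inside withPermit using atomic counters.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Runs `tasks` coroutines through a semaphore and returns the highest number
// that were inside the guarded block at the same time.
fun maxObservedConcurrency(permits: Int, tasks: Int = 20): Int = runBlocking {
    val semaphore = Semaphore(permits)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    List(tasks) {
        launch(Dispatchers.Default) {
            semaphore.withPermit {
                val now = running.incrementAndGet()
                peak.updateAndGet { maxOf(it, now) } // remember the highest value seen
                delay(10) // simulated work while holding the permit
                running.decrementAndGet()
            }
        }
    }.joinAll()
    peak.get()
}
```

The result never exceeds the number of permits, and with a single permit it is exactly 1, which is the Mutex-like case used in the example above.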
3.3 Avoid accessing external mutable states
@Test
fun `test avoid access outer variable`() = runBlocking<Unit> {
    var count = 0
    val result = count + List(1000) {
        GlobalScope.async { 1 }
    }.map { it.await() }.sum()
    println(result)
}
Write functions so that they do not access external state: they should compute only from their parameters and deliver the result through the return value.
Conclusion
Well, that’s the end of this piece! I believe you now know a fair bit about Channel. The next article will cover the comprehensive application of coroutines and Flow in detail.