“This article has participated in the good article call order activity, click to see: back end, big front end double track submission, 20,000 yuan prize pool for you to challenge!”
preface
What is a coroutine? What is a coroutine? What is a coroutine? How does a coroutine switch threads?
In addition to the above introduction CoroutineContext CoroutineScope etc., there are two important concepts in coroutines, This paper mainly makes an in-depth study of Flow(cold Flow) and Channel(hot Flow), mainly including the following contents: 1. What is Channel and the basic use of Channel 2 4. Analysis and introduction of the principles behind Flow
1. Channel
Basic use of
1.1 Channel
What is?
A Channel is actually a queue. It is a BlockQueue for data transfer between multiple coroutines. It is used for intercoroutine communication
1.2 Channel
Implement the producer-consumer model
The producer-consumer pattern in traditional Java is simple: one or more producer threads, a common blocking queue (often with ArrayBlockingQueue and LinkedBlockingQueue options), and one or more consumer threads. Producers continuously queue data into a blocking queue, and consumers recycle elements out of the queue for consumption.
So how do you implement the producer-consumer pattern with channels?
fun produceAndConsume(a) {
GlobalScope.launch {
val channel = Channel<Int> ()val producer = GlobalScope.launch {
var i = 0
while (true) {
Log.i(tag, "The producer produces:$i")
channel.send(i++)
delay(1000)}}val consumer = GlobalScope.launch {
while (true) {
val element = channel.receive()
Log.i(tag, "Consumers spent:$element")
}
}
producer.join()
consumer.join()
}
}
/ / outputI/ProduceAndConsume:0I/ProduceAndConsume:0I/ProduceAndConsume:1I/ProduceAndConsume:1I/ProduceAndConsume:2I/ProduceAndConsume:2I/ProduceAndConsume:3I/ProduceAndConsume:3
Copy the code
- See to use
Channel
It’s easier to implement the producer-consumer pattern - The producer and consumer are called alternately, because the producer will hang if the cache is full after production, and the consumer will hang if the cache is empty
1.3 Buffer Capacity
We mentioned above that buffers hang when they are full, so what is the buffer capacity?
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
Copy the code
As you can see, different implementations of the rendezvous Schannel parameters will be passed as we initialize. 1. The rendezvous schannel cache has a size of 0, and will remain hanged if no one receives it after it is sent. 3.ConflatedChannel has a cache of one element size, but every time a new element comes, it replaces the old one with the new one. 4
2.Channel
The principle of analytic
2.1 send
.receive
Process analysis
As described above, the producer and consumer are called alternately. After the producer is produced, if the cache is full, it will be suspended, while the consumer will also be suspended if the cache is empty. The specific process is as follows:
- if
receive
Operation queue containsSend
Element wakes up asynchronouslysend
coroutines - if
receive
Not included in the queue package during operationSend
The element is suspendedreceive
coroutines - if
send
Operation queue containsreceive
Element wakes up asynchronouslyreceive
coroutines - Suspend the send coroutine if the queue does not contain receive at the time of the send operation
Because of the space reason is not here to post the source code, want to understand the source code of students can refer to: Kotlin coroutine Channel receive and send principle analysis
2.2 Channel
withBlockingQueue
The difference between
1. Suspend coroutines instead of blocking them. A Channel uses pending send and receive instead of blocking put and take. 2. Better performance. A Channel has better suspended performance than BlockingQueue, which involves thread blocking and waking up. A lot of thread resources are wasted in blocking. Disable. Channel can be closed at any time. When the sender receives the close instruction, it will stop sending immediately. When the element in the cache has been sent, the receiver will also close. Supports exception handling. Channel uses structured concurrency to handle exceptions, which can be implemented when one producer or consumer coroutine throws an exception and all producer and consumer coroutines are cancelled immediately. You can avoid the problem of a failure of a task in multiple threads and mistakenly think that all of them have succeeded
For details, see Kotlin Channel and producer-consumer model
2.3 Channel
How is it thread-safe?
We know that a Channel can be used to communicate between multiple coroutines, which may run on multiple threads. So Channel also has to deal with thread-safe issues. How do you ensure that?
The Channel buffer can be implemented as a linked list or an array
2.3.1 Linked list implementation
The linked list implementation stores the cache in LockFreeLinkedListHead
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
protected val queue = LockFreeLinkedListHead()
...
}
Copy the code
LockFreeLinkedListHead itself is actually a bidirectional linked list node. The so-called LockFree in Java virtual machine is actually implemented by CAS atomic operation. Its implementation principle comes from a paper lock-free and Practical 1. Key function: One-word compare-and-swap 1. Key function: One-word compare-and-swap
2.3.2 Array implementation
For the array version,ArrayChannel is simply an array inside
// If the buffer size is larger than 8, an array of size 8 will be allocated first and expanded later
private varbuffer: Array<Any? > = arrayOfNulls<Any? >(min(capacity,8))
Copy the code
The array is directly locked using a ReentrantLock. You can see that a Channel is also thread-safe by locking or CAS
2.4 Channel
Design principles
A Channel is a method of intercoroutine communication. It uses the CSP(revised sequential Processes) model. Compared with some inter-thread communication schemes, a Channel has the following characteristics:
Do not communicate by sharing memory; instead, share memory by communicating.
Do not use shared memory to communicate, but use communication to share memory.
However, in essence, the thread and coroutine information on the computer is carried out through “shared memory”, because no matter what kind of communication model, the thread or coroutine will eventually get the data from memory, and the underlying implementation of the Channel also needs to lock the shared memory
Since it’s all shared memory how is it different from using shared memory ourselves? So it would be more accurate to say “Why are we synchronizing information by sending messages instead of sharing memory directly with multiple threads or coroutines?”
- 1. First of all, using sending messages to synchronize information is a higher level of abstraction than using shared memory and mutex directly. Using higher level of abstraction can provide better encapsulation in the program design and make the logic of the program clearer.
- 2. Secondly, message sending also has some advantages over shared memory in terms of decoupling. We can divide the responsibilities of threads into producers and consumers, and decouple them by means of message passing, without relying on shared memory.
- 3. Finally, the choice to use the message sending method, by ensuring that only one active thread can access the data at a time, can naturally avoid thread contention and data conflict problems;
The above is just a summary of the CSP model, if you are interested, you can see the detailed analysis: Why use communication to share memory
3. Flow
Basic use of
Flow is a combination of Kotlin coroutines and the responsive programming model, and you’ll find that it’s very similar to RxJava, with an API for converting between the two, which is very easy to use. Flow has the following characteristics: 1. Cold data Flow, not consumed, not produced, this is the opposite of Channel: the sender of a Channel is not dependent on the receiver. 2.Flow changes the thread of data emission through flowOn, and the thread of data consumption is determined by the thread of coroutine 3. 4.Flow does not provide a cancellation method. You can cancel the Flow by canceling the coroutine in which the Flow is located
The specific use is as follows:
lifecycleScope.launch {
flow {
for (i in 1.10.) {
emit(i)
}
}.flowOn(Dispatchers.Main)
.catch {
// Exception handling
}
.onCompletion {
// Complete the callback
}
.collect { num ->
// Specific consumption processing
// Data is produced only when it is collect
// ...}}Copy the code
4.Flow
The principle of analytic
Now that we’ve introduced the basic uses and features of Flow, we can ask two questions; 1. Why is Flow a cold Flow? 2. How does Flow switch threads?
4.1 Flow
Why is it a cold flow?
Cold flow refers to the production of data when consumption starts, not consumption, not production, let’s take a look at the source first look at flow{} what happens
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>. () - >Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
Copy the code
As you can see, what we’re doing in Flow {} is very simple. We’re basically creating a SafeFlow that inherits from AbstractFlow. Let’s take a look at what’s in AbstractFlow
public abstract class AbstractFlow<T> : Flow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
// 1. The collector makes a layer of packaging
val safeCollector = SafeCollector(collector, coroutineContext)
try {
// 2. Process the data receiver
collectSafely(safeCollector)
} finally {
// 3. Release coroutine related parameters
safeCollector.releaseIntercepted()
}
}
// The collectSafely method should follow the following constraints
// 1. You should not switch threads inside the collectSafely method, such as withContext(dispatchers.io)
// 2. CollectSafely is not thread safe by default
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
Copy the code
The discovery does three main things: 1. A wrapper around the data receiver FlowCollector, SafeCollector 2. Call the abstract method AbstractFlow#collectSafely inside it. 3. Release some information about the coroutine.
In conjunction with the SafeFlow we looked at earlier below, it implements the AbstractFlow#collectSafely method and calls collecte.block (), which runs the code in the flow{} block. Now it is clear why Flow is a cold Flow. This is because it only triggers sending data every time it collects
4.2 Flow
How do you switch threads
Flow
Threads are switched in a similar way as coroutines switch threads
All by starting a subcoroutine and then going throughCoroutineContext
In theDispatchers
Switch threads
Here’s the differenceFlow
Switching process usedChannel
To pass data
Because the Flow switch thread of the source code is too much, it is not described here, interested students can follow the source code, details can be seen: flowOn() how to switch coroutine
conclusion
As shown above, this article focuses onChannel
.Flow
Some questions are raised and some answers are given.
More difficult, if this article is helpful to you, welcome to like the collection ~
The resources
Why use communication to share memory to break Kotlin coroutines