What is Flow?
Kotlin's coroutines are for handling asynchronous tasks, and flows are for handling asynchronous streams of data.
A very simple sentence, but it always feels like something is missing. Let's look at the sample code on the Kotlin website.
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    // a concurrent coroutine could run here to check that the main thread is not blocked
    simple().collect { value -> println(value) }
}
In main, runBlocking starts a coroutine and blocks until it finishes; inside it we call simple() and then collect.
So what exactly is Flow? First of all, Flow is an interface
public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}
Note the out keyword: it marks the type parameter as covariant, so the generic type only appears in output positions. If the simple() method returns Flow<Int>, it can only produce Int values.
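As a quick aside, here is a minimal sketch of what covariance buys us (the function names are made up for illustration):

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf

// Because Flow is declared as Flow<out T>, a Flow<Int> can be used
// wherever a Flow<Number> is expected (Int is a subtype of Number).
fun numbers(): Flow<Number> {
    val ints: Flow<Int> = flowOf(1, 2, 3)
    return ints // allowed thanks to the out (covariant) type parameter
}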
Flow is cold
The code inside flow { } does not run until the flow is collected. So let's remove the collect call and see.
fun simple(): Flow<Int> = flow {
    println("start")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
}
start will not be printed; the code does not run until collect is called.
Principle of implementation
Enter the flow method
public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
The flow() builder takes a suspending block whose receiver is FlowCollector and returns a Flow<T>. The concrete instance is a SafeFlow, with the block passed in as a constructor parameter.
So calling simple() only creates a SafeFlow object and does nothing else.
Take a look at simple().collect
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
action is the suspending lambda we passed in, i.e. println(value).
In lines 2-4, an anonymous FlowCollector object overrides emit() to call action, and then the other collect() overload is called on the Flow, which here is SafeFlow's collect().
public final override suspend fun collect(collector: FlowCollector<T>) {
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
        collectSafely(safeCollector)
    } finally {
        safeCollector.releaseIntercepted()
    }
}
The collector in line 1 is the anonymous object we just created.
In line 2 it is wrapped in a SafeCollector, which also implements the FlowCollector interface.
Line 4 calls the collectSafely(..) method.
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}
What is collector.block()? Look at the parameter: block is declared as FlowCollector<T>.(), a function with FlowCollector as its receiver, so it can be called directly on the collector. That receiver is why we can call emit() when writing the simple() flow { } block.
Here the FlowCollector instance is a SafeCollector, so emit is SafeCollector's emit.
override suspend fun emit(value: T) {
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> // 2
        try {
            emit(uCont, value) // 4
        } catch (e: Throwable) {
            lastEmissionContext = DownstreamExceptionElement(e)
            throw e
        }
    }
}

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context
    currentContext.ensureActive()
    val previousContext = lastEmissionContext
    if (previousContext !== currentContext) {
        checkContext(currentContext, previousContext, value)
    }
    completion = uCont
    return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
Line 2: suspendCoroutineUninterceptedOrReturn, as the name suggests, hands the current continuation to the coroutine machinery.
Line 4: the private emit() is called directly with the value. The important part is the last line, emitFun: its first argument, collector, was passed in through the constructor and is the lambda from simple().collect { } at the very beginning. Everything funnels into this emitFun call.
private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
The double colon is a Kotlin function reference (a form of reflection); decompiling the bytecode shows the familiar invoke().
Summary:
Flow is cold: the block containing emit only runs when collect is called.
Internally, a reference to the emit function hands each value straight to the collect lambda.
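To make the wiring concrete, here is a hand-rolled cold flow stripped of the SafeCollector machinery. This is a simplified sketch of the same idea, not the real kotlinx.coroutines implementation:

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for FlowCollector and Flow.
interface MyCollector<T> {
    suspend fun emit(value: T)
}

class MyFlow<T>(private val block: suspend MyCollector<T>.() -> Unit) {
    // Nothing runs until collect is called: the flow is "cold".
    suspend fun collect(action: suspend (T) -> Unit) {
        val collector = object : MyCollector<T> {
            override suspend fun emit(value: T) = action(value)
        }
        collector.block() // run the stored block with the collector as its receiver
    }
}

fun <T> myFlow(block: suspend MyCollector<T>.() -> Unit): MyFlow<T> = MyFlow(block)

fun main() = runBlocking {
    val f = myFlow<Int> {
        for (i in 1..3) {
            delay(100)
            emit(i) // goes straight to the action lambda below
        }
    }
    println("flow created, nothing printed yet")
    f.collect { value -> println(value) } // only now does the block run
}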
Flow operator
transform lets you turn an Int into some other type, such as a String, and emit it downstream.
(1..4).asFlow().transform {
    emit("this ${it}")
}.collect {
    println(it)
}
map
map transforms each value through a function. Here each Int is mapped to a flow that emits a String, and the inner flow is collected inside the outer collect.
(1..5).asFlow().map {
    flow {
        emit("$it: Second")
    }
}.collect {
    it.collect { println(it) }
}
flowOn: the upstream flow is executed on the IO dispatcher.
(1..4).asFlow()
    .flowOn(Dispatchers.IO)
    .collect {
        println("take ${it}")
    }
combine merges two flows, emitting whenever either side produces a value, using the latest value from each.
zip pairs the values of two flows one-to-one.
catch catches an upstream error, after which you can emit again.
onStart runs when collection starts.
onCompletion runs when the flow completes.
There are many more operators; a quick sketch of a few of them follows.
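A hedged sketch combining zip, catch, onStart and onCompletion in one pipeline (the values and messages are made up for illustration):

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val letters = flowOf("a", "b", "c")

    numbers
        .zip(letters) { n, l -> "$n$l" }      // pairs values one-to-one: 1a, 2b, 3c
        .onStart { println("started") }        // runs before the first value
        .onCompletion { println("done") }      // runs after the last value (or an error)
        .catch { e -> println("caught $e") }   // handles exceptions thrown upstream
        .collect { println(it) }
}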
SharedFlow Shared flow
SharedFlow is itself an interface, and Flow is an interface too; the inheritance chain is StateFlow : SharedFlow : Flow.
Values can be emitted before anyone collects: what is emitted before collect is cached (or dropped) according to the configured replay/buffer policy.
val mSharedFlow = MutableSharedFlow<String>(
    0, // how many past values are replayed to a new collector
    0, // how many extra values the flow can buffer beyond replay
    onBufferOverflow = BufferOverflow.SUSPEND // overflow strategy
)
First parameter: how many previously emitted values are replayed to a new subscriber.
Second parameter: how many extra values can be buffered on top of the replay count.
Third parameter: the overflow strategy. BufferOverflow has three values: SUSPEND, DROP_OLDEST (drop the oldest value), and DROP_LATEST (drop the newest value).
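A small sketch of how the strategies behave when nobody is collecting yet; the expected output is noted in comments and should be treated as illustrative:

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    // No collectors yet: with replay = 2 and DROP_OLDEST, only the two most
    // recent values are kept; older ones are dropped instead of suspending.
    val shared = MutableSharedFlow<Int>(
        replay = 2,
        extraBufferCapacity = 0,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    repeat(5) { shared.tryEmit(it) } // tryEmit never suspends
    println(shared.replayCache)      // expected: [3, 4]
}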
How to use
fun main(): Unit = runBlocking {
    // Define a SharedFlow object
    val mSharedFlow = MutableSharedFlow<String>(
        4, // replay count
        10, // extra buffer capacity
        onBufferOverflow = BufferOverflow.SUSPEND // overflow strategy
    )
    launch {
        mSharedFlow.collect {
            println(it)
        }
    }
    launch {
        repeat(10) {
            mSharedFlow.emit("abc:${it}")
        }
    }
}
We define the shared flow and use two launch calls to start two coroutines: one collects, the other emits repeatedly.
You'll find that the program keeps running and never ends.
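That is because the collector suspends forever waiting for more values. One way to let the program finish, sketched under the assumption that we simply cancel the collecting job once we are done:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val mSharedFlow = MutableSharedFlow<String>(replay = 10)

    val collectorJob = launch {
        mSharedFlow.collect { println(it) } // never completes on its own
    }
    repeat(10) { mSharedFlow.emit("abc:$it") }

    delay(100)            // give the collector time to print everything
    collectorJob.cancel() // cancelling the collector lets runBlocking return
}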
Principle of implementation
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    // Check parameters
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
bufferCapacity0 = replay count + extra buffer capacity.
private class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    /*
        Logical structure of the buffer

                       buffered values
            /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
        +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
        |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
        +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

        head == minOf(minCollectorIndex, replayIndex) // by definition
        totalSize == bufferSize + queueSize // by definition
        // ...
     */
    private var buffer: Array<Any?>? = null
}
The Kotlin team has already drawn the data structure in the comment, with two definitions: head is always the smaller of the minimum collector index and the replay index, and totalSize is the buffer size plus the queue size. There is also a buffer field, an Array<Any?>, i.e. a plain array of objects.
emit
override suspend fun emit(value: T) {
    if (tryEmit(value)) return
    emitSuspend(value)
}

override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) { // 8
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

private fun tryEmitLocked(value: T): Boolean {
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // 21
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false
            BufferOverflow.DROP_LATEST -> return true
            BufferOverflow.DROP_OLDEST -> {}
        }
    }
    enqueueLocked(value)
    bufferSize++
    if (bufferSize > bufferCapacity) dropOldestLocked()
    if (replaySize > replay) {
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    return true
}
Walking through emit, the key points:
Line 8: synchronized.
Line 21: if nCollectors == 0, there is no collector yet, so tryEmitNoCollectorsLocked attempts the emit directly; otherwise the value goes through buffering and the overflow-strategy check.
In the sample code above, emissions made before a collector registers go through tryEmitNoCollectorsLocked; once collectors exist, the caching strategy applies.
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
    if (replay == 0) return true
    enqueueLocked(value)
    bufferSize++
    if (bufferSize > replay) dropOldestLocked()
    minCollectorIndex = head + bufferSize
    return true
}

private fun enqueueLocked(item: Any?) {
    val curSize = totalSize
    val buffer = when (val curBuffer = buffer) {
        null -> growBuffer(null, 0, 2)
        else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize, curBuffer.size * 2) else curBuffer
    }
    buffer.setBufferAt(head + curSize, item)
}

private fun growBuffer(curBuffer: Array<Any?>?, curSize: Int, newSize: Int): Array<Any?> {
    val newBuffer = arrayOfNulls<Any?>(newSize).also { buffer = it }
    if (curBuffer == null) return newBuffer
    val head = head
    for (i in 0 until curSize) {
        newBuffer.setBufferAt(head + i, curBuffer.getBufferAt(head + i))
    }
    return newBuffer
}
If replay == 0 there is nothing worth caching, so it simply returns true.
Otherwise it goes to enqueueLocked; the key method is growBuffer, which grows the backing array by allocating a new one and copying the data over. The allocated size is always a power of two.
That covers one insertion case.
Back in tryEmitLocked: if nCollectors is not 0, the overflow strategy is applied. With BufferOverflow.SUSPEND it returns false, and we are back in the tryEmit(..) method.
override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

override suspend fun emit(value: T) {
    if (tryEmit(value)) return // fast-path
    emitSuspend(value)
}
A false result all the way up lands us in emitSuspend(value); by this point the synchronized block has already been released.
private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitter = synchronized(this) lock@{
        if (tryEmitLocked(value)) {
            cont.resume(Unit)
            resumes = findSlotsToResumeLocked(resumes)
            return@lock null
        }
        // add suspended emitter to the buffer
        Emitter(this, head + totalSize, value, cont).also {
            enqueueLocked(it)
            queueSize++
            if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
        }
    }
    emitter?.let { cont.disposeOnCancellation(it) }
    for (r in resumes) r?.resume(Unit)
}
Now we are inside a suspending call. Another lock, another tryEmitLocked attempt; if it succeeds, the continuation is resumed right away. If it still fails, an Emitter object is created and enqueued, carrying the value and the continuation, and the emitting coroutine stays suspended. Look a bit deeper:
internal open class CancellableContinuationImpl<in T>(...) ... {
    private val _state = atomic<Any?>(Active)
}
Seeing atomic, this must be a CAS-based state machine. In other words, tryEmitLocked is attempted; if it cannot complete, the emit suspends and is executed later when space frees up.
At this point we are back at tryEmitLocked, and the emit path is essentially covered.
Collect
mSharedFlow.collect { // 1
    println(it)
}

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

override suspend fun collect(collector: FlowCollector<T>) {
    val slot = allocateSlot() // 11
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        while (true) { // 15
            var newValue: Any?
            while (true) { // 17
                newValue = tryTakeValue(slot)
                if (newValue !== NO_VALUE) break
                awaitValue(slot)
            }
            collectorJob?.ensureActive()
            collector.emit(newValue as T)
        }
    } finally {
        freeSlot(slot)
    }
}

private fun tryTakeValue(slot: SharedFlowSlot): Any? {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val value = synchronized(this) {
        val index = tryPeekLocked(slot)
        if (index < 0) {
            NO_VALUE
        } else {
            val oldIndex = slot.index
            val newValue = getPeekedValueLockedAt(index)
            slot.index = index + 1
            resumes = updateCollectorIndexLocked(oldIndex)
            newValue
        }
    }
    for (resume in resumes) resume?.resume(Unit)
    return value
}
The collect at line 1 is still the extension function: the lambda is wrapped in a FlowCollector and handed to the concrete collect implementation at line 10.
Line 11, allocateSlot(), allocates a slot for this mSharedFlow.collect { } subscriber; I won't paste its body here.
Line 15 is while (true): collect loops forever, which is why it never completes.
tryTakeValue tries to take one value, again under synchronized. tryPeekLocked() gets an index, getPeekedValueLockedAt fetches the value at that index, and after taking it the slot points to the next index.
updateCollectorIndexLocked is more involved and I won't go into it; as the name says, it updates the cached indexes after the collector moves forward. Removing the first value of an array, for example, needs quite a few bookkeeping fields.
Back at the inner while (true) on line 17: break if the new value is not NO_VALUE, otherwise awaitValue suspends until one arrives; so it keeps peeking by index, taking the value, and looping.
The outer while (true) on line 15 is the loop that feeds each taken value to collector.emit, i.e. to the FlowCollector wrapping the collect { } lambda.
A quick summary:
1. A SharedFlow collect never completes.
2. Both emit and collect use synchronized; it is a producer-consumer pattern.
3. There is no de-duplication: even if oldValue == newValue, the value is still emitted.
4. replay lets you configure sticky events; with replay = 0 there are no sticky events.
5. It is essentially backed by an array.
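A small sketch of point 4, showing that with replay = 1 a late collector still receives the last value (the output noted in comments is what I would expect):

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val sticky = MutableSharedFlow<String>(replay = 1)
    sticky.emit("last event") // emitted before anyone collects

    val job = launch {
        sticky.collect { println("late collector got: $it") } // replays "last event"
    }
    delay(100)
    job.cancel() // collect never completes on its own, so cancel it to end the program
}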
StateFlow state flow
How to use
fun main() {
    runBlocking {
        val stateFlow = MutableStateFlow("aaa")
        launch {
            stateFlow.collect {
                println("launch1:${it}")
            }
        }
        launch {
            stateFlow.emit("bbb")
        }
    }
}
A MutableStateFlow must be created with an initial value: it always holds a state. One coroutine collects, the other emits.
It doesn't look very different from SharedFlow, except that StateFlow holds only a single value.
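Because it always holds exactly one value, you can also read and write it synchronously through the value property; a small sketch (the output comments are my expectation):

import kotlinx.coroutines.flow.MutableStateFlow

fun main() {
    val state = MutableStateFlow("aaa")
    println(state.value) // aaa - a StateFlow always has a current value
    state.value = "bbb"  // non-suspending write; for StateFlow this is equivalent to emit
    println(state.value) // bbb - only the latest value is kept
}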
Principle of implementation
Thanks to what it inherits, what it does itself, and the constraints it imposes, the structure of StateFlow is not very complicated.
private class StateFlowImpl<T>(
    initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {

    private val _state = atomic(initialState)

    public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) { updateState(null, value ?: NULL) }

    override suspend fun emit(value: T) { // 10
        this.value = value
    }

    private fun updateState(expectedState: Any?, newState: Any): Boolean {
        // ..
        synchronized(this) { // 16
            // ..
            if (oldState == newState) return true
            _state.value = newState
            // ..
        }
        // ..
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        try {
            // ..
            while (true) {
                // ..
                val newState = _state.value
                // ..
                collectorJob?.ensureActive() // 25
                // Conflate value emissions using equality
                if (oldState == null || oldState != newState) {
                    collector.emit(NULL.unbox(newState))
                    oldState = newState
                }
                // ..
            }
        } finally {
            // ..
        }
    }
}
Line 10: emit just assigns to value, which triggers the updateState method.
Line 16: synchronized again; if oldState == newState, the old value equals the new one and it returns without changing anything.
The line marked // 25: inside the while (true) loop, collect checks that the coroutine is still active, and only emits the new value if it differs from the previous one.
A quick summary:
1. A StateFlow collect never completes.
2. Writes are synchronized: one writer, many readers.
3. De-duplication: if oldValue == newValue, the state is not changed and nothing is re-emitted.
4. Sticky by default: a new collector immediately gets the latest value.
5. It is essentially an atomic reference to a generic value and must have an initial value.
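A small sketch of points 3 and 4: equal values are conflated, and a new collector receives the current value immediately (assuming single-threaded runBlocking; the output noted in comments is my expectation):

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val state = MutableStateFlow("aaa")

    val job = launch {
        state.collect { println("got: $it") } // prints "aaa" immediately (sticky)
    }
    delay(50)
    state.value = "aaa" // same value: conflated, nothing new is printed
    state.value = "bbb" // different value: "got: bbb" is printed
    delay(50)
    job.cancel()
}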
Other methods
SharedFlow and StateFlow both extend Flow, so most of the Flow extension functions can be used on them, such as map, onStart, and onCompletion.
With flowOn(), though, you will find it only really takes effect for plain Flow. My guess is that plain Flow actually delivers values through that function-reference/reflection call, while the other two hand everything over to coroutines; if anyone knows for sure, please share.
If you want to switch threads, configure a Dispatcher on the coroutine directly.
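For example, a sketch of collecting on a background dispatcher (the dispatcher choice and replay value are just for illustration):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 3)

    // The collecting coroutine runs on Dispatchers.Default instead of the caller's thread.
    val job = launch(Dispatchers.Default) {
        shared.collect { println("[${Thread.currentThread().name}] $it") }
    }
    repeat(3) { shared.emit(it) }
    delay(100) // let the collector print on its own dispatcher
    job.cancel()
}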
MutableSharedFlow and MutableStateFlow can be exposed as read-only objects via asSharedFlow() and asStateFlow(), preventing outside code from emitting into them.
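A typical sketch of that pattern (the class and property names are made up for illustration):

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

class CounterHolder {
    private val _count = MutableStateFlow(0)         // mutable, kept private
    val count: StateFlow<Int> = _count.asStateFlow() // read-only view for callers

    fun increment() {
        _count.value = _count.value + 1 // only the holder itself can change the value
    }
}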
Conclusion
Neither a SharedFlow collect nor a StateFlow collect ever completes.
SharedFlow does not de-duplicate; StateFlow does.
SharedFlow uses synchronized on both read and write; StateFlow uses synchronized on write.
SharedFlow can be configured to be sticky; StateFlow is always sticky.
SharedFlow does not allow emitting null; StateFlow does.
Flow is not lifecycle-aware; that can be handled through coroutines.
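For instance, on Android one common approach is to tie collection to a coroutine scope that follows the lifecycle. A sketch assuming a project with the androidx lifecycle-runtime-ktx dependency; the Fragment and property names are illustrative, not from this article:

import android.os.Bundle
import android.view.View
import androidx.fragment.app.Fragment
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch

class MyFragment : Fragment() {
    private val uiState = MutableStateFlow("idle")

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        viewLifecycleOwner.lifecycleScope.launch {
            // Collection runs only while the view lifecycle is at least STARTED;
            // it is cancelled on STOP and restarted on START.
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                uiState.collect { /* render it */ }
            }
        }
    }
}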
Reference documentation
Kotlin website
Don't just follow the trend: a comparison of LiveData, StateFlow, and SharedFlow usage scenarios
About
If you find this article helpful, please give it a thumbs up.
If there is a mistake in the content, I will correct it promptly.