This chapter, the preface

This article is an extension of the Kotlin coroutines series. Those interested in Kotlin coroutines can read the following links.

KotlinCoroutine basic and principle series

  • Basic usage of Kotlin coroutines
  • Kotlin coroutine introduction to Android version in detail (ii) -> Kotlin coroutine key knowledge points preliminary explanation
  • Kotlin coroutine exception handling
  • Use Kotlin coroutine to develop Android applications
  • Network request encapsulation for Kotlin coroutines
  • Kotlin coroutine theory (1)
  • Kotlin coroutine theory (2)
  • [Kotlin coroutine introduction for Android version in detail (8) -> In-depth Kotlin coroutine principle (3)]
  • [Kotlin coroutine introduction for Android (9) -> In-depth Kotlin coroutine principle (4)]

FlowA series of

  • Flow Usage of Kotlin Coroutines (1)
  • Flow Usage of Kotlin coroutine (II)
  • Flow Usage of Kotlin coroutine (III)

Extend the series

  • Encapsulating DataBinding saves you thousands of lines of code
  • A ViewModel wrapper for everyday use

The author is just an ordinary developer, the design is not necessarily reasonable, we can absorb the essence of the article, to dross.

kotlinThe coroutinesFlowUse (2)

In the previous chapter we had a basic understanding of Flow. A Flow is an asynchronous data Flow that emits values in sequence and completes normally or abnormally. Common operators such as map, filter, take, and zip are also used.

Direct use ofFlowThe limitations of

However, one problem is that although Flow can convert any object into a stream for collection and calculation. But if we use Flow directly, the collection of a Flow is the value we know we need to calculate, and it is destroyed immediately after each collection. We also cannot emit new values into the stream for subsequent use.

Here’s a simple example, which we’ll explain in more detail later on. Such as:

fun test(a){
    runBlocking {
        var flow1 =  (1.3.).asFlow()
        flow1.collect { value ->
            println("$TAG: collect :${value}")
        }
        flow1 = (4.6.).asFlow()
    }
}
Copy the code
carman: collect :1
carman: collect :2
carman: collect :3
Copy the code

After we use collect to collect flow flow1, even if we re-assign flow1 later (4.. 6), we can not collect (4.. 6), we must use collect again to collect streams, such as:

fun test(a){
    runBlocking {
        var flow1 =  (1.3.).asFlow()
        flow1.collect { value ->
            println("$TAGCollect:${value}")
        }
        flow1 = (4.6.).asFlow()
        flow1.collect { value ->
            println("$TAGCollect:${value}")}}}Copy the code
Collect:1Collect:2Collect:3Collect:4Collect:5Collect:6
Copy the code

Only then can we collect new values from the FLOW1 flow. However, such operation is very troublesome. We not only need to assign flow1 again, but also need to use Collect flow again after each assignment.

We know from the previous chapter that Flow is a cold data Flow, so to realize the above requirements, I need to use hot data Flow. At this point we need to use the further implementation of Flow StateFlow and SharedFlow. But before we get to them, we need to know about another kotlin concept, Channel, because StateFlow and SharedFlow will cover channels later on.

ChannelBasic knowledge of

A Channel is a non-blocking dialogue between original senders. Conceptually, a Channel is similar to Java’s BlockingQueue, but it is suspended rather than blocked and can be closed by close. A Channel is also a hot data stream.

A process of dialogue and communication must exist on both sides. Let’s look at the definition of Channel:

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) - >Unit)? = null
): Channel<E>{
     / /...
}

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    / /...
}
Copy the code

The Channel implementation inherits a sender, SendChannel, and a receiver, ReceiveChannel, through which it communicates.

  • capacityIs the capacity of the entire channel.
  • onBufferOverflowOperation to handle buffer overflow, created by default.
  • onUndeliveredElementCalled when the element is sent but not received to the consumer.

Let’s move on to the SendChannel implementation:

public interface SendChannel<in E> {
      @ExperimentalCoroutinesApi
      public val isClosedForSend: Boolean

      public suspend fun send(element: E)

      public val onSend: SelectClause2<E, SendChannel<E>>

      public fun trySend(element: E): ChannelResult<Unit>

      public fun close(cause: Throwable? = null): Boolean

      @ExperimentalCoroutinesApi
      public fun invokeOnClose(handler: (cause: Throwable?). ->Unit)
      
      / /...
}
Copy the code

As a sender, there must be a send and a close function. TrySend is a synchronous variant of Send, which immediately adds the specified element to the channel if it does not violate its capacity limit and returns a success result. Otherwise, a failure or shutdown result is returned.

public interface ReceiveChannel<out E> {
      @ExperimentalCoroutinesApi
      public val isClosedForReceive: Boolean

      @ExperimentalCoroutinesApi
      public val isEmpty: Boolean

      public suspend fun receive(a): E

      public val onReceive: SelectClause1<E>

      public suspend fun receiveCatching(a): ChannelResult<E>

      public val onReceiveCatching: SelectClause1<ChannelResult<E>>

      public fun tryReceive(a): ChannelResult<E>

      public operator fun iterator(a): ChannelIterator<E>

      public fun cancel(cause: CancellationException? = null)
      / /...
}
Copy the code

TryReceive is similar to trySend in that if the channel is not empty, elements are retrieved and deleted from the channel, returning success, failure if the channel is empty, and closing if the channel is closed.

Let’s look at an example:

fun test(a) {
   runBlocking {
       val channel = Channel<Int>()
       launch {
           for (x in 1.. 5) channel.send(x)
       }
       launch {
           delay(1000)
           channel.send(6666)
           channel.send(9999)
       }
       repeat(Int.MAX_VALUE) {
           println("receive :${channel.receive()}")
       }
       println("done")}}Copy the code
receive :1
receive :2
receive :3
receive :4
receive :5
receive :6666
receive :9999
Copy the code

Channel A Channel provides a way to transfer values in a stream. So that we can delay the transmission of a single value between multiple coroutines. You can see that when we use a Channel, sending and receiving run different coroutines. It will also be received when we make the channel send data again.

But there is a problem here, the last done is not output, indicating that our entire parent coroutine has not finished executing. This is because we use while (true) to loop forever. So let’s make a note of that, and we’ll deal with that later.

Moving on, if we close a Channel at the end of the first launch:

fun test(a) {
   runBlocking {
       val channel = Channel<Int>()
       launch {
           for (x in 1.. 5) channel.send(x)
           channel.close()
       }
       launch {
           delay(1000)
           channel.send(6666)
           channel.send(9999)}while (true) {
           println("receive :${channel.receive()}")
       }
       println("done")}}Copy the code
receive :1
receive :2
receive :3
receive :4
receive :5

Channel was closed
kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
Copy the code

At this point we can see that the Channel has been closed, and because the Channel has been closed, we continue to call the receive function causing the coroutine to end abnormally. Similarly, calling send after a Channel has been closed will also trigger an exception. Use the isClosedForSend attribute of a Channel.

fun test(a) {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            if(! channel.isClosedForSend) {for (x in 1.. 5) channel.send(x)
                channel.close()
            }
        }
        launch {
            delay(1000)
            if(! channel.isClosedForSend) { channel.send(6666)
                channel.send(9999)}}while (true) {
            if(! channel.isClosedForSend) { println("receive :${channel.receive()}")}else{
                break
            }
        }
        println("done")}}Copy the code
receive :1
receive :2
receive :3
receive :4
receive :5
done
Copy the code

As you can see, we control send and receive by using isClosedForSend to determine whether a channel is closed, and we also break out of the while (true) loop to complete the entire coroutine execution when isClosedForSend is true.

With the simple use above, we can see that this is part of the producer-consumer pattern, and we often see it in concurrent code. We can think of SendChannel as the producer and ReceiveChannel as the consumer. This can abstract the producer into a function and have channels as its parameters, but this contradicts the common sense that you must return the result from the function.

useproducecreateChannel

This is where we need to use Produce’s handy Coroutine builder, which can easily work correctly on the producer side, and we use the extension function consumeEach to replace the loop on the consumer side. Such as:

fun test(a) {
    runBlocking {
        val squares = produceTest()
        squares.consumeEach { println("The receive:$it") }
        println("Done!")}}private fun CoroutineScope.produceTest(a): ReceiveChannel<Int> = produce {
    for (x in 1.. 5) send(x)
}
Copy the code
The receive:1The receive:2The receive:3The receive:4The receive:5
Done!
Copy the code

You can see how easy it is to create a similar case with Produce, but how is it produced? Let’s look at the source code implementation of Produce:

public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0.@BuilderInference block: suspend ProducerScope<E>. () - >Unit
): ReceiveChannel<E> =
    produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)

internal fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null.@BuilderInference block: suspend ProducerScope<E>. () - >Unit
): ReceiveChannel<E> {
    val channel = Channel<E>(capacity, onBufferOverflow)
    val newContext = newCoroutineContext(context)
    val coroutine = ProducerCoroutine(newContext, channel)
    if(onCompletion ! =null) coroutine.invokeOnCompletion(handler = onCompletion)
    coroutine.start(start, coroutine, block)
    return coroutine
}
Copy the code

You can see that Produce is an extension of CoroutineScope. Create it in a manner similar to coroutine launch. A ReceiveChannel object is created. However, it adds capacity and onBufferOverflow and onCompletion.

And how did he send the data out. The third argument, block, is the ProducerScope extension, which is different from the launch function.

public interface ProducerScope<in E> : CoroutineScope.SendChannel<E> {
  
    public val channel: SendChannel<E>
}
Copy the code

ProducerScope inherits from CoroutineScope and SendChannel. This further explains why you can send data in the Produce function.

ChannelFan out and fan in

Since it is a producer-consumer relationship, it must be purely one-to-one, one-to-many, many-to-one relationship. One on one we’ve done that. So one to many, many to one how do we achieve that.

ChannelThe fan out of

A one-to-many relationship is just what we pay lip service to. Officially, it’s called a fan out. Multiple coroutines receive data from the same pipe and work distributed among them. Such as:

fun test(a) {
    runBlocking {
        val channel = produceTest()
        repeat(5) { id ->
            launch {
                channel.consumeEach {
                    println("launch #$id received $it")
                }
            }
        }
        delay(950)
        channel.cancel()
        println("done")}}private fun CoroutineScope.produceTest(a): ReceiveChannel<Int> = produce {
    var x = 1
    while (true) {
        send(x++)
        delay(100)}}Copy the code

In Produce, we send data through a while loop, and then use repeat to launch 5 coroutines to receive them at the same time, which is cancelled by the cancel function of the producer object after delay(950). We will see output similar to the one shown below, although the id of the coroutine that receives each particular integer may be different, but they will all receive normally.

launch #0 received 1
launch #0 received 2
launch #1 received 3
launch #2 received 4
launch #3 received 5
launch #4 received 6
launch #0 received 7
launch #1 received 8
launch #2 received 9
done
Copy the code

Note that cancelling the producer coroutine closes its channel, which ultimately terminates the iteration on the channel that the coroutine is performing.

ChannelThe fan in

The same many-to-one approach is officially called a fan in. Multiple coroutines can send data to the same channel. Such as:

fun test(a) {
    runBlocking {
        val channel = Channel<String>()
        launch { sendString(channel, "a".100L) }
        launch { sendString(channel, "b".200L) }
        launch{
            channel.consumeEach {
                println(" received $it")
            }
        }
        delay(500)
        coroutineContext.cancelChildren()
        println("done")}}private suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
Copy the code

We create a string channel that sends data through a while loop. Then we launch2 subcoroutines to send data to the same channel and launch a subcoroutine to receive data. Close all subcoroutines after 500 milliseconds. We’ll see something like the following

 received a
 received b
 received a
 received a
 received b
 received a
done
Copy the code

The important thing to note here is that if we do not collect consumeEach data in the subcoroutine, the subsequent code will not execute, such as:

fun test(a) {
    runBlocking {
        val channel = Channel<String>()
        launch { sendString(channel, "a".100L) }
        launch { sendString(channel, "b".200L) }
        channel.consumeEach {
            println(" received $it")
        }
        delay(500)
        coroutineContext.cancelChildren()
        println("done")}}Copy the code

Then the coroutine will not complete and the program will always be running in the consumeEach code because I sent it in an infinite loop in sendString.

ChannelConfiguration of channel buffering

The channels shown so far have no buffers. Unbuffered channels transfer elements when the sender and receiver meet. If send is called first, it is suspended until receive is called, if receive is called first, it is suspended until send is called.

Channel and produce both have an optional parameter capacity to specify the buffer size. Buffers allow senders to send multiple elements before they are suspended. Just as BlockingQueue has a specified capacity, blocking can occur when the buffer is full, for example:

fun test(a) {
    runBlocking {
        val channel = Channel<Int> (4)
        val sender = launch {
            repeat(10) {
                println("Sending $it")
                channel.send(it)
            }
        }
        delay(1000)
        sender.cancel()
        println("done")}}Copy the code

We set the capacit size of the Channel Channel to 4 because we only sent but did not receive. Then we see the following result:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
done
Copy the code

The first four elements are added to the buffer, and the sender is suspended when trying to send the fifth element.

Fairness of access

The Channel send and receive operations are fair and respect the multiple coroutines calling them. They follow a first-in, first-out principle, such as:

fun test(a) {
    runBlocking {
        val arr = arrayOf("a"."b"."c")
        val channel = Channel<Int>()
        arr.forEach { str ->
            launch {
                while (true) {val msg = channel.receive()
                    println("$str $msg")
                    delay(100)
                    channel.send(msg)
                }
            }
        }
        channel.send(1)
        delay(1000) 
        coroutineContext.cancelChildren() 
    }
}
Copy the code
a 1
b 1
c 1
a 1
b 1
c 1
a 1
b 1
c 1
a 1
Copy the code

Be aware, however, that sometimes the channel execution does not look fair, depending on the scheduler of the coroutine. This is the basic knowledge of the coroutine, and I’m not going to do it here.

Timer channel

A timer channel is a special channel from which a Unit is consumed every time a specific delay passes. Although it may seem useless, it is used to build segmentation to create complex time-based Produce channels and perform windowing operations and other time-dependent processing.

public fun ticker(
    delayMillis: Long,
    initialDelayMillis: Long = delayMillis,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit> {
    require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
    require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
    return GlobalScope.produce(Dispatchers.Unconfined + context, capacity = 0) {
        when (mode) {
            TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delayMillis, initialDelayMillis, channel)
            TickerMode.FIXED_DELAY -> fixedDelayTicker(delayMillis, initialDelayMillis, channel)
        }
    }
}
Copy the code

Create a timer channel by ticker function, you can set the interval delayMillis, initial start delay initialDelayMillis. Such as:

fun test(a) {
    runBlocking{
        val ticker = ticker(300.0)
        var count = 0
        for (event in ticker) {
            count++
            if (count == 4) {
                break
            } else {
                println(count)
            }
        }
        ticker.cancel()
    }
}
Copy the code
1
2
3
done
Copy the code

This is the end of the channel. Although it doesn’t look like there are as many converters as there are for flow, it provides a programming idea and a foundation for using StateFlow and SharedFlow later on.

StateFlowandShareFlowThe use of

Why use itStateFlowandShareFlow

Flow is a convenient SET of apis, but it does not provide the state management necessary for some scenarios. The limitations of Flow mentioned above are for this reason.

For example, a process may have multiple intermediate states and one termination state, especially the common file download example. Such as:

Ready -> Start -> Downloading -> Success/failure -> Done

We want changes in the state to inform the observer of the action. Although I can be done by ChannelConflatedBroadcastChannel channel, but the implementation is a bit too complicated. In addition, there are some logical inconsistencies when using channels for state management. For example, channels can be closed or cancelled. However, because the state cannot be cancelled, it cannot be used in state management.

We need to use StateFlow and SharedFlow instead of channels. StateFlow and ShareFlow are also part of the Flow API and allow data flows to optimally issue status updates and emit values to multiple users.

StateFlowThe use of

StateFlow is a state-container-like stream of observable data that issues current state updates and new state updates to its collector, with any updates to values fed back to the receivers of all flows. The current state value can also be read from its value property.

StateFlow can completely replace the ConflatedBroadcastChannel. StateFlow than ConflatedBroadcastChannel simpler and more efficient. It also has a better distinction between MutableStateFlow and StateFlow between variability and immutability.

StateFlow comes in two types: StateFlow and MutableStateFlow. The class responsible for updating MutableStateFlow is the provider, and all classes collected from StateFlow are users. Unlike the cold data flow built with the Flow builder, StateFlow is a hot data flow.

public interface StateFlow<out T> : SharedFlow<T> {
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T

    public fun compareAndSet(expect: T, update: T): Boolean
}
Copy the code

Collecting data from such a data flow does not trigger any provider code. StateFlow is always active and exists in memory, and is eligible for garbage collection only if no other references to it are involved in the garbage collection root. When a new user starts collecting data from the data flow, it receives the most recent state in the information flow and any subsequent states. This change in LiveData is similar.

We can see that StateFlow inherits from SharedFlow, and we can think of StateFlow as a better concrete implementation of SharedFlow. Therefore, for the convenience of explanation, the author chooses to start with StateFlow. Now let’s implement the above process example as follows:

class TestActivity : AppCompatActivity() {
    private val viewModel: TestFlowViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?). {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.test)
        lifecycleScope.launch {
            viewModel.state.collect {
                Log.d("carman"."state : $it")
            }
        }
        viewModel.download()
    }
}

class TestFlowViewModel : ViewModel() {
    private val _state: MutableStateFlow<Int> = MutableStateFlow(0)
    val state: StateFlow<Int> get() = _state

    fun download(a) {
        for (state in 0.. 5) {
            viewModelScope.launch(Dispatchers.IO) {
                delay(200L * state)
                _state.value = state
            }
        }
    }
}
Copy the code
D/carman: state : 0
D/carman: state : 1
D/carman: state : 2
D/carman: state : 3
D/carman: state : 4
D/carman: state : 5
Copy the code

You can see how easy it is to collect multi-state changes through StateFlow. The important thing to note here is that StateFlow is read-only, so if you need to modify the value, you need to use MutableStateFlow.

And one of the details is get(). Why use get() instead of straight =? Suppose we add a state2 via = assignment:

class TestFlowViewModel : ViewModel() {
    private val _state: MutableStateFlow<Int> = MutableStateFlow(0)
    val state: StateFlow<Int> get() = _state
    val state2: StateFlow<Int>  = _state
    //....
}
Copy the code

We see that the compiled Java code will look like this:

public final class TestFlowViewModel extends ViewModel {
   private final MutableStateFlow _state = StateFlowKt.MutableStateFlow(0);
   @NotNull
   private final StateFlow state2;

   @NotNull
   public final StateFlow getState() {
      return (StateFlow)this._state;
   }

   @NotNull
   public final StateFlow getState2() {
      return this.state2;
   }
   
    public TestFlowViewModel() {
       this.state2 = (StateFlow)this._state;
    }
    
    / / * *
 }
 
 / / * *
Copy the code

This is because using get() simply adds a getState function to get the return value of the specified type. Using = creates an additional variable of type StateFlow to hold an object reference to the same _state.

StateFlowData processing process

If we add a log output to collect, what will be the difference?

class TestActivity : AppCompatActivity() {
    private val viewModel: TestFlowViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?). {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.test)
        lifecycleScope.launch {
            viewModel.state.collect {
                Log.d("carman"."state : $it")
            }
            Log.d("carman"."No execution here!")
        }
        viewModel.download()
    }
}

class TestFlowViewModel : ViewModel() {
    private val _state: MutableStateFlow<Int> = MutableStateFlow(0)
    val state: StateFlow<Int> get() = _state
    fun download(a) {
        for (state in 0.. 5) {
            viewModelScope.launch(Dispatchers.IO) {
                delay(200L * state)
                _state.value = state
            }
        }
    }
}
Copy the code
D/carman: state : 0
D/carman: state : 1
D/carman: state : 2
D/carman: state : 3
D/carman: state : 4
D/carman: state : 5
Copy the code

At this point we will see that the output is the same as above, there is no log generated that will not be executed here. So what’s going on here? Collect data, the following code will not be executed. At this point we need to look at the implementation of MutableStateFlow() :

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ? : NULL)Copy the code

As you can see, MutableStateFlow creates a MutableStateFlow object through StateFlowImpl.

private class StateFlowImpl<T>(
      initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
      private val _state = atomic(initialState)
      private var sequence = 0

      @Suppress("UNCHECKED_CAST")
      public override var value: T
            get() = NULL.unbox(_state.value)
            set(value) { updateState(null, value ? : NULL) }override fun compareAndSet(expect: T, update: T): Boolean =
            updateState(expect? : NULL, update ? : NULL)private fun updateState(expectedState: Any? , newState:Any): Boolean {
            var curSequence = 0
            varcurSlots: Array<StateFlowSlot? >? =this.slots
            synchronized(this) {
                  val oldState = _state.value
                  if(expectedState ! =null&& oldState ! = expectedState)return false
                  if (oldState == newState) return true
                  _state.value = newState
                  curSequence = sequence
                  if (curSequence and 1= =0) {
                        curSequence++
                        sequence = curSequence
                  } else {
                        sequence = curSequence + 2
                        return true
                  }
                  curSlots = slots
            }

            while (true) { curSlots? .forEach { it? .makePending() } synchronized(this) {
                        if (sequence == curSequence) {
                              sequence = curSequence + 1
                              return true
                        }
                        curSequence = sequence
                        curSlots = slots
                  }
            }
      }

      override val replayCache: List<T>
            get() = listOf(value)

      override fun tryEmit(value: T): Boolean {
            this.value = value
            return true
      }

      override suspend fun emit(value: T) {
            this.value = value
      }

      @Suppress("UNCHECKED_CAST")
      override fun resetReplayCache(a) {
            throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")}override suspend fun collect(collector: FlowCollector<T>) {
            val slot = allocateSlot()
            try {
                  if (collector is SubscribedFlowCollector) collector.onSubscription()
                  val collectorJob = currentCoroutineContext()[Job]
                  var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
                  while (true) {
                        valnewState = _state.value collectorJob? .ensureActive()if (oldState == null|| oldState ! = newState) { collector.emit(NULL.unbox(newState)) oldState = newState }if(! slot.takePending()) { slot.awaitPending() } } }finally {
                  freeSlot(slot)
            }
      }

      override fun createSlot(a) = StateFlowSlot()
      override fun createSlotArray(size: Int): Array<StateFlowSlot? > = arrayOfNulls(size)override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
            fuseStateFlow(context, capacity, onBufferOverflow)
}
Copy the code

In the StateFlowImpl class, data changes are updated by comparison with _state and by atomic atomic type. Atomic operations are operations that are not interrupted by the thread scheduling mechanism, and once started, they run until they finish. There are no thread context switches in between.

If you’re interested, go see the atomic source code for yourself. The StateFlowImpl class collect function is used to collect functions.

private class StateFlowImpl<T>(
      initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
        / /...
      override suspend fun collect(collector: FlowCollector<T>) {
            val slot = allocateSlot()
            try {
                  if (collector is SubscribedFlowCollector) collector.onSubscription()
                  val collectorJob = currentCoroutineContext()[Job]
                  var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
                  while (true) {
                        valnewState = _state.value collectorJob? .ensureActive()if (oldState == null|| oldState ! = newState) { collector.emit(NULL.unbox(newState)) oldState = newState }if(! slot.takePending()) { slot.awaitPending() } } }finally {
                  freeSlot(slot)
            }
      }
    / /...
}
Copy the code

We can see that in this function an infinite loop is executed through the while, and when the data changes, new data is delivered. EnsureActive checks whether the current job is active when the execution changes. Otherwise, an exception is thrown to end the execution. Collect will normally run until awaitPending exits.

public fun Job.ensureActive(a): Unit {
    if(! isActive)throw getCancellationException()
Copy the code

That is to say: as long as we collect data from a StateFlow through collect in the same coroutine, the code after Collect will not be executed under normal circumstances. Now there is another question: will the while in collect function be continuously executed?

To understand this, we need to go back to the source implementation of Collect. The first line of collect creates a slot via allocateSlot, which is an implementation from the parent class AbstractSharedFlowSlot:

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*> > :SynchronizedObject() {
      @Suppress("UNCHECKED_CAST")
      protected varslots: Array<S? >? =null
            private set
            / /...
      @Suppress("UNCHECKED_CAST")
      protected fun allocateSlot(a): S { 
            var subscriptionCount: MutableStateFlow<Int>? = null
            val slot = synchronized(this) {
                  val slots = when (val curSlots = slots) {
                        null -> createSlotArray(2).also { slots = it }
                        else -> if (nCollectors >= curSlots.size) {
                              curSlots.copyOf(2 * curSlots.size).also { slots = it }
                        } else {
                              curSlots
                        }
                  }
                  var index = nextIndex
                  var slot: S
                  while (true) { slot = slots[index] ? : createSlot().also { slots[index] = it } index++if (index >= slots.size) index = 0
                        if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break} nextIndex = index nCollectors++ subscriptionCount = _subscriptionCount slot } subscriptionCount? .increment(1)
            return slot
      }

      @Suppress("UNCHECKED_CAST")
      protected fun freeSlot(slot: S) {
            var subscriptionCount: MutableStateFlow<Int>? = null
            val resumes = synchronized(this) {
                  nCollectors--
                  subscriptionCount = _subscriptionCount
                  if (nCollectors == 0) nextIndex = 0
                  (slot as AbstractSharedFlowSlot<Any>).freeLocked(this)}for (cont inresumes) cont? .resume(Unit) subscriptionCount? .increment(-1)}protected inline fun forEachSlotLocked(block: (S) - >Unit) {
            if (nCollectors == 0) returnslots? .forEach { slot ->if(slot ! =null) block(slot)
            }
      }
}
Copy the code

Imagine a computer’s memory card slot. As long as the main board is big enough and there are enough slots, you can add unlimited memory chips. Similarly, a StateFlow can collect data for multiple times. Each time a StateFlow collects data, a slot will be created for transmitting data. Therefore, a slot can also be called a subscriber.

Create a slot securely using synchronized in the allocateSlot function. The freeSlot function synchronously releases a slot and restores suspended coroutines. Note that subscriptionCount is a variable that counts the number of subscribers and increases or decreases with each creation and release.

Slot is a variable of StateFlowSlot type. After sending data, the state will check whether the current state is PENDING through the slot function takePending. If slot’s _state value is NONE for a long time then the coroutine is suspended via awaitPending:

private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    private val_state = atomic<Any? > (null)
    
    / /...
    
    fun takePending(a): Boolean= _state.getAndSet(NONE)!! .let { state -> assert { state !is CancellableContinuationImpl<*> }
        return state === PENDING
    }

    @Suppress("UNCHECKED_CAST")
    suspend fun awaitPending(a): Unit = suspendCancellableCoroutine sc@ { cont ->
        assert { _state.value !is CancellableContinuationImpl<*> } 
        if (_state.compareAndSet(NONE, cont)) return@sc 
        assert { _state.value === PENDING }
        cont.resume(Unit)}}Copy the code

This might be a little confusing, but you need to be familiar with the use of atomic types.

CompareAndSet (Expect, update) examples:

val top = atomic(1)
println( _state.compareAndSet(1.2)) // true
println( _state.value) / / 2
Copy the code

GetAndSet (update) example:

val top = atomic(1)
println( _state.getAndSet(2)) / / 1
println( _state.value) / / 2
Copy the code

Using getAndSet in takePending means that the value is set after the value is evaluated. The compareAndSet in the awaitPending function is used to determine whether the expected value is equal to the actual value of the variable. If so, the new value is assigned to the variable; otherwise, failure is returned. I’ll end there.

The general process is:

  • Check whether the data needs to passemitLaunch.
  • throughgetAndSetGets the currentslotthe_stateWhether the value is equal toPENDINGAnd then theNONEAssigned toslotthe_state, and finally returns the state comparison results before the assignment.
  • iftakePendingState comparison if the currentslotValues in theNONE, it will beslotthe_stateValue changed to:CancellableContinuationImplCoroutine object ofcont. If the currentslotIn aPENDINGState resumes execution of the coroutine.

Now we can see that while in StateFlow’s Collect function is not executed continuously. Instead, they use the pending and resuming of takePending to determine whether the next while loop needs to be executed.

StateFlowData update process

Through the above learning, we can find that if we process data too slowly in the process of data collection, we may lose the intermediate data. Now let’s verify this idea:

class TestActivity : AppCompatActivity() {
    private val viewModel: TestFlowViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?). {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.test)
        lifecycleScope.launch {
            viewModel.state.collect {
                delay(2000)
                Log.d("carman"."state : $it")
            }
        }
        viewModel.download()
    }
}

class TestFlowViewModel : ViewModel() {
    private val _state: MutableStateFlow<Int> = MutableStateFlow(0)
    val state: StateFlow<Int> get() = _state

    fun download(a) {
        for (state in 0.. 5) {
            viewModelScope.launch(Dispatchers.IO) {
                delay(200L * state)
                _state.value = state
            }
        }
    }
}
Copy the code

If we add a delay in collect to simulate time consumption. Then we should see the following log:

D/carman: state : 0
D/carman: state : 5
Copy the code

It can be seen that we only received two data changes, and when the intermediate processing time-consuming process was lost, the data transmitted from the upstream were basically consistent after multiple runs. So that proves what we thought above. We can take a closer look at the implementation of the data change process in the StateFlowImpl source code.

private class StateFlowImpl<T>(
      initialState: Any
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
      private val _state = atomic(initialState)
      private var sequence = 0

      @Suppress("UNCHECKED_CAST")
      public override var value: T
            get() = NULL.unbox(_state.value)
            set(value) { updateState(null, value ? : NULL) }override fun compareAndSet(expect: T, update: T): Boolean =
            updateState(expect? : NULL, update ? : NULL)private fun updateState(expectedState: Any? , newState:Any): Boolean {
            var curSequence = 0
            varcurSlots: Array<StateFlowSlot? >? =this.slots
            synchronized(this) {
                  val oldState = _state.value
                  if(expectedState ! =null&& oldState ! = expectedState)return false
                  if (oldState == newState) return true
                  _state.value = newState
                  curSequence = sequence
                  if (curSequence and 1= =0) {
                        curSequence++
                        sequence = curSequence
                  } else {
                        sequence = curSequence + 2
                        return true
                  }
                  curSlots = slots
            }

            while (true) { curSlots? .forEach { it? .makePending() } synchronized(this) {
                        if (sequence == curSequence) {
                              sequence = curSequence + 1
                              return true
                        }
                        curSequence = sequence
                        curSlots = slots
                  }
            }
      }
      / /...
}
Copy the code

When data changes are updated by updateState, synchronized is used in updateState. Use the and bit to calculate the odd and even values of the curSequence. Here, the curSequence odd and even values are used to determine whether the data is being updated.

  • If the value of curSequence is odd, it indicates that the curSequence is not in the process of updating data, and the value of curSequence +1 indicates that the curSequence is in the process of updating data.

  • If the curSequence value is an even number, it indicates that the data is being updated. Since the data update is not completed, the value of the curSequence value is +2 to ensure that the data is still being updated and a new data needs to be updated.

Moving on, after the synchronization code has finished, there is a while loop that first sends makePending operations on all slots. At this point you need to go back to StateFlowSlot and see the implementation in action:

private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    private val_state = atomic<Any? > (null)
    / /...

    @Suppress("UNCHECKED_CAST")
    fun makePending(a) {
        _state.loop { state ->
            when {
                state == null -> return // this slot is free - skip it
                state === PENDING -> return // already pending, nothing to do
                state === NONE -> { // mark as pending
                    if (_state.compareAndSet(state, PENDING)) return
                }
                else- > {// must be a suspend continuation state
                    // we must still use CAS here since continuation may get cancelled and free the slot at any time
                    if (_state.compareAndSet(state, NONE)) {
                        (state as CancellableContinuationImpl<Unit>).resume(Unit)
                        return
                    }
                }
            }
        }
    }
    / /...
}
Copy the code

Loop is a loop statement of atomic type, which can be summarized in three cases:

  • when_stateThe value is equal to thenullorPENDINGI don’t do anything,
  • when_stateIs equal to NONE_stateIs changed toPENDINGWait till next timewhileLoop in.
  • When enteringelseWhen I branch, that’s just 1, 2, 3CancellableContinuationImplCoroutine object ofcontWe are up therecollectThat’s what the function says, callslotThe object’sawaitPendingDelta of theta, delta of thetaslotthe_stateValues in theNONE, will assign the coroutine to_state. then_stateValue changed to:NONEAnd finally recoverCancellableContinuationImplExecution of coroutines.

Go back to the while of StateFlowImpl’s updateState function and use synchronization to determine the value of the curSequence. If you do not enter updateState again to modify the curSequence value during makePending,

If they are equal, the curSequence value +1 restores to odd, that is, the curSequence is marked as not in the process of updating data. Then close the updateState function.

If not, the new value is assigned to the curSequence under the synchronous lock, and finally everything for the next loop is re-read.

This update process once again proves that if we are too slow in the data collection process of COLLECT, then we may lose the intermediate data.

StateFlowRefuse to update

From the above we can see that, no matter in collect or updateState, every time the data changes, it will be paired with the previous old data, for example:

// Judgment in updateState
if (oldState == newState) return true 
Copy the code
//collect
 if (oldState == null|| oldState ! = newState) { collector.emit(NULL.unbox(newState)) oldState = newState }Copy the code

This results in the same data, with multiple updates only valid for the first time and then discarded, for example:

class TestActivity : AppCompatActivity() {
    private val viewModel: TestFlowViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?). {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.test)
        lifecycleScope.launch {
            viewModel.state.collect {
                Log.d("carman"."The state:$it")
            }
        }
        viewModel.download()
    }
}

class TestFlowViewModel : ViewModel() {
    private val _state: MutableStateFlow<Int> = MutableStateFlow(0)
    val state: StateFlow<Int> get() = _state
    fun download(a) {
        viewModelScope.launch(Dispatchers.IO) {
            for (index in 1.. 5) {
                delay(500)
                _state.value = 1
                Log.d("carman"."The first$indexUpdate")}}}}Copy the code
D/carman: state:0D/carman: first1Update D/carman: state:1D/carman: first2Update D/ Carman: no3Update D/ Carman: no4Update D/ Carman: no5Time to updateCopy the code

It can be seen that we sent data 1 several times. Although the delay was increased at each sending interval to ensure data transmission, it still took effect only when the data was different.

StateFlowThe SAO operation

As we know from the above learning, we can only collect the first StateFlow data in a coroutine. Suppose there is now a requirement to terminate the data collection for the first StateFlow and perform the data collection for the next StateFlow when the state with the first StateFlow reaches a critical value. So we can do this:

class TestActivity : AppCompatActivity() {
    private val viewModel: TestFlowViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?). {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.test)
        lifecycleScope.launch {
            try {
                viewModel.state.collect {
                    Log.d("carman"."state : $it")
                   if (it == 3) {throw NullPointerException("Terminate data collection for the first StateFlow")}}}catch (e: Exception) {
                Log.d("carman"."e : $e")
            }
            viewModel.name.collect {
                Log.d("carman"."name : $it")
            }
        }
        viewModel.download()
    }
}

class TestFlowViewModel : ViewModel() {
    private val _state: MutableStateFlow<Int> = MutableStateFlow(0)
    val state: StateFlow<Int> get() = _state

    private val _name: MutableStateFlow<String> = MutableStateFlow("Second StateFlow")
    val name: StateFlow<String> get() = _name
    fun download(a) {
        for (state in 0.. 5) {
            viewModelScope.launch(Dispatchers.IO) {
                delay(200L * state)
                _state.value = state
            }
        }
    }
}
Copy the code

We end the first StateFlow data collection by throwing an exception in the _state collect function by conditional judgment. At this point, our first StateFlow is ready for data collection.

D/carman: state : 0
D/carman: state : 1
D/carman: state : 2
D/carman: state : 3D/carman: e: Java. Lang. NullPointerException: termination of the first data collection D/StateFlow carman: name: second StateFlowCopy the code

At this point, the use of StateFlow is basically over. Because of the space of the word limit, really not I want to drag the draft, in order to pursue quality, I have to write good push to come again. The use of ShareFlow will be covered in the next article.

Originality is not easy. If you like this article, you can click “like”.

Technical exchange group, interested can join.

The blogger is participating in the Nuggets Creator of the Year contest, and a favorite friend is helping to vote for encouragement.

Related articles Kotlin coroutine fundamentals and Principles series

  • Basic usage of Kotlin coroutines
  • Kotlin coroutine introduction to Android version in detail (ii) -> Kotlin coroutine key knowledge points preliminary explanation
  • Kotlin coroutine exception handling
  • Use Kotlin coroutine to develop Android applications
  • Network request encapsulation for Kotlin coroutines
  • Kotlin coroutine theory (1)
  • Kotlin coroutine theory (2)
  • [Kotlin coroutine introduction for Android version in detail (8) -> In-depth Kotlin coroutine principle (3)]
  • [Kotlin coroutine introduction for Android (9) -> In-depth Kotlin coroutine principle (4)]

FlowA series of

  • Flow Usage of Kotlin Coroutines (1)
  • Flow Usage of Kotlin coroutine (II)
  • Flow Usage of Kotlin coroutine (III)

Extend the series

  • Encapsulating DataBinding saves you thousands of lines of code
  • A ViewModel wrapper for everyday use