Introduction

This article explains how the delay function in Kotlin is implemented. delay is a suspend function. Suspend functions come up constantly when using Kotlin coroutines, and while learning coroutines I found some of their behavior very confusing, so I intend to analyze them one by one from the source code.

Notes

I am not yet familiar with some Kotlin syntax, so this analysis of the delay source code does not dig into every step in depth. It only traces the general flow. If any explanation is wrong, corrections are welcome.

Example first

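import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Helper assumed by the examples (its definition is not shown in the original):
// returns the name of the current thread.
fun treadName(): String = Thread.currentThread().name

fun main() = runBlocking {
    println("${treadName()}======start")
    launch {
        println("${treadName()}======delay 1s  start")
        delay(1000)
        println("${treadName()}======delay 1s end")
    }

    println("${treadName()}======delay 3s start")
    delay(3000)
    println("${treadName()}======delay 3s end")
    // Sleep to keep the process alive
    Thread.sleep(500000)
}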

The output is as follows:

main======start
main======delay 3s start
main======delay 1s  start
main======delay 1s end
main======delay 3s end


According to the log, it can be seen that:

  1. All log output comes from the main thread.
  2. After the 3s delay is called, execution switches to the body of the **launch** coroutine.
  3. After each suspended delay resumes, its own print statement runs.

Question:

If all operations are done in one thread (the main thread), as the log output shows, a question arises. By what we know about Java threads, execution within a single thread is sequential, one line after another. If delay does not start another thread, a single thread should not be able to produce the interleaving seen above.

One possible explanation is that delay starts a new thread and the behavior above is only a thread switch. But then calling delay many times would create many threads, which is a performance and resource problem. Another possible explanation is that delay is based on a task scheduling strategy.

The delay source code

public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

cancellable is a CancellableContinuationImpl object. Calling block(cancellable) runs the lambda that the following function passes in.

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}


Take a look at the getter of cont.context.delay:

internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

If the ContinuationInterceptor in the context is a Delay, that object is returned directly. If it is not a Delay, DefaultDelay is returned instead, which is the DefaultExecutor object. DefaultExecutor inherits from the EventLoopImplBase class.
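The lookup rule can be tried out with the standard library alone. The sketch below is only an illustration: SketchDelay, SketchDefaultDelay, SketchEventLoop, PlainInterceptor and sketchDelay are hypothetical names standing in for the internal kotlinx.coroutines types, which cannot be referenced from user code.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for the internal Delay interface.
interface SketchDelay {
    val label: String
}

// Hypothetical stand-in for the DefaultDelay fallback.
object SketchDefaultDelay : SketchDelay {
    override val label = "fallback"
}

// An interceptor that also implements SketchDelay, the way an event loop does.
class SketchEventLoop : AbstractCoroutineContextElement(ContinuationInterceptor),
    ContinuationInterceptor, SketchDelay {
    override val label = "event loop"
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = continuation
}

// An interceptor that does not implement SketchDelay.
class PlainInterceptor : AbstractCoroutineContextElement(ContinuationInterceptor),
    ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = continuation
}

// Same shape as the extension property above: safe cast first, then fall back.
val CoroutineContext.sketchDelay: SketchDelay
    get() = get(ContinuationInterceptor) as? SketchDelay ?: SketchDefaultDelay

fun demoDelayLookup(): List<String> = listOf(
    SketchEventLoop().sketchDelay.label,     // interceptor is a SketchDelay
    PlainInterceptor().sketchDelay.label,    // interceptor exists but is not a SketchDelay
    EmptyCoroutineContext.sketchDelay.label  // no interceptor at all
)
```

Calling demoDelayLookup() returns "event loop", "fallback", "fallback": only a context whose interceptor is itself a delay implementation avoids the fallback object.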

While runBlocking executes, the line createCoroutineUnintercepted(receiver, completion).intercepted() wraps the continuation using the ContinuationInterceptor. In that case cont.context.delay returns the wrapped coroutine context.

Check out the scheduleResumeAfterDelay method.

    public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timeNanos = delayToNanos(timeMillis)
        if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedResumeTask(now + timeNanos, continuation).also { task ->
                continuation.disposeOnCancellation(task)
                schedule(now, task)
            }
        }
    }


A DelayedResumeTask object is created, and the scheduling work is done inside also. Next, look at the schedule method.

    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }

Here SCHEDULE_OK is returned and the unpark function is executed, which relies on Java's LockSupport thread primitives.

unpark first reads the thread:

  val thread = thread
  • If delay is the current coroutine's context, how does the thread actually wait once the delayed task has been added to the queue? Going back to how runBlocking executes, it runs a line like coroutine.joinBlocking().

      fun joinBlocking(): T {
          registerTimeLoopThread()
          try {
              eventLoop?.incrementUseCount()
              try {
                  while (true) {
                      @Suppress("DEPRECATION")
                      if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                      val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                      // note: process next even may loose unpark flag, so check if completed before parking
                      if (isCompleted) break
                      parkNanos(this, parkNanos)
                  }
              } finally { // paranoia
                  eventLoop?.decrementUseCount()
              }
          } finally { // paranoia
              unregisterTimeLoopThread()
          }
          // now return result
          val state = this.state.unboxState()
          (state as? CompletedExceptionally)?.let { throw it.cause }
          return state as T
      }

    This line runs:

     val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE

    Look at processNextEvent:

      override fun processNextEvent(): Long {
          // unconfined events take priority
          if (processUnconfinedEvent()) return 0
          // queue all delayed tasks that are due to be executed
          val delayed = _delayed.value
          if (delayed != null && !delayed.isEmpty) {
              val now = nanoTime()
              while (true) {
                  delayed.removeFirstIf {
                      if (it.timeToExecute(now)) {
                          enqueueImpl(it)
                      } else
                          false
                  } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
              }
          }
          // then process one event from queue
          val task = dequeue()
          if (task != null) {
              task.run()
              return 0
          }
          return nextTime
      }

    Read the queue of delayed tasks:

    val delayed = _delayed.value

    Park the current thread:

    parkNanos(this, parkNanos)

    This happens inside a while loop. When the park time is up, the thread wakes and continues to take tasks from the task queue. If the next task is still not due, the loop works out how long the thread needs to park from the current time. This is why multiple delayed tasks appear to run at the same time.
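The loop described above can be approximated with the standard library only. This is a minimal sketch and not the kotlinx.coroutines implementation: MiniEventLoop, sketchDelay, launch and runUntilIdle are hypothetical names, there is no cancellation or thread safety, and everything runs on the calling thread.

```kotlin
import java.util.PriorityQueue
import java.util.concurrent.locks.LockSupport
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical single-threaded loop; none of these names come from kotlinx.coroutines.
class MiniEventLoop {
    private class Timed(val dueNanos: Long, val seq: Long, val cont: Continuation<Unit>)

    // Delayed tasks ordered by due time, then by insertion order.
    private val delayed = PriorityQueue<Timed>(compareBy<Timed>({ it.dueNanos }, { it.seq }))
    private var seq = 0L
    private var active = 0

    // Suspends the caller and records when it should resume. No thread is created.
    suspend fun sketchDelay(millis: Long) {
        if (millis <= 0) return
        suspendCoroutine<Unit> { cont ->
            delayed.add(Timed(System.nanoTime() + millis * 1_000_000, seq++, cont))
        }
    }

    // Starts a coroutine body on the calling thread; it runs until its first suspension.
    fun launch(block: suspend () -> Unit) {
        active++
        block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
            active--
            result.getOrThrow()
        })
    }

    // Resumes tasks that are due and parks the thread until the next one is due.
    fun runUntilIdle() {
        while (active > 0) {
            val next = delayed.peek() ?: break
            val waitNanos = next.dueNanos - System.nanoTime()
            if (waitNanos > 0) {
                LockSupport.parkNanos(this, waitNanos) // may wake early; the loop re-checks
            } else {
                delayed.poll().cont.resume(Unit)
            }
        }
    }
}

fun demoMiniEventLoop(): List<String> {
    val log = mutableListOf<String>()
    val loop = MiniEventLoop()
    loop.launch {
        log += "delay 300ms start"
        loop.sketchDelay(300)
        log += "delay 300ms end"
    }
    loop.launch {
        log += "delay 100ms start"
        loop.sketchDelay(100)
        log += "delay 100ms end"
    }
    loop.runUntilIdle()
    return log
}
```

Both coroutines print their start line before either one finishes, and the shorter delay ends first, even though only one thread is involved. The thread is parked only for the time remaining until the earliest task, which is the same idea as the nextTime value returned by processNextEvent.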

  • If delay is DefaultExecutor, as in this example, the coroutine context is not wrapped the way it is with CoroutineStart.DEFAULT.

    fun main() {
        GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
            println("${treadName()}====== I started executing ~")
            delay(1000)
            println("${treadName()}====== global coroutine ~")
        }
        println("${treadName()}====== I want to sleep ~")
        Thread.sleep(3000)
    }

    Then the getter of thread in DefaultExecutor is called:

      override val thread: Thread
          get() = _thread ?: createThreadSync()

    Look at the createThreadSync function:

      private fun createThreadSync(): Thread {
          return _thread ?: Thread(this, THREAD_NAME).apply {
              _thread = this
              isDaemon = true
              start()
          }
      }

    This creates a new thread named "kotlinx.coroutines.DefaultExecutor" and starts it, so the run method of DefaultExecutor is executed. In the run method there is a line like this:

    parkNanos(this, parkNanos)

    Step into it to see:

    internal inline fun parkNanos(blocker: Any, nanos: Long) {
        timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker, nanos)
    }

    It calls the LockSupport.parkNanos(blocker, nanos) method provided by Java to block the current thread. The thread resumes execution when the blocking time has elapsed.
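The two LockSupport calls that appear in this walkthrough, parkNanos and unpark, can be observed in isolation. The following is a small sketch, with parkFor and unparkEarly as hypothetical helper names introduced only for illustration.

```kotlin
import java.util.concurrent.locks.LockSupport
import kotlin.concurrent.thread

// Parks the calling thread until `millis` have passed and returns the elapsed milliseconds.
fun parkFor(millis: Long): Long {
    val start = System.nanoTime()
    val deadline = start + millis * 1_000_000
    // parkNanos may return early (spuriously or after unpark), so loop on the deadline.
    while (true) {
        val remaining = deadline - System.nanoTime()
        if (remaining <= 0) break
        LockSupport.parkNanos(remaining)
    }
    return (System.nanoTime() - start) / 1_000_000
}

// Parks a worker thread for up to 5 seconds, then wakes it early from the calling thread.
fun unparkEarly(): Long {
    var elapsedMillis = -1L
    val worker = thread {
        val start = System.nanoTime()
        LockSupport.parkNanos(5_000_000_000L)
        elapsedMillis = (System.nanoTime() - start) / 1_000_000
    }
    Thread.sleep(100)
    LockSupport.unpark(worker) // makes the permit available, so parkNanos returns
    worker.join()
    return elapsedMillis
}
```

parkFor(50) returns a value of at least 50, while unparkEarly() returns well under 5000 because unpark cuts the wait short. That early wake-up is what schedule relies on when a newly added task is due sooner than the time the loop thread is currently parked for.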

How to view the state of running threads

fun main() {
    println("${treadName()}======doSuspendTwo")
    Thread.sleep(500000)
}

Run main and use the jps command to find the corresponding Java process ID (unless specified otherwise, the process name is the file name).

3406 KotlinCoreutinesSuspendKt
...

Run jstack with that process ID to view the threads belonging to the process.
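As a lighter complement to jstack, the threads can also be listed from inside the process using the JDK's Thread.getAllStackTraces(). This is a sketch, with describeThreads as a hypothetical helper name; it shows less detail than jstack.

```kotlin
// Returns one line per live JVM thread: name, daemon flag and state.
fun describeThreads(): List<String> =
    Thread.getAllStackTraces().keys
        .sortedBy { it.name }
        .map { "${it.name} daemon=${it.isDaemon} state=${it.state}" }
```

Printing describeThreads() after a delay has been scheduled on DefaultExecutor is one way to check whether the "kotlinx.coroutines.DefaultExecutor" thread has been created.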

Finally

My knowledge is limited, so mistakes are unavoidable. I would rather be corrected than mislead others, so please point out anything that is wrong. Writing takes effort, so thank you for your likes and follows! 🙏 If you are learning along with me, you can follow my official account, ❤️ Program ape Development Center ❤️, where Android techniques are shared regularly every week.