This article mainly introduces CO scheduling in COOBJC through source code analysis. What Co_lauch does seems simple once you understand this problem. Let’s start with the coroutine and Scheduler, two key data structures.
coroutine
The scheduling related fields in the coroutine data structure.
entry
: Tasks that need to be executed, ultimately pointing to blocks in co_launch(block).userdata
: an OC class object COCoroutine, which holds the coroutine data structure and corresponds to it one by one.context
: is the current context for coroutine execution.pre_context
: Saves the context in which the coroutine is suspended or needs to be restored after execution. Coobjc switches the context to realize the jump of the function.scheduler
: CO is held by the CO_queue of scheduler, which is the core of CO scheduling.
struct coroutine {
coroutine_func entry; // Process entry.
void *userdata; // Userdata.
void *context; // Coroutine,
void *pre_context; // Coroutine's source process's Call stack data.
struct coroutine_scheduler *scheduler; // The pointer to the scheduler.
...
};
typedef struct coroutine coroutine_t;
Copy the code
Coobjc operates co by yield, resume, add. These three methods are also frequently used in CO scheduling.
coroutine_yield
void coroutine_yield(coroutine_t *co)
{
if (co == NULL) {
// if null
co = coroutine_self();
}
BOOL skip = false;
coroutine_getcontext(co->context);
if (skip) {
return;
}
#pragma unused(skip)
skip = true;
co->status = COROUTINE_SUSPEND;
coroutine_setcontext(co->pre_context);
}
Copy the code
This function suspends the coroutine. Main refers to coroutine_scheduler_main, the entry function of main_coroutine in scheduler, Im refers to coroutine_resume_im(coroutine_t *co), which executes the co entry. There’s a for loop in main that loops through the co_queue to get the head and executes the head entry through im. Scheduler is explained in more detail below.
coroutine_setcontext(co->pre_context); This line allows you to jump to coroutine_getContext (co->pre_context). The usual call stack is main()->im()-> coroutine_getContext (co->pre_context). Coroutine_getcontext (co->pre_context) in im. Im will return directly to the for loop in main. Continue fetching the head of co_queue to perform the entry of the head. When the for loop reaches the suspended CO again, call coroutine_setContext (co->context) in im’s co Entry method, The program jumps to coroutine_getContext (co->context) in the coroutine_yield() method; In this row, skip is yes. Coroutine_yield () returns. Return to where coroutine_yield() was called. Yield saves the context so that the suspended CO can continue execution in the previous context.
coroutine_resume
void coroutine_resume(coroutine_t *co) {
if(! co->is_scheduler) { coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists(); co->scheduler = scheduler; scheduler_queue_push(scheduler, co);if (scheduler->running_coroutine) {
// resume a sub coroutine.
scheduler_queue_push(scheduler, scheduler->running_coroutine);
coroutine_yield(scheduler->running_coroutine);
} else{ // scheduler is idle coroutine_resume_im(co->scheduler->main_coroutine); }}}Copy the code
The coroutine_resume method pushes co to the scheduler coroutine queue. If there are currently running coroutines, the currently running coroutine will be suspended and pushed into the coroutine queue. If there are only newly added co’s and suspended CO’s in the co_queue, the head of the newly added CO’s in the queue will be fetched by the for loop of the main function to execute the entry. If no coroutines are currently running, the entry function main_corroutine in scheduler is executed. This function is a for loop that reads the CO from the queue and executes the CO entry. The CO added to the CO_quue queue will eventually be executed. If there is no runing_coroutine, the main_coroutine will be suspended, and the for loop will not execute. Coroutine_resume_im (co->scheduler->main_coroutine);
coroutine_add
void coroutine_add(coroutine_t *co) {
if(! co->is_scheduler) { coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists(); co->scheduler = scheduler;if (scheduler->main_coroutine->status == COROUTINE_DEAD) {
coroutine_close_ifdead(scheduler->main_coroutine);
coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
main_co->is_scheduler = true;
main_co->scheduler = scheduler;
scheduler->main_coroutine = main_co;
}
scheduler_queue_push(scheduler, co);
if(! scheduler->running_coroutine) { coroutine_resume_im(co->scheduler->main_coroutine); }}}Copy the code
This method adds the current CO to the Scheduler coroutine queue. If the state of the main_coroutine is dead, a main_coroutine is created. Coroutine_t * main_CO = coroutINE_CREATE (coroutINE_scheduler_main); The main_coroutine entry points to the coroutine_scheduler_main function, which we’ll talk about later, for the for loop. No Currently no running_coroutine will actively fire main.
scheduler
Scheduler is the core of coroutine scheduling.
Scheduler’s data structure
struct coroutine_scheduler {
coroutine_t *main_coroutine;
coroutine_t *running_coroutine;
coroutine_list_t coroutine_queue;
};
typedef struct coroutine_scheduler coroutine_scheduler_t;
struct coroutine_list {
coroutine_t *head;
coroutine_t *tail;
};
typedef struct coroutine_list coroutine_list_t;
Copy the code
main_coroutine
: Its entry points tocoroutine_scheduler_main
The main_coroutine () function, similar to runloop in a thread, provides a for loop that reads the head from the coroutine queue, executes the entry function for the head, and suspends the main_coroutine when the queue is empty.running_coroutine
: Records the current running coroutine. This field is used to retrieve or suspend the current coroutine.coroutine_queue
: is a bidirectional list that holds coroutines added to the current scheduler, which scheduler removes from the list when the entry function of the coroutine completes execution.
scheduler
The creation process of.
Coroutine_scheduler_t * coroutINE_selF_CREATE_IF_not_exists (void) {if(! coroutine_scheduler_key) { pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free); } void *schedule = pthread_getspecific(coroutine_scheduler_key);if(! schedule) { schedule = coroutine_scheduler_new(); pthread_setspecific(coroutine_scheduler_key, schedule); }return schedule;
}
Copy the code
Pthread_setspecific and pthread_getSpecific are thread-stored access functions, ostensibly a global variable that can be used by all threads, and whose value is stored separately in each thread. The key stored by a thread is of type pthread_key_T and is created by pthread_KEY_CREATE. Pthread_key_create takes two arguments: the first is a string key and the second is a cleanup function. This cleanup function is called when the thread frees the storage space corresponding to the key value. The thread store is created in a way that ensures that there is only one scheduler per thread and provides an entry point to that scheduler.
coroutine_scheduler_main
// The main entry of the coroutine's scheduler // The scheduler is just a special coroutine, so we can use yield. void coroutine_scheduler_main(coroutine_t *scheduler_co) { coroutine_scheduler_t *scheduler = scheduler_co->scheduler; for (;;) { // Pop a coroutine from the scheduler's queue.
coroutine_t *co = scheduler_queue_pop(scheduler);
if (co == NULL) {
// Yield the scheduler, give back cpu to origin thread.
coroutine_yield(scheduler_co);
// When some coroutine add to the scheduler's queue, // the scheduler will resume again, // then will resume here, continue the loop. continue; } // Set scheduler's current running coroutine.
scheduler->running_coroutine = co;
// Resume the coroutine
coroutine_resume_im(co);
// Set scheduler's current running coroutine to nil. scheduler->running_coroutine = nil; // if coroutine finished, free coroutine. if (co->status == COROUTINE_DEAD) { coroutine_close_ifdead(co); }}}Copy the code
The coroutine_scheduler_main function is a scheduler runloop. A for loop that reads coroutines from its own coroutine queue. When the scheduler’s coroutine queue is not empty, the head is removed from the queue to execute the entry function. When all coroutines in the queue are removed, the coroutine queue of the current scheduler is empty. The main_coroutine is suspended by the coroutine_yield function. Coroutine_yield stores the current context, which means that the next time the main_coroutine is resumed, it will continue from there, continuing the for loop.
coroutine_resume_im
void coroutine_resume_im(coroutine_t *co) {
switch (co->status) {
case COROUTINE_READY:
{
co->stack_memory = coroutine_memory_malloc(co->stack_size);
co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
// get the pre context
co->pre_context = malloc(sizeof(coroutine_ucontext_t));
BOOL skip = false;
coroutine_getcontext(co->pre_context);
if (skip) {
// when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
return;
}
#pragma unused(skip)
skip = true;
free(co->context);
co->context = calloc(1, sizeof(coroutine_ucontext_t));
coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
// setcontext
coroutine_begin(co->context);
break;
}
case COROUTINE_SUSPEND:
{
BOOL skip = false;
coroutine_getcontext(co->pre_context);
if (skip) {
// when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
return;
}
#pragma unused(skip)
skip = true;
// setcontext
coroutine_setcontext(co->context);
break;
}
default:
assert(false);
break; }}Copy the code
For im function, this article mainly introduces cooBJC scheduling without detailed explanation. We just need to execute this function before COROUTINE_READY will execute im entry. In the COROUTINE_SUSPEND state, the context is restored where the yield is interrupted.
Let’s review the whole process of CO scheduling. A unique scheduler is created within a thread. Scheduler contains main_CO, running_CO, and CO_queue. Main_co entry is a for loop. Fetch the head co from the CO_queue, set head to running_co, and execute the head entry. When coroutine_resume(CO) is called. If running_CO exists, running_CO will be yield suspended, main_CO will fetch a new CO from the CO_queue to perform its entry, and when the for loop iterates through the suspended CO again, the program will jump to the yield function to continue execution. If runing_co does not exist, the CO is added to the CO_queue, and resume executes the entry of the main_coroutine, and the for loop begins.
co_lauch
The co_launch function is something like dispatch_async. The difference is co_launch, which places tasks to be executed in a coroutine queue. Dispatch_async, which places tasks to be executed in a thread queue. At the scheduling level, coroutine_resume adds co to the scheduler’s CO_queue. Within this task, you can yield, resume, or preempt threads.
NS_INLINE COCoroutine * _Nonnull co_launch(void(^ _Nonnull block)(void)) {
COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
return [co resume];
}
- (COCoroutine *)resume {
COCoroutine *currentCo = [COCoroutine currentCoroutine];
BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
[self.dispatch dispatch_async_block:^{
if (self.isResume) {
return;
}
if (isSubroutine) {
self.parent = currentCo;
[currentCo addChild:self];
}
self.isResume = YES;
coroutine_resume(self.co);
}];
return self;
}
Copy the code