协程coobjc源码分析:co调度

1,643 阅读6分钟

这篇文章主要通过源码分析,介绍coobjc中的co调度。这个问题搞清楚之后,co_lauch做了什么,看起来就很简单了。我们先了解coroutine和scheduler这两个关键的数据结构。

coroutine

在协程的数据结构中和调度相关的字段。

  • entry: 需要执行的任务,最终指向的是co_launch(block)中的block。
  • userdata: 一个OC的类对象COCoroutine,这个对象持有coroutine这个数据结构,和它一一对应。
  • context: 是协程执行的当前上下文。
  • pre_context: 保存的是这个协程被挂起或者执行完成后需要回复的上下文,coobjc通过切换上下文来实现函数的跳转。
  • scheduler: co被scheduler的co_queue持有,scheduler是co调度的核心。
    struct coroutine {
        coroutine_func entry;                   // Process entry.
        void *userdata;                         // Userdata.
        void *context;                          // Coroutine,
        void *pre_context;                      // Coroutine's source process's Call stack data.
        
        struct coroutine_scheduler *scheduler;  // The pointer to the scheduler.
        ...
    };
    typedef struct coroutine coroutine_t;

coobjc通过yield,resume,add来操作co。这三个方法在co调度的时候也会被频繁用到。

coroutine_yield

void coroutine_yield(coroutine_t *co)
{
    if (co == NULL) {
        // if null
        co = coroutine_self();
    }
    BOOL skip = false;
    coroutine_getcontext(co->context);
    if (skip) {
        return;
    }
#pragma unused(skip)
    skip = true;
    co->status = COROUTINE_SUSPEND;
    coroutine_setcontext(co->pre_context);
}

这个函数的作用是挂起协程。下面提到的main指的是scheduler中main_coroutine的入口函数 coroutine_scheduler_main,im指的是coroutine_resume_im(coroutine_t *co) 这个函数会执行co的entry,main中有个for循环遍历co_queue取出head通过im函数执行head的entry。关于scheduler下面会有详细介绍。

coroutine_setcontext(co->pre_context);这一行可以让程序跳转到coroutine_getcontext(co->pre_context)这里。通常情况下的调用栈main()->im()->coroutine_getcontext(co->pre_context)。当yeild执行的时候,程序跳转到im函数中coroutine_getcontext(co->pre_context)的这个位置,im函数会直接return,跳转到main函数的for循环里,继续取出co_queue的head执行head的entry。当for循环再次执行到这个被挂起的co的时候,在im执行co entry 方法中,调用coroutine_setcontext(co->context),程序跳转到 coroutine_yield()方法中的 coroutine_getcontext(co->context);这一行,此时skip是yes。coroutine_yield()函数return。回到调用coroutine_yield()的地方。yield通过保存上下文,使得被挂起的co下次能够在之前的上下文环境下继续执行。

coroutine_resume

void coroutine_resume(coroutine_t *co) {
    if (!co->is_scheduler) {
        coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
        co->scheduler = scheduler;
        
        scheduler_queue_push(scheduler, co);
        
        if (scheduler->running_coroutine) {
            // resume a sub coroutine.
            scheduler_queue_push(scheduler, scheduler->running_coroutine);
            coroutine_yield(scheduler->running_coroutine);
        } else {
            // scheduler is idle
            coroutine_resume_im(co->scheduler->main_coroutine);
        }
    }
}

coroutine_resume 这个方法是把co push 到scheduler的协程队列里面。如果当前有协程在运行的话,那个当前运行的协程就会被挂起,push到协程队列里面,如果co_queue中只有新添加进来的co和被挂起的co,此时新添加进来的co处于queue的head会被main函数的for循环取出执行entry。如果当前没有协程在运行,就会执行scheduler中main_corroutine的entry函数,这个函数是一个for循环从队列中读取co,执行co的entry。添加到co_quue队列中的co最终会被执行。后面的判断如果没有runing_coroutine,这个时候main_coroutine被挂起,for循环不执行,需要主动触发一次main函数的调用coroutine_resume_im(co->scheduler->main_coroutine);

coroutine_add

void coroutine_add(coroutine_t *co) {
    if (!co->is_scheduler) {
        coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
        co->scheduler = scheduler;
        if (scheduler->main_coroutine->status == COROUTINE_DEAD) {
            coroutine_close_ifdead(scheduler->main_coroutine);
            coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);
            main_co->is_scheduler = true;
            main_co->scheduler = scheduler;
            scheduler->main_coroutine = main_co;
        }
        scheduler_queue_push(scheduler, co);
        
        if (!scheduler->running_coroutine) {
            coroutine_resume_im(co->scheduler->main_coroutine);
        }
    }
}

这个方法把当前co添加到scheduler协程队列里面。如果main_coroutine的状态是dead,会创建一个main_coroutine,coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);这里可以看到main_coroutine的entry指向的是coroutine_scheduler_main这个函数下面还会讲到,作用就是前面一直在说的for循环。没有当前没有running_coroutine会主动触发main函数。

scheduler

scheduler是协程调度的核心。

scheduler的数据结构

    struct coroutine_scheduler {
        coroutine_t         *main_coroutine;
        coroutine_t         *running_coroutine;
        coroutine_list_t     coroutine_queue;
    };
    typedef struct coroutine_scheduler coroutine_scheduler_t;
    
        struct coroutine_list {
        coroutine_t *head;
        coroutine_t *tail;
    };
    typedef struct coroutine_list coroutine_list_t;
  • main_coroutine:它的entry指向 coroutine_scheduler_main函数,类似于线程中的runloop提供一个for循环,不断读取协程队列中的head,执行head的入口函数,队列为空的时候main_coroutine会被挂起。
  • running_coroutine : 用来记录当前正在运行中的协程,获取或者挂起当前协程都会用到这个字段。
  • coroutine_queue: 是一个双向链表,用来保存添加到当前scheduler的协程,当协程的入口函数执行完成后,scheduler会把它从链表中清除。

scheduler的创建过程。

//scheduler 的创建
coroutine_scheduler_t *coroutine_scheduler_self_create_if_not_exists(void) {
    
    if (!coroutine_scheduler_key) {
        pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free);
    }
    
    void *schedule = pthread_getspecific(coroutine_scheduler_key);
    if (!schedule) {
        schedule = coroutine_scheduler_new();
        pthread_setspecific(coroutine_scheduler_key, schedule);
    }
    return schedule;
}

pthread_setspecificpthread_getspecific是线程存储的存取函数,表面上看起来这是一个全局变量,所有线程都可以使用它,而它的值在每个线程中都是是单独存储的。线程存储的key值是pthread_key_t类型,通过pthread_key_create创建,pthread_key_create需要两个参数第一个是字符串类型的key值,第二个参数是一个清理函数,线程释放这个key值对应的存储空间的的时候,这个清理函数会被调用。线程存储的创建方式保证了每一个线程中只有一个scheduler,并且提供了获取这个scheduler的入口。

coroutine_scheduler_main

// The main entry of the coroutine's scheduler
// The scheduler is just a special coroutine, so we can use yield.
void coroutine_scheduler_main(coroutine_t *scheduler_co) {
    
    coroutine_scheduler_t *scheduler = scheduler_co->scheduler;
    for (;;) {
        
        // Pop a coroutine from the scheduler's queue.
        coroutine_t *co = scheduler_queue_pop(scheduler);
        if (co == NULL) {
            // Yield the scheduler, give back cpu to origin thread.
            coroutine_yield(scheduler_co);
            
            // When some coroutine add to the scheduler's queue,
            // the scheduler will resume again,
            // then will resume here, continue the loop.
            continue;
        }
        // Set scheduler's current running coroutine.
        scheduler->running_coroutine = co;
        // Resume the coroutine
        coroutine_resume_im(co);
        
        // Set scheduler's current running coroutine to nil.
        scheduler->running_coroutine = nil;
        
        // if coroutine finished, free coroutine.
        if (co->status == COROUTINE_DEAD) {
            coroutine_close_ifdead(co);
        }
    }
}

coroutine_scheduler_main函数是scheduler的runloop。一个for循环,从自己的协程队列里面读取协程。当scheduler的协程队列不为空的时候,会从队列中取出head执行入口函数。当队列里面的协程全部取出后,当前scheduler的协程队列coroutine_queue为空。main_coroutine会被coroutine_yield这个函数挂起。coroutine_yield会保存当前上下文,也就是说当main_coroutine下次被resume的时候,会从这里继续执行下去,继续for循环。

coroutine_resume_im

void coroutine_resume_im(coroutine_t *co) {
    switch (co->status) {
        case COROUTINE_READY:
        {
            co->stack_memory = coroutine_memory_malloc(co->stack_size);
            co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
            // get the pre context
            co->pre_context = malloc(sizeof(coroutine_ucontext_t));
            BOOL skip = false;
            coroutine_getcontext(co->pre_context);
            if (skip) {
                // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
                return;
            }
#pragma unused(skip)
            skip = true;
            
            free(co->context);
            co->context = calloc(1, sizeof(coroutine_ucontext_t));
            coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
            // setcontext
            coroutine_begin(co->context);
            
            break;
        }
        case COROUTINE_SUSPEND:
        {
            BOOL skip = false;
            coroutine_getcontext(co->pre_context);
            if (skip) {
                // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
                return;
            }
#pragma unused(skip)
            skip = true;
            // setcontext
            coroutine_setcontext(co->context);
            
            break;
        }
        default:
            assert(false);
            break;
    }
}

对于im这个函数,这篇文章主要介绍的是coobjc的调度,不做详细的说明。我们只需要执行这个函数在COROUTINE_READY会执行im的entry。在COROUTINE_SUSPEND状态下会恢复之前的context也就是yield中断的地方。

我们来回顾一下co调度的整个流程。在一个线程中会创建唯一的数据构 scheduler。scheduler中包含main_co,running_co,co_queue。main_co的entry是一个for循环,在co_queue队列里面取出head co,设置head为running_co,执行head的entry。当调用coroutine_resume(co)的时候。如果running_co存在,那么running_co就会被yield挂起,main_co会从co_queue取出一个新的co执行它的entry,当for循环再次遍历到这个 被挂起的co的时候,程序会跳转到yield函数里面继续执行。如果如果runing_co不存在存在的话co会被添加到co_queue,同时resume会执行main_coroutine的entry,for循环开始。

co_lauch

co_launch 这个函数的功能类似于,dispatch_async。区别是co_launch,把需要执行的任务放到一个协程队列里面,dispatch_async是把执行任务放到一个线程队列里面执行。在调度层面通过coroutine_resume把co添加到scheduler的co_queue,在这个执行任务里面,你可以通过yield,resume来交出线程或者抢占线程。

NS_INLINE COCoroutine * _Nonnull  co_launch(void(^ _Nonnull block)(void)) {
    COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
    return [co resume];
}

- (COCoroutine *)resume {
    COCoroutine *currentCo = [COCoroutine currentCoroutine];
    BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
    [self.dispatch dispatch_async_block:^{
        if (self.isResume) {
            return;
        }
        if (isSubroutine) {
            self.parent = currentCo;
            [currentCo addChild:self];
        }
        self.isResume = YES;
        coroutine_resume(self.co);
    }];
    return self;
}