阅读 207

Libuv学习——线程池

image.png


前言


通过前两篇文章的学习,我们已经解了Libuv中的队列线程,为本文的学习打下基础,没有看过的同学建议先看下。下面将从生产者消费者模型和源码两个角度学习Libuv的线程池,为后面学习Libuv文件处理做铺垫。


生产者消费者模型


Node.js的文件操作支持同步调用和异步调用,根据Libuv官网的介绍,我们知道它没有跨平台的异步文件IO可以使用,所以它的异步文件IO是通过在线程池中执行同步文件IO实现的。那具体是怎么实现的呢?答案就是生产者消费者模型。Libuv的线程包括2部分,一个是主线程,一个是线程池。主线程的一部分工作是描述任务并将其提交给线程池,线程池进行处理。拿异步文件操作为例,主线程生成一个描述文件操作的对象,将其提交到任务队列线程池从任务队列获取该对象进行处理。其中主线程是生产者,线程池中的线程是消费者,任务队列是生产者和消费者之间的桥梁,下面是一个简单的示意图:

image.png

Libuv在生产者消费者模型中多加了一步,线程池执行完任务后,将结果交给主线程,主线程拿到结果后,如果发现有回调函数需要执行,就执行。所以Libuv的线程模型如下:

image.png


源码分析


Libuv线程池的代码很容易找到,就在src目录下的threadpool.c文件中。


image.png


通过上面对生产者消费者模型的介绍,该代码大致分为4部分:任务队列、主线程提交任务到任务队列(提交任务)、线程池从任务队列获取任务并执行(消费任务)、线程池执行完任务通知主线程执行回调函数(回调处理)。


任务队列


任务队列就是一个队列而已。由于任务队列会被多个线程(主线程、线程池)同时访问,为了保证线程安全,需要互斥锁。另外任务队列如果为空,线程池中的线程需要挂起,等待主线程提交任务后唤起,所以还需要条件变量。任务队列、条件变量、互斥量的定义如下所示:


1...
2static uv_cond_t cond; // 条件变量
3static uv_mutex_t mutex; // 互斥锁
4...
5static QUEUE wq; // 任务队列
6...复制代码


提交任务


主线程将任务提交到任务队列是通过uv__work_submit来实现的,让我们来看下它的代码:


 1struct uv__work {
 2  void (*work)(struct uv__work *w);
 3  void (*done)(struct uv__work *w, int status);
 4  struct uv_loop_s* loop;
 5  void* wq[2]; // 用于将其关联到任务队列中
 6};
 7
 8void uv__work_submit(uv_loop_t* loop,
 9                     struct uv__work* w,
10                     enum uv__work_kind kind,
11                     void (*work)(struct uv__work* w),
12                     void (*done)(struct uv__work* w, int status)) {
13  uv_once(&once, init_once); // 初始化线程,无乱调用多少次,init_once只会执行一次
14  w->loop = loop; // 事件循环
15  w->work = work; // 线程池要执行的函数
16  w->done = done; // 线程池执行结束后,通知主线程要执行的函数
17  post(&w->wq, kind); // 将任务提交任务队列中
18}复制代码


uv__work_submit有4个参数:第一个参数为Libuv的事件循环,这里我们先忽略,以后会有专门的文章介绍;第二个参数是线程池执行任务的通用模型,类型为uv__work,属性work表示线程池中要执行的函数,属性done表示线程池执行完,通知主线程要执行的函数;第三、四个参数分别对应work函数和done函数。该函数主要做了两件事情:一件是通过uv_once调用init_once来初始化线程池;另一件是对w进行赋值,然后通过post将其提交到任务队列。这里需要注意,通过nv_once可以保证uv__work_submit在调用多次的情况,init_once只执行一次,nv_once底层是通过pthread_once实现的。init_once会在下一节介绍,让我们先来看下post。


 1static void post(QUEUE* q, enum uv__work_kind kind) {
 2  // 获取锁
 3  uv_mutex_lock(&mutex);
 4  ...
 5  // 将任务添加到任务队列的最后
 6  QUEUE_INSERT_TAIL(&wq, q);
 7  
 8  // 如果线程池中有挂起的线程,就唤起挂起的线程,让其工作
 9  if (idle_threads > 0)
10    uv_cond_signal(&cond);
11  // 释放锁
12  uv_mutex_unlock(&mutex);
13}复制代码


代码很简单,先获取锁mutex,然后将任务提交到任务队列中。如果线程池中有挂起的线程,就通过条件变量cond唤起并放弃锁mutex。


消费任务


任务队列中的任务是通过线程池进行消费的,而线程池的初始化是在uv__work_submit调用init_once实现的,先看下如何初始化线程池吧:


1static void init_once(void) {
2  ...
3  init_threads();
4}复制代码


init_once调用了init_threads,那就看下init_threads。


 1...
 2#define MAX_THREADPOOL_SIZE 1024 // 线程池的最大数量
 3...
 4static uv_thread_t* threads; // 线程池
 5static uv_thread_t default_threads[4]; // 默认的线程池,线程数量为4
 6...
 7
 8
 9static void init_threads(void) {
10  unsigned int i;
11  const char* val;
12  ...
13      
14  // 计算线程池中线程的数量,不能大于最大值MAX_THREADPOOL_SIZE
15  nthreads = ARRAY_SIZE(default_threads);
16    
17  // 通过环境变量设置线程池的大小
18  val = getenv("UV_THREADPOOL_SIZE");
19  if (val != NULL)
20    nthreads = atoi(val);
21  
22  // 保存线程池中最少有一个线程
23  if (nthreads == 0)
24    nthreads = 1;
25    
26  // 线程池中线程数量不能超过MAX_THREADPOOL_SIZE
27  if (nthreads > MAX_THREADPOOL_SIZE)
28    nthreads = MAX_THREADPOOL_SIZE;
29    
30  // 初始化线程池
31  threads = default_threads;
32  if (nthreads > ARRAY_SIZE(default_threads)) {
33    threads = uv__malloc(nthreads * sizeof(threads[0]));
34    if (threads == NULL) {
35      nthreads = ARRAY_SIZE(default_threads);
36      threads = default_threads;
37    }
38  }
39    
40  // 创建条件变量
41  if (uv_cond_init(&cond))
42    abort();
43  
44  // 创建互斥量
45  if (uv_mutex_init(&mutex))
46    abort();
47
48  // 初始化任务队列
49  QUEUE_INIT(&wq);
50  ...
51      
52  // 根据线程池的数量,初始化线程池中的每个线程,并执行worker函数
53  for (i = 0; i < nthreads; i++)
54    if (uv_thread_create(threads + i, worker, &sem))
55      abort();
56  
57  ...
58}复制代码


通过上面的代码可以知道init_threads先获取线程池的大小nthreads;然后初始化互斥量mutex、条件变量cond和任务队列wq;最后创建nthreads个线程,每个线程执行worker函数。worker函数的核心就是消费任务队列中的任务,让我们详细的看下它:


 1static void worker(void* arg) {
 2  struct uv__work* w;
 3  QUEUE* q;
 4  
 5  ...
 6  arg = NULL;
 7    
 8  // 获取互斥锁
 9  uv_mutex_lock(&mutex);
10    
11  // 通过无限循环,保证线程一直执行
12  for (;;) {
13    
14    // 如果任务队列为空,通过等待条件变量cond挂起,并释放锁mutex
15    // 主线程提交任务通过uv_cond_signal唤起,并重新获取锁mutex
16    while (QUEUE_EMPTY(&wq) || ...) {
17      idle_threads += 1;
18      uv_cond_wait(&cond, &mutex);
19      idle_threads -= 1;
20    }
21      
22    // 从任务队列中获取第一个任务
23    q = QUEUE_HEAD(&wq);
24    ...
25        
26    // 将该任务从任务队列中删除
27    QUEUE_REMOVE(q);
28    QUEUE_INIT(q);
29      
30    ...
31        
32    // 操作完任务队列,释放锁mutex
33    uv_mutex_unlock(&mutex);
34
35    // 获取uv__work对象,并执行work
36    w = QUEUE_DATA(q, struct uv__work, wq);
37    w->work(w);
38
39    // 获取loop的互斥锁wq_mutex
40    uv_mutex_lock(&w->loop->wq_mutex);
41    w->work = NULL;
42    
43    // 将执行完work函数的任务挂到loop->wq队列中
44    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
45      
46    // 通过uv_async_send通知主线程,当然有任务执行完了,主线程可以执行任务中的done函数。
47    uv_async_send(&w->loop->wq_async);
48    uv_mutex_unlock(&w->loop->wq_mutex);
49
50    // 获取锁,执行任务队列中的下一个任务
51    ...
52    uv_mutex_lock(&mutex);
53    ...
54  }
55}复制代码


worker的本质就是从任务队列中获取任务,然后执行work函数。执行完后,将该任务提交到事件循环loop的wp队列中,通过uv_async_send告知主线程执行任务中的done函数。


回调处理


上面我们介绍了worker函数在执行完任务后会通过uv_async_send告知主线程执行回调函数,那这块是怎么实现的呢?这里涉及到了事件循环,这里我们就简单的介绍一下,后面会有详细的文章介绍它。事件循环loop在初始化的时候会调用uv_async_init,该函数的第三个参数是一个函数,当其他线程调用uv_async_send时,该函数就会执行。具体代码如下:


 1uv_async_init(loop, &loop->wq_async, uv__work_done);
 2
 3void uv__work_done(uv_async_t* handle) {
 4  struct uv__work* w;
 5  uv_loop_t* loop;
 6  QUEUE* q;
 7  QUEUE wq;
 8  int err;
 9
10  loop = container_of(handle, uv_loop_t, wq_async);
11  uv_mutex_lock(&loop->wq_mutex);
12  QUEUE_MOVE(&loop->wq, &wq);
13  uv_mutex_unlock(&loop->wq_mutex);
14
15  while (!QUEUE_EMPTY(&wq)) {
16    q = QUEUE_HEAD(&wq);
17    QUEUE_REMOVE(q);
18
19    w = container_of(q, struct uv__work, wq);
20    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
21    w->done(w, err);
22  }
23}复制代码


uv__work_done很简单,获取loop中的wq队列,获取队列中的每个任务并调用done函数。


总结


本文首先介绍了生产者消费者模型,然后通过任务队列、提交任务、消费任务、回调处理讲解了Libuv线程池,为下一篇讲解Libuv文件处理做铺垫,如果你对Libuv系列感兴趣的话,欢迎关注我们。


关注下面的标签,发现更多相似文章
评论