Libuv学习——线程基础

372 阅读7分钟

前言

原文:Unlike network I/O, there are no platform-specific file I/O primitives libuv could rely on, so the current approach is to run blocking file I/O operations in a thread pool。 翻译:不像网络IO,libuv没有特定平台的异步IO原语可以依赖,所以当前是在线程池中执行阻塞(同步)IO来实现异步的。

根据libuv官网对其架构的介绍,我们可以知道它并不是单线程的,它有一个线程池,用来处理文件IODSN查询等操作。在介绍线程池之前,先通过POSIX Threads介绍一下线程的基本操作,为下一篇文章介绍线程池打下基础。如果您对libuv的整体架构感兴趣,可以访问下面链接了解,当然以后我也会写文章介绍的。

Design overview - libuv documentation​
docs.libuv.org图标

POSIX Threads

相信大家对线程的概念都有或多或少的了解,这里就不介绍了,下面将直接通过API和demo来学习。由于不同平台线程的规范和原语不一样,而我对Linux比较熟悉,所以接下来将通过Linux中的POSIX Threads来讲解。libuv线程池主要涉及到线程创建互斥锁条件变量3个东西,下面将分别介绍它们。

线程创建

让我们首先了解一下如何创建线程,代码和输出如下:

#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 5

int sum = 0;

void * thread_task(void * args) {
    int max = (int)args;

    for (int i = 0; i <= max; ++i) {
        sum += i;
    }
    printf("sum: %i\n", sum);
    pthread_exit(NULL);
}

int main() {
    pthread_t threads[NUM_THREADS];
    
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setstacksize(&attr, 8192);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_create(&thread, &attr, thread_task, (void *)10);
        if (result) {
            printf("线程创建失败 errCode:%i", result);
            return -1;
        }
    }
    pthread_attr_destroy(&attr);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_join(thread, NULL);
        if (result == 3) {
            printf("线程%i已经结束了\n", i);
            continue;
        }
    }
    
    printf("main函数运行结束, sum: %i\n", sum);
    return 0;
}

上面代码很简单,创建5个线程,每个线程将传入数据作为最大值max,然后从0,1,2,3,...,max加到sum上。接下来粗略讲解一下每行代码的含义:

#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 5

int sum = 0;

前4行代码引入了stdio.h、pthread.h两个头文件,函数printf在stdio.h中定义,线程相关的api在pthread.h中定义;定义了一个常量NUM_THREADS和一个变量sum,常量NUM_THREADS表示要创建的线程数,变量sum用来计算总和。

void * thread_task(void * args) {
    int max = (int)args;

    for (int i = 0; i <= max; ++i) {
        sum += i;
    }
    printf("sum: %i\n", sum);
    pthread_exit(NULL);
}

接下来定义了一个函数thread_task,该函数会被每个线程执行。函数很简单,将输入的int参数作为max,将0,1,2,3,...,max依次加到sum上,并将当前sum输出到控制台。最后执行pthread_exist结束线程。

最后让我们看下main函数里面的内容,

int main() {
    pthread_t threads[NUM_THREADS];
    
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setstacksize(&attr, 8192);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_create(&thread, &attr, thread_task, (void *)10);
        if (result) {
            printf("线程创建失败 errCode:%i", result);
            return -1;
        }
    }
    pthread_attr_destroy(&attr);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_join(thread, NULL);
        if (result == 3) {
            printf("线程%i已经结束了\n", i);
        }
    }
    
    printf("main func end\n");
    return 0;
}

先定义了5个代表线程的数组threads;接着定义线程属性变量attr,将线程的栈设为8192个字节;之后通过pthread_create创建线程,每个线程将会执行thread_task函数,并通过第3个参数将10传递给thread_task;最后通过pthread_join告诉main函数等到所有线程执行完之后再继续执行。

互斥锁

如果足够仔细,相信你可能已经发现上面的输出不符合预期,按理说应该输出275才对,为啥只输出了249呢?

我们再运行一下程序看看,结果又正常了。

让我们简单通过2个线程来分析一下,假设此刻sum值为120,线程1中i循环到3,线程2循环到6,下表展示了导致sum错误的可能情况:

通过上表可以发现之所以出现问题是因为将i加到sum这个操作不是原子的,如果从读取sum、将i加到sum整个过程变成原子操作,就不会有问题了。解决该问题的常用方法之一就是互斥锁,让我们简单修改一下代码:

...
pthread_mutex_t mutex;

void * thread_task(void * args) {
    int max = (int)args;

    for (int i = 0; i <= max; ++i) {
        pthread_mutex_lock(&mutex);
        sum += i;
        pthread_mutex_unlock(&mutex);
    }
    printf("sum: %i\n", sum);
    pthread_exit(NULL);
}

int main() {
    pthread_mutex_init(&mutex, NULL);
    ...
    pthread_mutex_destroy(&mutex);
}

从代码的角度来看,修改后的代码增加了一个全局互斥锁mutex,并在main函数初始化。在sum += i;前面加了一句代码pthread_mutex_lock(&mutex),它告诉线程尝试获取锁,获取失败就挂起,等待其他线程释放锁;获取成功就继续执行代码,并通过pthread_mutex_unlock(&mutex)将获取的锁给释放掉。

条件变量

互斥锁只解决了多个线程修改共享变量的问题,对于下面场景它是无法办法解决的。一个线程需要满足一个条件才能执行下去,而这个条件由另一个线程满足的。比如现在有一个变量i和2个线程,当i为0时第一个线程输出一段内容,并将i变成1;当i为1时,第二个线程输出一段内容,并将i变成0;两个线程依次交替执行。对于这个问题,我们可以通过条件变量来实现。下面是实现的代码和输出。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int i = 0;

pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;

void * thread_task0(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 0) {
            pthread_cond_wait(&cond0, &mutex);
        }
        sleep(1);

        printf("**************thread_task0 i: %i\n", i);
        i = 1;
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond1);
    }
}

void * thread_task1(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 1) {
            pthread_cond_wait(&cond1, &mutex);

        }
        sleep(1);
        printf("################thread_task1 i: %i\n", i);
        i = 0;

        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond0);
               
    }
}

int main() {
    pthread_t thread0;
    pthread_t thread1;
    
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond0, NULL);
    pthread_cond_init(&cond1, NULL);
    
    pthread_create(&thread0, NULL, thread_task0, NULL);
    pthread_create(&thread1, NULL, thread_task1, NULL);
    
    pthread_join(thread0, NULL);
    pthread_join(thread1, NULL);
    
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond0);
    pthread_cond_destroy(&cond1);
    return 0;
}

让我们简单分析一下代码吧,前3行引入了3个头文件,前2个已经介绍过了,第3个头文件中有sleep函数的定义,后面会用到。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

随后定义了变量i,互斥锁mutex和2个条件变量cond0、cond1,这里需要注意一下条件变量是需要和互斥锁一起使用的。

int i = 0;

pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;

紧接着我们定义了2个函数,分别由2个线程执行,由于这两个函数文字解释比较麻烦,下面通过表格来表示两个线程的执行过程。这里需要注意的是,pthread_cond_wait会放弃当前线程获得的锁,并进入挂起状态。当其他线程通过pthread_cond_signal通知该线程时,该线程会被唤起,重新获得锁。

void * thread_task0(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 0) {
            pthread_cond_wait(&cond0, &mutex);
        }
        sleep(1);

        printf("**************thread_task0 i: %i\n", i);
        i = 1;
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond1);
    }
}

void * thread_task1(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 1) {
            pthread_cond_wait(&cond1, &mutex);

        }
        sleep(1);
        printf("################thread_task1 i: %i\n", i);
        i = 0;
        
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond0);
    }
}

main函数只是用来启动上面介绍的两个线程,所以这里就不解释了。

Libuv的线程

上面介绍了POSIX Threads,接下来让我们粗略的看下libuv的线程吧,libuv官网也给出了对应的API文档,有兴趣的同学可以看下:

Threading and synchronization utilities​
docs.libuv.org

通过翻看源码,我们可以在src/unix/thread.c和src/win/thread.c文件下看到libuv线程的实现,很简单,就是对各个平台原有线程API进行包装,使得API统一化,下面通过src/unix/thread.c稍稍看下它的实现吧。

线程创建API

typedef pthread_t uv_thread_t;

int uv_thread_create_ex(uv_thread_t* tid,
                        const uv_thread_options_t* params,
                        void (*entry)(void *arg),
                        void *arg) {
  int err;
  pthread_attr_t* attr;
  pthread_attr_t attr_storage;
  size_t pagesize;
  size_t stack_size;

  /* Used to squelch a -Wcast-function-type warning. */
  union {
    void (*in)(void*);
    void* (*out)(void*);
  } f;

  stack_size =
      params->flags & UV_THREAD_HAS_STACK_SIZE ? params->stack_size : 0;

  attr = NULL;
  if (stack_size == 0) {
    stack_size = thread_stack_size();
  } else {
    pagesize = (size_t)getpagesize();
    /* Round up to the nearest page boundary. */
    stack_size = (stack_size + pagesize - 1) &~ (pagesize - 1);
#ifdef PTHREAD_STACK_MIN
    if (stack_size < PTHREAD_STACK_MIN)
      stack_size = PTHREAD_STACK_MIN;
#endif
  }

  if (stack_size > 0) {
    attr = &attr_storage;

    if (pthread_attr_init(attr))
      abort();

    if (pthread_attr_setstacksize(attr, stack_size))
      abort();
  }

  f.in = entry;
  err = pthread_create(tid, attr, f.out, arg);

  if (attr != NULL)
    pthread_attr_destroy(attr);

  return UV__ERR(err);
}

可以看到创建线程的方法和我们在POSIX Threads中介绍的差不多,都是通过pthread_create来创建,只不过通过pthread_attr_t设置了一些线程属性罢了,比如线程堆栈的大小。

互斥量API

typedef pthread_mutex_t uv_mutex_t;

int uv_mutex_init(uv_mutex_t* mutex) {
#if defined(NDEBUG) || !defined(PTHREAD_MUTEX_ERRORCHECK)
  return UV__ERR(pthread_mutex_init(mutex, NULL));
#else
  pthread_mutexattr_t attr;
  int err;

  if (pthread_mutexattr_init(&attr))
    abort();

  if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK))
    abort();

  err = pthread_mutex_init(mutex, &attr);

  if (pthread_mutexattr_destroy(&attr))
    abort();

  return UV__ERR(err);
#endif
}

void uv_mutex_lock(uv_mutex_t* mutex) {
  if (pthread_mutex_lock(mutex))
    abort();
}

void uv_mutex_unlock(uv_mutex_t* mutex) {
  if (pthread_mutex_unlock(mutex))
    abort();
}

互斥锁的API也和我们POSIX Threads里介绍的差不多。

总结

本文初步通过线程创建互斥锁条件变量介绍了POSIX Threads以及libuv本身的线程API,这些是libuv实现线程池的核心,结合上篇《Libuv学习——队列》,我们已经为下篇libuv线程池打好了基础,所有您有兴趣的话,可以关注我们微信公众号《方凳雅集》,这样您将会在第一时间看到我们的文章。