并发的关键概念

360 阅读9分钟

并发

并发是什么?

在计算机科学中,并发的定义是指:在一个程序的运行过程中,程序的不同部分可以以乱序或者部分有序的方式执行,但是最终程序的输出结果与顺序执行一致。

定义中有两个关键点

  1. 乱序或者部分有序
  2. 结果与顺序执行一致

假设程序 XA,B 两部分组成,B 依赖 A,顺序执行情况下,先执行 A 然后执行 B,输出结果为 y,耗时:T_A + T_B

进一步研究发现,A 可以分成 A_1,A_2 两部分,且 A_2 依赖于另一个任务 C,也就是执行完A_1之后,需要等待 C 也执行完才能继续往下走,令等待 C 完成的时间为 T_C,那么就有 T_A + T_B= T_{A1} + T_C + T_{A2} + T_B,这是顺序执行情况下的耗时。

再进一步研究发现,B 仅仅依赖于 A_1,为了提高效率,我们可以这样做,执行完 A_1 之后,为了避免CPU等待空闲,直接调度任务 B,等任务 B 完成之后,假设任务 C 也完成了,那么切换到任务 A 执行完 A_2 部分。

这是一个简单的并发case,可以看到

  1. 程序的执行顺序变成部分有序,从A_1 -> A_2 -> B 变成 A_1 -> B -> A_2
  2. 任务 BA_2 的前置依赖均被满足,保证了最终输出结果依然是 y

为什么要并发?

那为什么要费这么多事来实现并发呢,老老实实顺序执行不好吗?换句话说,通过并发我们获得了什么。

效率是关键。在上面的例子中,采用并发的执行方式,T_C 被节省下来。

并发带来的另一个明显好处是多任务,就算是在一个单核CPU(single processor)机器上,也能同时运行多个应用,这是因为多个应用可以分时复用CPU,这是多个应用之间的并发。实际上,单核CPU同一时刻只能运行一个应用(这就是为什么我把上文的“同时”二字加粗的原因),但是从用户的视角来看好像有多个CPU一样,应用之间的并发虚拟化出了多个CPU的效果。

没有并发的世界是可怕的,想一想你只有把全部的工作做完才能去玩游戏,可是工作哪有做完的时候呢?我只好在工作和游戏之间来回切换,切换是有代价的,全情投入工作之前,要先把游戏里的心思先收回来,回忆起上一段工作的内容,这跟线程切换几乎一模一样。把我类比成CPU,实际上,我从事的各种活动是在分时复用“我”这个资源的。

并发有什么问题?

一般我们遇到线程并发和协程并发的情况比较多,这里的线程和协程就是并发单位。为了具体的说明问题,我们拿线程的并发举例。

第一个关键概念是临界区(critical section)。临界区指的是一段代码,这段代码会访问共享资源,这个共享资源可能是一个简单变量,也可能是一个更加复杂的数据结构。

第二个关键概念是竞争条件(race condition)。竞争条件是指,在多线程程序中,多个线程有可能几乎同时访问临界区,并且尝试更新共享资源,这可能导致意想不到的结果。比如说,两个线程同时对共享变量x执行自增操作,结果可能是+1,也可能是+2。也就是程序的运行结果是不确定的,这样的程序叫做非确定程序(indeterminate program),这是第三个关键概念。

程序运行的非确定性,这是并发要解决的本质问题,至于并发程序难于编写、难于理解、编写不当还会出现死锁,这些都是属于技术层面的问题(所以并发的定义中没有提到效率)。

怎么实现并发?

为了保证并发程序的确定性,我们需要使用一些工具,这些工具叫同步原语(mutual exclusion primitives),具体来说有:

  1. 互斥变量(mutex)
  2. 条件变量(condition variable,也叫做monitor)
  3. 信号量(semaphore)

互斥变量提供一种加锁机制。在访问临界区的之前,调用互斥变量的lock函数,能够保证每次只有一个线程进入到临界区,当然,离开临界区之后做的第一件事就是调用互斥变量提供的unlock函数,释放共享资源,保证其他线程或者当前线程下一次能够再次进入临界区。多线程环境下,共享变量x的自增操作可以使用互斥变量来保证正确性。

互斥变量提供一种互斥访问的机制,条件变量提供的则是同步机制。想让任务B在任务A之后执行,只需要使用互斥变量m,在调度任务B之前调用m的wait函数,在执行任务A之后调用m的signal函数。m的作用是,不管调度顺序怎么样,在signal执行之前,wait会一直等待。

信号量最早由Dijkstra提出,目的也是为了防止竞争条件的出现,但是其原始语义与条件变量和信号量不一样,并且我们会看到,互斥变量和条件变量都是信号量的一种特殊形式。

每个信号量都有一个counter,代表当前可用资源数,信号量还提供两个操作,sem_wait:当counter-1大于0的时候返回成功,且执行counter减1,当counter-1小于0的时候阻塞;sem_post,执行counter加1操作,且如果当前有线程正在等待,随机唤醒其中一个线程。

counter值只能取0或者1的的信号量称之为布尔信号量(binary semaphore),counter初始值为1的布尔信号量功能相当于互斥变量,counter初始值为0的布尔信号量相当于条件变量。

GOLANG中的互斥变量、条件变量、信号量

下面用具体的case说明为什么binary semaphore可以实现互斥变量、条件变量的功能。

GOLANG中的互斥变量

golang中的sync.Mutex就是互斥变量,如上所述,互斥变量可以解决多线程共享变量自增的正确性。

package main

import (
  "fmt"
  "sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m *sync.Mutex) {
  m.Lock()
  x = x + 1
  m.Unlock()
  wg.Done()
}
func main() {
  var w sync.WaitGroup
  var m sync.Mutex
  for i := 0; i < 1000; i++ {
      w.Add(1)
      go increment(&w, &m) // 這裡一定要用 address
  }
  w.Wait()
  fmt.Println("final value of x", x)
}

互斥变量作用等同于容量为1的信号量,所以上面的case可以改写成:

package main

import (
  "fmt"
  "sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m chan int) {
  m <- 1 // 信号量代替mutex
  x = x + 1
  <- m
  wg.Done()
}
func main() {
  var w sync.WaitGroup
  m := make(chan int, 1)
  for i := 0; i < 1000; i++ {
      w.Add(1)
      go increment(&w, m) // 這裡一定要用 address
  }
  w.Wait()
  fmt.Println("final value of x", x)
}

GOLANG中的条件变量

GOLANG中的条件变量就是unbuffered channel。实际上,channel就是golang中的信号量实现,buffered channel的capacity就是信号量中的counter,不防统一称之为容量。

事实上,unbuffered channel的capacity等于0,前面说过,容量为0的信号量作用等同于条件变量。

下面的case我想在程序退出(也就是main goroutine结束)之前在屏幕上输出hello world,为了实现这点,我使用了done这个类型为chan bool的channel变量。

package main

import (
    "fmt"
    "time"
)

func hello(done chan bool) {
    fmt.Println("hello world")
    time.Sleep(4 * time.Second)
    done <- true
}
func main() {
    done := make(chan bool) // done的作用等同于条件变量
    fmt.Println("Main going to call hello go goroutine")
    go hello(done)
    <- done // 管道读操作一直block,直到 hello goroutine执行并往管道中写数据,注释掉此行,main goroutine会一直执行到结束,hello goroutine不会被调度
    fmt.Println("Main received data")
}

C语言中的互斥变量、条件变量、信号量

C语言中同步原语的实现体现在pthread(POSIX Threads,POSIX是个标准,pthread是按照POSIX关于线程的标准实现的线程库)这个库中。

  • pthread_mutex_t:互斥变量类型,加锁:pthread_mutex_lock,释放锁:pthread_mutex_unlock
  • pthread_cond_t:条件变量类型,等待:pthread_cond_wait,唤醒:pthread_cond_signal
  • sem_t:信号量类型,wait:sem_wait,post:sem_post

另外,pthread还提供pthread_join函数,其语义与GOLANG中的waitgroup一致。

GOLANG中的SELECT语义pthread库没有直接提供,但是POSIX标准里面定义了select和pselect这两个功能差不多的函数来实现这个语义,linux中这两个函数都是作为系统调用实现,不同的是select和pselect监听的都是文件描述符(poll epoll select的区别与联系)。

生产者消费者问题

下面用C和GOLANG两种语言实现多producer,多consumer的生产者消费者队列。

  1. C语言版
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <semaphore.h>

#define MAX 4

int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;

int loops = 100;

void put(int value)
{
    buffer[fill] = value;
    fill = (fill + 1) % MAX;
    count++;
}

int get()
{
    int tmp = buffer[use];
    use = (use + 1) % MAX;
    count--;

    return tmp;
}

sem_t empty;
sem_t full;
sem_t mutex;

void *producer(void *arg)
{
    int i;
    for(i = 0; i < loops; i++)
    {
        sem_wait(&empty);
        sem_wait(&mutex);
        put(i);
        sem_post(&mutex);
        sem_post(&full);
    }

    return 0;
}

void *consumer(void *arg)
{
    int i, tmp = 0;
    for(i = 0; i < loops; i++)
    {
        sem_wait(&full);
        sem_wait(&mutex);
        tmp = get();
        sem_post(&mutex);
        sem_post(&empty);
        printf("current number : %d\n", tmp);
    }

    return 0;
}


int main()
{
    sem_init(&empty, 0, MAX);
    sem_init(&full, 0, 0);
    sem_init(&mutex, 0, 1);

    pthread_t p, c, p1, p2, c1, c2;

    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&p1, NULL, producer, NULL);
    pthread_create(&p2, NULL, producer, NULL);

    pthread_create(&c, NULL, consumer, NULL);
    pthread_create(&c1, NULL, consumer, NULL);
    pthread_create(&c2, NULL, consumer, NULL);

    pthread_join(p, NULL);
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);
    pthread_join(c, NULL);
    pthread_join(c1, NULL);
    pthread_join(c2, NULL);

    return 0;
}
  1. GOLANG版
package main

import (
  "fmt"
)
var MSG_BUFFER = 4
var COSUMER_CNT = 3
var NUM_CNT = 100

var msgs = make(chan int, MSG_BUFFER)
// 多个消费者,用buffered channel控制消费者全部执行完之后推出main goroutine
var done = make(chan int, COSUMER_CNT)

func produce() {
    for i := 0; i < NUM_CNT; i++ {
        msgs <- i
    }
}

func consume() {
    for i := 0; i < NUM_CNT; i++ {
        msg := <-msgs
        fmt.Println(msg)
    }
    done <- 1
}

func main () {
    for i:= 0; i < COSUMER_CNT; i++ {
        go produce()
        go consume()
    }

    for i:= 0; i < COSUMER_CNT; i++ {
        <- done
    }
}

总结以及下篇展望

互斥变量实际上是一个锁,条件变量和信号量都是基于锁实现的,有必要说说锁的原理,下篇内容包括:

  • cpu层面的原子操作 TAS case(汇编代码)
  • 几种锁的实现方式(硬件中断锁、自旋锁、队列锁)
  • 评估不同锁实现方式的关键:效率和公平
  • 几种编程语言中的锁:GOLANG、JAVA
  • 数据库中的锁

参考

  • 本文大部分知识通过阅读Operating Systems: Three Easy Pieces这本操作系统教材习得,这是我读过的最好的关于操作系统的教材。本书最大的特点是将操作系统仅仅分成三个部分来讲述:虚拟化、并行、持久化,虽然只有三个部分,操作系统的核心部分却都覆盖到了。本书作者的语言功底很强,读起来朗朗上口。另外,本书的脉络清楚,整本书都在回答一个问题:操作系统作为第一个也是最重要的一个软件,是如何使得计算机系统易于用户使用的。