并发
并发是什么?
在计算机科学中,并发的定义是指:在一个程序的运行过程中,程序的不同部分可以以乱序或者部分有序的方式执行,但是最终程序的输出结果与顺序执行一致。
定义中有两个关键点
- 乱序或者部分有序
- 结果与顺序执行一致
假设程序 由 两部分组成, 依赖 ,顺序执行情况下,先执行 然后执行 ,输出结果为 ,耗时:。
进一步研究发现, 可以分成 两部分,且 依赖于另一个任务 ,也就是执行完之后,需要等待 也执行完才能继续往下走,令等待 完成的时间为 ,那么就有 ,这是顺序执行情况下的耗时。
再进一步研究发现, 仅仅依赖于 ,为了提高效率,我们可以这样做,执行完 之后,为了避免CPU等待空闲,直接调度任务 ,等任务 完成之后,假设任务 也完成了,那么切换到任务 执行完 部分。
这是一个简单的并发case,可以看到
- 程序的执行顺序变成部分有序,从 变成
- 任务 和 的前置依赖均被满足,保证了最终输出结果依然是 。
为什么要并发?
那为什么要费这么多事来实现并发呢,老老实实顺序执行不好吗?换句话说,通过并发我们获得了什么。
效率是关键。在上面的例子中,采用并发的执行方式, 被节省下来。
并发带来的另一个明显好处是多任务,就算是在一个单核CPU(single processor)机器上,也能同时运行多个应用,这是因为多个应用可以分时复用CPU,这是多个应用之间的并发。实际上,单核CPU同一时刻只能运行一个应用(这就是为什么我把上文的“同时”二字加粗的原因),但是从用户的视角来看好像有多个CPU一样,应用之间的并发虚拟化出了多个CPU的效果。
没有并发的世界是可怕的,想一想你只有把全部的工作做完才能去玩游戏,可是工作哪有做完的时候呢?我只好在工作和游戏之间来回切换,切换是有代价的,全情投入工作之前,要先把游戏里的心思先收回来,回忆起上一段工作的内容,这跟线程切换几乎一模一样。把我类比成CPU,实际上,我从事的各种活动是在分时复用“我”这个资源的。
并发有什么问题?
一般我们遇到线程并发和协程并发的情况比较多,这里的线程和协程就是并发单位。为了具体的说明问题,我们拿线程的并发举例。
第一个关键概念是临界区(critical section)。临界区指的是一段代码,这段代码会访问共享资源,这个共享资源可能是一个简单变量,也可能是一个更加复杂的数据结构。
第二个关键概念是竞争条件(race condition)。竞争条件是指,在多线程程序中,多个线程有可能几乎同时访问临界区,并且尝试更新共享资源,这可能导致意想不到的结果。比如说,两个线程同时对共享变量x执行自增操作,结果可能是+1,也可能是+2。也就是程序的运行结果是不确定的,这样的程序叫做非确定程序(indeterminate program),这是第三个关键概念。
程序运行的非确定性,这是并发要解决的本质问题,至于并发程序难于编写、难于理解、编写不当还会出现死锁,这些都是属于技术层面的问题(所以并发的定义中没有提到效率)。
怎么实现并发?
为了保证并发程序的确定性,我们需要使用一些工具,这些工具叫同步原语(mutual exclusion primitives),具体来说有:
- 互斥变量(mutex)
- 条件变量(condition variable,也叫做monitor)
- 信号量(semaphore)
互斥变量提供一种加锁机制。在访问临界区的之前,调用互斥变量的lock函数,能够保证每次只有一个线程进入到临界区,当然,离开临界区之后做的第一件事就是调用互斥变量提供的unlock函数,释放共享资源,保证其他线程或者当前线程下一次能够再次进入临界区。多线程环境下,共享变量x的自增操作可以使用互斥变量来保证正确性。
互斥变量提供一种互斥访问的机制,条件变量提供的则是同步机制。想让任务B在任务A之后执行,只需要使用互斥变量m,在调度任务B之前调用m的wait函数,在执行任务A之后调用m的signal函数。m的作用是,不管调度顺序怎么样,在signal执行之前,wait会一直等待。
信号量最早由Dijkstra提出,目的也是为了防止竞争条件的出现,但是其原始语义与条件变量和信号量不一样,并且我们会看到,互斥变量和条件变量都是信号量的一种特殊形式。
每个信号量都有一个counter,代表当前可用资源数,信号量还提供两个操作,sem_wait:当counter-1大于0的时候返回成功,且执行counter减1,当counter-1小于0的时候阻塞;sem_post,执行counter加1操作,且如果当前有线程正在等待,随机唤醒其中一个线程。
counter值只能取0或者1的的信号量称之为布尔信号量(binary semaphore),counter初始值为1的布尔信号量功能相当于互斥变量,counter初始值为0的布尔信号量相当于条件变量。
GOLANG中的互斥变量、条件变量、信号量
下面用具体的case说明为什么binary semaphore可以实现互斥变量、条件变量的功能。
GOLANG中的互斥变量
golang中的sync.Mutex就是互斥变量,如上所述,互斥变量可以解决多线程共享变量自增的正确性。
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}
func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m) // 這裡一定要用 address
}
w.Wait()
fmt.Println("final value of x", x)
}
互斥变量作用等同于容量为1的信号量,所以上面的case可以改写成:
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m chan int) {
m <- 1 // 信号量代替mutex
x = x + 1
<- m
wg.Done()
}
func main() {
var w sync.WaitGroup
m := make(chan int, 1)
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, m) // 這裡一定要用 address
}
w.Wait()
fmt.Println("final value of x", x)
}
GOLANG中的条件变量
GOLANG中的条件变量就是unbuffered channel。实际上,channel就是golang中的信号量实现,buffered channel的capacity就是信号量中的counter,不防统一称之为容量。
事实上,unbuffered channel的capacity等于0,前面说过,容量为0的信号量作用等同于条件变量。
下面的case我想在程序退出(也就是main goroutine结束)之前在屏幕上输出hello world,为了实现这点,我使用了done这个类型为chan bool的channel变量。
package main
import (
"fmt"
"time"
)
func hello(done chan bool) {
fmt.Println("hello world")
time.Sleep(4 * time.Second)
done <- true
}
func main() {
done := make(chan bool) // done的作用等同于条件变量
fmt.Println("Main going to call hello go goroutine")
go hello(done)
<- done // 管道读操作一直block,直到 hello goroutine执行并往管道中写数据,注释掉此行,main goroutine会一直执行到结束,hello goroutine不会被调度
fmt.Println("Main received data")
}
C语言中的互斥变量、条件变量、信号量
C语言中同步原语的实现体现在pthread
(POSIX Threads,POSIX是个标准,pthread是按照POSIX关于线程的标准实现的线程库)这个库中。
- pthread_mutex_t:互斥变量类型,加锁:pthread_mutex_lock,释放锁:pthread_mutex_unlock
- pthread_cond_t:条件变量类型,等待:pthread_cond_wait,唤醒:pthread_cond_signal
- sem_t:信号量类型,wait:sem_wait,post:sem_post
另外,pthread
还提供pthread_join
函数,其语义与GOLANG中的waitgroup一致。
GOLANG中的SELECT语义pthread
库没有直接提供,但是POSIX标准里面定义了select和pselect这两个功能差不多的函数来实现这个语义,linux中这两个函数都是作为系统调用实现,不同的是select和pselect监听的都是文件描述符(poll epoll select的区别与联系)。
生产者消费者问题
下面用C和GOLANG两种语言实现多producer,多consumer的生产者消费者队列。
- C语言版
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <semaphore.h>
#define MAX 4
int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;
int loops = 100;
void put(int value)
{
buffer[fill] = value;
fill = (fill + 1) % MAX;
count++;
}
int get()
{
int tmp = buffer[use];
use = (use + 1) % MAX;
count--;
return tmp;
}
sem_t empty;
sem_t full;
sem_t mutex;
void *producer(void *arg)
{
int i;
for(i = 0; i < loops; i++)
{
sem_wait(&empty);
sem_wait(&mutex);
put(i);
sem_post(&mutex);
sem_post(&full);
}
return 0;
}
void *consumer(void *arg)
{
int i, tmp = 0;
for(i = 0; i < loops; i++)
{
sem_wait(&full);
sem_wait(&mutex);
tmp = get();
sem_post(&mutex);
sem_post(&empty);
printf("current number : %d\n", tmp);
}
return 0;
}
int main()
{
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);
pthread_t p, c, p1, p2, c1, c2;
pthread_create(&p, NULL, producer, NULL);
pthread_create(&p1, NULL, producer, NULL);
pthread_create(&p2, NULL, producer, NULL);
pthread_create(&c, NULL, consumer, NULL);
pthread_create(&c1, NULL, consumer, NULL);
pthread_create(&c2, NULL, consumer, NULL);
pthread_join(p, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_join(c, NULL);
pthread_join(c1, NULL);
pthread_join(c2, NULL);
return 0;
}
- GOLANG版
package main
import (
"fmt"
)
var MSG_BUFFER = 4
var COSUMER_CNT = 3
var NUM_CNT = 100
var msgs = make(chan int, MSG_BUFFER)
// 多个消费者,用buffered channel控制消费者全部执行完之后推出main goroutine
var done = make(chan int, COSUMER_CNT)
func produce() {
for i := 0; i < NUM_CNT; i++ {
msgs <- i
}
}
func consume() {
for i := 0; i < NUM_CNT; i++ {
msg := <-msgs
fmt.Println(msg)
}
done <- 1
}
func main () {
for i:= 0; i < COSUMER_CNT; i++ {
go produce()
go consume()
}
for i:= 0; i < COSUMER_CNT; i++ {
<- done
}
}
总结以及下篇展望
互斥变量实际上是一个锁,条件变量和信号量都是基于锁实现的,有必要说说锁的原理,下篇内容包括:
- cpu层面的原子操作 TAS case(汇编代码)
- 几种锁的实现方式(硬件中断锁、自旋锁、队列锁)
- 评估不同锁实现方式的关键:效率和公平
- 几种编程语言中的锁:GOLANG、JAVA
- 数据库中的锁
参考
- 本文大部分知识通过阅读Operating Systems: Three Easy Pieces这本操作系统教材习得,这是我读过的最好的关于操作系统的教材。本书最大的特点是将操作系统仅仅分成三个部分来讲述:虚拟化、并行、持久化,虽然只有三个部分,操作系统的核心部分却都覆盖到了。本书作者的语言功底很强,读起来朗朗上口。另外,本书的脉络清楚,整本书都在回答一个问题:操作系统作为第一个也是最重要的一个软件,是如何使得计算机系统易于用户使用的。