基础概述
go语言的并发同步模型来自叫做通信顺序进程的范型(Communicating Sequential Processes,CSP),通过goroutine 之间传递数据而不是对数据加锁进行同步访问
Do not communicate by sharing memory; instead, share memory by communicating.
普通的线程并发模型,就是像Java、C++、或者Python,他们线程间通信都是通过共享内存的方式来进行的。非常典型的方式就是,在访问共享数据(例如数组、Map、或者某个结构体或对象)的时候,通过锁来访问,因此,在很多时候,衍生出一种方便操作的数据结构,叫做“线程安全的数据结构”。例如Java提供的包"java.util.concurrent"中的数据结构。Go中也实现了传统的线程并发模型。
进程和线程
运行一个程序的时候,就会开启一个进程,这个进程就相当于包括了这个程序所有资源集合的容器
进程资源包括但不限于以下3块内容 内存,句柄,线程
具体概览如下
线程调度
操作系统调度器会决定哪个线程会获得CPU的运行,调度CPU时间片的分配 单核CPU系统下,超线程运行的话,CPU同一时间只会进行一个线程的运行,所以多线程会导致CPU不停地进行切换,多核CPU就可以意味着并行
如果程序是CPU密集型,则并发并不会提高性能,甚至因为多次创建线程导致效率更低,但是如果是IO密集型,则并发操作可以让线程再等待io的时候执行其他逻辑来提高程序效率
线程调度的几个方式
- 抢占式调度
可以设置线程的优先级,优先级高的可以先占用CPU
- 分时调度
各个线程轮流进行CPU的使用权,且平均分配CPU的时间片
goroutine特点
--- example 01 ---
func main() {
runtime.GOMAXPROCS(1)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c", char)
}
}
}()
go func() {
defer wg.Done()
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c", char)
}
}
}()
wg.Wait()
}
goroutine 执行顺序
会是:ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz
解析: 多个goroutine 相当于一个个待执行的G任务,调度器在处理G任务的时候往往会将最后的G 提到next待处理的任务中,所以最后的G任务会最先进行,其他的就按照顺序进行,但是会存在P空闲的时候
基于调度器的内部算法,一个正在运行的goroutine在工作结束前,可以被停止并重新调度,调度器会停止当前正运行的goroutine,并给其他可运行的goroutine运行的机会
如下
func main() {
runtime.GOMAXPROCS(1)
wg.Add(2)
go printPrime("A")
go printPrime("B")
wg.Wait()
}
func printPrime(prefix string) {
defer wg.Done()
next:
for outer := 2; outer < 5000; outer++ {
for inner := 2; inner < outer; inner++ {
if outer%inner == 0 {
continue next
}
}
fmt.Printf("%s :%d\n", prefix, outer)
}
}
会发现 AB 两个goroutine 会交替计算
线程实现模型
M 代表内核线程
P 代表一个执行GO代码片段的必须资源(上下文环境)
G 代表一个Go代码片段
M
一个M代表一个内核线程,创建一个M 都是因为没有足够的M 来关联P并运行其中可运行的G
在垃圾回收的时候也会导致M的创建,
创建后,系统会对他进行一番初始化,初始化自身的栈空间以及信号处理,在起始函数执行完毕后,当前M会与之预联的P完成关联。
单个M的最大数量是10000 可以通过修改setmaxThreads 参数来调整最大数量
P
通过runtime.GOMAXPROCS 可以设定P的最大数量,目前上限值是256,设定这个值之后,系统会重整全局P列表,该列表包含了当前运行时系统创建的所有P,与M类似,系统会有一个空闲的P列表,当P不与任何一个M关联是时,就会将其放进空闲P列表中(当然这时候的G队列必须为空)
P除了一个可运行的G列表之外,还有一个自由的G列表,这个列表存储的是已完成的G任务,为了提高复用性,启用1个G的时候,会先去自由G列表中获取一个现成的G,只有自由列表中获取不到的时候,才会重新申请一个新的G
G
GO编译器会将go语句变成对内部函数newproc的调用,并将参数传递给这个函数,与P M 相同,运行时也持有一个G的全局列表,初始化流程包括关联go函数以及设置G的状态跟id
会立即放入P的runnext 字段中, 这个字段存放优先级最高的G,如果runnext已经有了值,则
会放入起可运行队列中
核心元素的容器
包括全局,空闲M,P,G列表
实现和操纵Go的线程实现模型的内部程序笼统称为 运行时系统,可以称为调度器,一个Go程序只会存在一个调度器实例
调度器
两级线程模型一部分调度惹怒会有操作系统内核之外的程序承担,这就是调度器
主goroutine的运作
源码目录:src/runtime/proc.go
- 设定每一个goroutine 的栈空间最大尺寸,32位位250MB 64位为1GB
- 主goroutine 会在当前M的g0上执行系统检测任务
- 锁住主线程
- 检查当前M是否为runtime.m0,不是的话抛出异常
- 开启gc goroutine (gcenable)
- 执行main包的init函数
- 解锁主线程
- 执行main函数
- 如果有竞争检测的话,则初始化竞争检测
- 当其他协程有panicdefer 的话,重试多次确保panicdefer为0是才退出,此时需要执行go park函数,gopark函数用于协程切换
竞争状态
如果两个或者多个goroutine访问某个共享的资源,并试图同时读写该资源,这种情况就成为竞争状态,我们队一个共享资源的读写必须是原子化的,也就是同一时刻只能有一个goroutine对共享资源进行读与写操作
func main() {
wg1.Add(2)
go initCounter(1)
go initCounter(2)
wg1.Wait()
fmt.Println("final counter:", counter)
}
func initCounter(id int) {
defer wg1.Done()
for count := 0; count < 2; count++ {
value := counter
runtime.Gosched()
value++
counter = value
fmt.Println("get id is", id)
}
}
会发现 最终counter 的值在2-4之间来回返回,就是因为2个goroutine对同一个变量进行赋值,每个goroutine 都会覆盖另一个goroutine 的工作,细节如下:
goroutine A 对 counter 变量赋值完,将其保存在自己的副本中,另一个goroutine 执行时,不会对刚刚A改变后的counter 值继续操作,而是重新开始执行,这样就会导致 goroutineB 覆盖了A的操作,导致最终的counter值不确定
runtime.Gosched()函数是用来强制将goroutine 冲当前线程中退出,给其他goroutine处理
我们可以使用竞争检测来发现代码中是否存在这样的问题
go build -race list09.go
./list09
如图可以看出,代码中存在竞争问题
如何锁住共享资源
处理方式:对共享资源加锁,go包中 atomic以及sync包都提供了很多这样的解决方案
atomic:
原子操作即执行过程中不能被中断的操作,原子操作仅会由一个独立的CPU指令代表和完成,
只有这样才能在并发环境保证原子操作的绝对安全
mutex 是由操作系统决定的,而atomic是由底层硬件决定的,CPU指令集中,有一些指令是封装进atomic的,
这些指令在执行过程中是不能被中断的所以能保证并发安全
mutex:
互斥锁在代码中创建一个临界区,保证同一时间只有一个goroutine 可以执行这个临界区代码
func main() {
wg2.Add(2)
go incCounter(1)
go incCounter(2)
wg2.Wait()
fmt.Printf("final counter %d\n", counter1)
}
func incCounter(id int) {
defer wg2.Done()
for count := 0; count < 2; count++ {
mutex.Lock()
value := counter1
runtime.Gosched()
value++
counter1 = value
mutex.Unlock()
}
fmt.Println("id is ", id)
}
使用竞争检测后 ,也没有问题
通道
当一个资源在goroutine中共享时,通道在goroutine 之间架起管道,提供确保同步交换数据的机制, 申明时,需要指定将要被共享的数据类型
无缓冲
func main() {
//申明无缓冲通道
court := make(chan int)
wg3.Add(2)
go player("james", court)
go player("lina", court)
court <- 1
wg3.Wait()
}
func player(playerName string, court chan int) {
defer wg3.Done()
for {
ball, ok := <-court
if !ok {
fmt.Printf("Player %s win\n", playerName)
return
}
n := rand.Intn(100)
if n%12 == 0 {
fmt.Printf("Player %s miss\n", playerName)
close(court)
return
}
fmt.Printf("Player %s Hit %d\n", playerName, ball)
ball++
court <- ball
}
}
有缓冲
当通道关闭后,goroutine 依旧可以从通道中接收数据,但不能从通道中发送数据,从一个已关闭并且没有数据的通道
里获取数据,总会立即返回,一个通道类型的零值