go并发研究

247 阅读8分钟

基础概述

go语言的并发同步模型来自叫做通信顺序进程的范型(Communicating Sequential Processes,CSP),通过goroutine 之间传递数据而不是对数据加锁进行同步访问

Do not communicate by sharing memory; instead, share memory by communicating.

普通的线程并发模型,就是像Java、C++、或者Python,他们线程间通信都是通过共享内存的方式来进行的。非常典型的方式就是,在访问共享数据(例如数组、Map、或者某个结构体或对象)的时候,通过锁来访问,因此,在很多时候,衍生出一种方便操作的数据结构,叫做“线程安全的数据结构”。例如Java提供的包"java.util.concurrent"中的数据结构。Go中也实现了传统的线程并发模型。

进程和线程

运行一个程序的时候,就会开启一个进程,这个进程就相当于包括了这个程序所有资源集合的容器

进程资源包括但不限于以下3块内容 内存,句柄,线程

具体概览如下

线程调度

操作系统调度器会决定哪个线程会获得CPU的运行,调度CPU时间片的分配 单核CPU系统下,超线程运行的话,CPU同一时间只会进行一个线程的运行,所以多线程会导致CPU不停地进行切换,多核CPU就可以意味着并行

如果程序是CPU密集型,则并发并不会提高性能,甚至因为多次创建线程导致效率更低,但是如果是IO密集型,则并发操作可以让线程再等待io的时候执行其他逻辑来提高程序效率

线程调度的几个方式

  1. 抢占式调度

可以设置线程的优先级,优先级高的可以先占用CPU

  1. 分时调度

各个线程轮流进行CPU的使用权,且平均分配CPU的时间片

goroutine特点

--- example 01 ---
func main() {
	runtime.GOMAXPROCS(1)

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		for count := 0; count < 3; count++ {
			for char := 'a'; char < 'a'+26; char++ {
				fmt.Printf("%c", char)
			}
		}
	}()
	go func() {
		defer wg.Done()
		for count := 0; count < 3; count++ {
			for char := 'A'; char < 'A'+26; char++ {
				fmt.Printf("%c", char)
			}
		}
	}()

	wg.Wait()
}

goroutine 执行顺序
会是:ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz

解析: 多个goroutine 相当于一个个待执行的G任务,调度器在处理G任务的时候往往会将最后的G 提到next待处理的任务中,所以最后的G任务会最先进行,其他的就按照顺序进行,但是会存在P空闲的时候

基于调度器的内部算法,一个正在运行的goroutine在工作结束前,可以被停止并重新调度,调度器会停止当前正运行的goroutine,并给其他可运行的goroutine运行的机会

如下

func main() {
	runtime.GOMAXPROCS(1)
	wg.Add(2)
	go printPrime("A")
	go printPrime("B")
	wg.Wait()
}

func printPrime(prefix string) {
	defer wg.Done()

next:
	for outer := 2; outer < 5000; outer++ {
		for inner := 2; inner < outer; inner++ {
			if outer%inner == 0 {
				continue next
			}
		}
		fmt.Printf("%s :%d\n", prefix, outer)
	}
}

会发现 AB 两个goroutine 会交替计算

线程实现模型

M   代表内核线程
P   代表一个执行GO代码片段的必须资源(上下文环境)
G   代表一个Go代码片段
M
一个M代表一个内核线程,创建一个M 都是因为没有足够的M 来关联P并运行其中可运行的G
在垃圾回收的时候也会导致M的创建,
创建后,系统会对他进行一番初始化,初始化自身的栈空间以及信号处理,在起始函数执行完毕后,当前M会与之预联的P完成关联。
单个M的最大数量是10000 可以通过修改setmaxThreads 参数来调整最大数量
P
通过runtime.GOMAXPROCS 可以设定P的最大数量,目前上限值是256,设定这个值之后,系统会重整全局P列表,该列表包含了当前运行时系统创建的所有P,与M类似,系统会有一个空闲的P列表,当P不与任何一个M关联是时,就会将其放进空闲P列表中(当然这时候的G队列必须为空)
P除了一个可运行的G列表之外,还有一个自由的G列表,这个列表存储的是已完成的G任务,为了提高复用性,启用1个G的时候,会先去自由G列表中获取一个现成的G,只有自由列表中获取不到的时候,才会重新申请一个新的G
G
GO编译器会将go语句变成对内部函数newproc的调用,并将参数传递给这个函数,与P M 相同,运行时也持有一个G的全局列表,初始化流程包括关联go函数以及设置G的状态跟id
会立即放入P的runnext 字段中, 这个字段存放优先级最高的G,如果runnext已经有了值,则
会放入起可运行队列中
核心元素的容器
包括全局,空闲M,P,G列表
实现和操纵Go的线程实现模型的内部程序笼统称为 运行时系统,可以称为调度器,一个Go程序只会存在一个调度器实例

调度器

两级线程模型一部分调度惹怒会有操作系统内核之外的程序承担,这就是调度器

主goroutine的运作

源码目录:src/runtime/proc.go

  1. 设定每一个goroutine 的栈空间最大尺寸,32位位250MB 64位为1GB
  2. 主goroutine 会在当前M的g0上执行系统检测任务
  3. 锁住主线程
  4. 检查当前M是否为runtime.m0,不是的话抛出异常
  5. 开启gc goroutine (gcenable)
  6. 执行main包的init函数
  7. 解锁主线程
  8. 执行main函数
  9. 如果有竞争检测的话,则初始化竞争检测
  10. 当其他协程有panicdefer 的话,重试多次确保panicdefer为0是才退出,此时需要执行go park函数,gopark函数用于协程切换

竞争状态

如果两个或者多个goroutine访问某个共享的资源,并试图同时读写该资源,这种情况就成为竞争状态,我们队一个共享资源的读写必须是原子化的,也就是同一时刻只能有一个goroutine对共享资源进行读与写操作

func main() {
	wg1.Add(2)
	go initCounter(1)
	go initCounter(2)

	wg1.Wait()
	fmt.Println("final counter:", counter)
}

func initCounter(id int) {
	defer wg1.Done()
	for count := 0; count < 2; count++ {
		value := counter
		runtime.Gosched()
		value++

		counter = value

		fmt.Println("get id is", id)
	}
}

会发现 最终counter 的值在2-4之间来回返回,就是因为2个goroutine对同一个变量进行赋值,每个goroutine 都会覆盖另一个goroutine 的工作,细节如下:

goroutine A 对 counter 变量赋值完,将其保存在自己的副本中,另一个goroutine 执行时,不会对刚刚A改变后的counter 值继续操作,而是重新开始执行,这样就会导致 goroutineB 覆盖了A的操作,导致最终的counter值不确定

runtime.Gosched()函数是用来强制将goroutine 冲当前线程中退出,给其他goroutine处理

我们可以使用竞争检测来发现代码中是否存在这样的问题

go build -race list09.go
./list09

如图可以看出,代码中存在竞争问题

如何锁住共享资源

处理方式:对共享资源加锁,go包中 atomic以及sync包都提供了很多这样的解决方案

atomic:

原子操作即执行过程中不能被中断的操作,原子操作仅会由一个独立的CPU指令代表和完成,
只有这样才能在并发环境保证原子操作的绝对安全

mutex 是由操作系统决定的,而atomic是由底层硬件决定的,CPU指令集中,有一些指令是封装进atomic的,
这些指令在执行过程中是不能被中断的所以能保证并发安全

mutex:

互斥锁在代码中创建一个临界区,保证同一时间只有一个goroutine 可以执行这个临界区代码
func main() {
	wg2.Add(2)
	go incCounter(1)
	go incCounter(2)
	wg2.Wait()
	fmt.Printf("final counter %d\n", counter1)
}

func incCounter(id int) {
	defer wg2.Done()

	for count := 0; count < 2; count++ {
		mutex.Lock()
		value := counter1
		runtime.Gosched()
		value++
		counter1 = value

		mutex.Unlock()
	}
	fmt.Println("id is ", id)
}
使用竞争检测后 ,也没有问题

通道

当一个资源在goroutine中共享时,通道在goroutine 之间架起管道,提供确保同步交换数据的机制, 申明时,需要指定将要被共享的数据类型

无缓冲

func main() {
	//申明无缓冲通道
	court := make(chan int)

	wg3.Add(2)

	go player("james", court)
	go player("lina", court)

	court <- 1

	wg3.Wait()
}

func player(playerName string, court chan int) {
	defer wg3.Done()
	for {
		ball, ok := <-court
		if !ok {
			fmt.Printf("Player %s win\n", playerName)
			return
		}
		n := rand.Intn(100)
		if n%12 == 0 {
			fmt.Printf("Player %s miss\n", playerName)
			close(court)
			return
		}

		fmt.Printf("Player %s Hit %d\n", playerName, ball)
		ball++
		court <- ball
	}
}

有缓冲

当通道关闭后,goroutine 依旧可以从通道中接收数据,但不能从通道中发送数据,从一个已关闭并且没有数据的通道
里获取数据,总会立即返回,一个通道类型的零值