并发 - Go 语言学习笔记

1,219 阅读9分钟

概述

并发 (concurrency) 是指同时管理很多事情,这些事情可能只做了一半就被暂停去做别的事情了。

Go 语言里的并发指的是能让某个函数独立于其它函数运行的能力。当一个函数创建为 goroutine 时,Go 会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。

Go 语言只需通过 go 关键字来开启 goroutine 即可实现并发。

Goroutine

每一个并发的执行单元称为一个goroutine,goroutine 可以看作是轻量级线程,因为它比线程更小,十几个 goroutine 可能体现在底层就是五六个线程。

普通函数创建 goroutine

语法格式:

go 函数名( 参数列表 )
  • 函数名:要调用的函数名。
  • 参数列表:调用函数需要传入的参数。

Go 允许使用 go 语句开启一个新的运行期线程, 即 goroutine,以一个不同的、新创建的 goroutine 来执行一个函数。 同一个程序中的所有 goroutine 共享同一个地址空间。
例如:

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}

执行以上代码,你会看到输出的 hello 和 world 是没有固定先后顺序。因为它们是两个 goroutine 在执行:

world
hello
world
hello
world
hello
world
hello
world
hello

匿名函数创建 goroutine

语法格式:

go func( 参数列表 ) {
    函数体
}( 调用参数列表 )
  • 参数列表:函数体内的参数变量列表。
  • 函数体:匿名函数的代码。
  • 调用参数列表:启动 goroutine 时,需要向匿名函数传递的调用参数。

并发的使用

1. 可以同时处理多个客户端请求的网络时钟

服务端程序 NetClock.go:

package main

import (
	"io"
	"log"
	"net"
	"time"
)

/*
实战案例:可同时处理多个客户端请求的网络时钟

TCP 编写基于TCP的服务端和客户端
*/

func main()  {
	listener, err := net.Listen("tcp", "localhost: 8000")
	if err != nil {
		log.Fatal(err)
		return
	}
	for {
		// 等待客户端请求
		conn, err := listener.Accept()
		if err != nil {
			log.Println(err)
			continue
		}

		// 向客户端发送服务端的时间
		go handleConn(conn)
	}
}

func handleConn(c net.Conn) {
	defer c.Close()
	for {
		// 将服务端的时间发送给客户端
		_, err := io.WriteString(c,time.Now().Format("15:04:05\n"))
		if err != nil {
			return
		}
		time.Sleep(1 * time.Second)
	}
}

客户端程序 NetClockClient.go:

package main

import (
	"io"
	"log"
	"net"
	"os"
)

// 网络时钟客户端
func main()  {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer conn.Close()
	copy(os.Stdout, conn)

}

func copy(dst io.Writer, src io.Reader)  {
	if _,err := io.Copy(dst,src);err != nil {
		log.Fatal(err)
	}
}

执行结果:

17:32:25
17:32:26
17:32:27
17:32:28
17:32:29
...

2. 同时响应多长请求的Echo服务器

服务端程序 MultiEcho.go

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"strings"
	"time"
)

func main()  {
	listener, err := net.Listen("tcp", "localhost:8888")
	if err != nil {
		log.Fatal(err)
		return
	}
	for {
		// 监听客户端请求
		conn, err := listener.Accept()
		if err != nil {
			log.Print(err)
			continue
		}
		go handleConn(conn)
	}
}


func handleConn(c net.Conn) {
	input := bufio.NewScanner(c)
	for input.Scan() {
		// 并发处理同一个客户端多次请求
		go echo(c, input.Text(), 2*time.Second)
	}
	c.Close()
}

func echo(c net.Conn, shout string, delay time.Duration)  {
	// 将客户端发过来的数据转换成大写
	fmt.Fprintln(c, "\t", strings.ToUpper(shout))
	time.Sleep(delay)
	fmt.Fprintf(c, "\t", shout)
	time.Sleep(delay)
	fmt.Fprintln(c, "\t", strings.ToLower(shout))
}

3. 竞争状态

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race condition)。竞争状态的存在是让并发程序变得复杂的地方,十分容易引起潜在问题。对一个共享资源的读和写操作必须是原子化的,换句话说,同一时刻只能有一个 goroutine 对共享资源进行读和写操作。

以下是包含竞争状态的示例程序:

// 这个示例程序展示如何在程序里造成竞争状态
// 实际上不希望出现这种情况
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	// counter 是所有 goroutine 都要增加其值的变量
	counter int

	// wg 用来等待程序结束
	wg sync.WaitGroup
)

// main 是所有 Go 程序的入口
func main() {
	// 计数加2,表示要等待两个 goroutine
	wg.Add(2)

	// 创建两个 goroutine
	go incCounter(1)
	go incCounter(2)

	// 等待 goroutine 结束
	wg.Wait()
	fmt.Println("Final counter:", counter)
}

// incCounter 增加包里 counter 变量的值
func incCounter(id int) {
	// 在函数退出时调用 Done 来通知 main 函数工作已经完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 捕获 counter 的值
		value := counter

		// 当前 goroutine 从线程退出,并放回到队列
		runtime.Gosched()

		// 增加本地 value 变量的值
		value ++

		// 将该值保存回 counter
		counter = value
	}
}

尝试多次执行以上示例,输出结果会随机输出:

Final Counter: 2
// 或
Final Counter: 4

变量 counter 会进行4次读和写操作,每个 goroutine 执行两次。但是,程序终止时,counter 变量的值却为2。这是因为每个 goroutine 都会覆盖另一个 goroutine的工作。这种覆盖发生在 goroutine 切换的时候。每个 goroutine 创造了一个 counter 变量的副本,之后就切换到另一个 goroutine。当这个goroutine 再次运行的时候,counter 变量的值已经改变了,但是 goroutine 并没有更新自己的那个副本的值,而是继续使用这个副本的值,用这个值递增,并存回 counter 变量,结果覆盖了另一个 gouroutine 完成的工作。

一种修正代码,消除竞争状态的办法是,使用 Go 语言提供的锁机制,来锁住共享资源,从而保证 goroutine 的同步状态。

4. 锁住共享资源

Go 语言提供了传统的同步 goroutine 的机制,就是对共享资源加锁。如果需要顺序访问一个整型变量或者一段代码,atomic 和 sync 包里的函数提供了很好的解决方案。

原子函数能够以很底层的枷锁机制来同步访问整型变量和指针。

package main
// 这个示例程序展示如何在程序里造成竞争状态
// 实际上不希望出现这种情况
import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var (
	// counter 是所有 goroutine 都要增加其值的变量
	counter int64

	// wg 用来等待程序结束
	wg sync.WaitGroup
)

// main 是所有 Go 程序的入口
func main() {
	// 计数加2,表示要等待两个 goroutine
	wg.Add(2)

	// 创建两个 goroutine
	go incCounter(1)
	go incCounter(2)

	// 等待 goroutine 结束
	wg.Wait()
	fmt.Println("Final counter:", counter)
}

// incCounter 增加包里 counter 变量的值
func incCounter(id int) {
	// 在函数退出时调用 Done 来通知 main 函数工作已经完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 安全地对 counter 加1
		atomic.AddInt64(&counter, 1)

		// 当前 goroutine 从线程退出,并放回到队列
		runtime.Gosched()
	}
}

输出:

Final Counter: 4

现在得到了正确的值4。
以上程序使用了 atmoic 包的 AddInt64 函数,这个函数会同步整型值的加法,方法是强制同一时刻只能有一个 goroutine 运行并完成这个加法操作。当 goroutine 试图去调用任何原子函数时,这些 goroutine 都会自动根据所引用的变量做同步处理。

另外两个有用的原子函数是 LoadInt64AddInt64函数。这两个函数提供了一种安全地读和写一个整型值的方式。

// 这个示例程序展示如何使用 atomic 包里的 Store 和 Load 类函数来提供对数值类型的安全访问

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	// shutdown 是通知正在执行的 goroutine 停止工作的标志
	shutdown int64

	// wg 用来等待程序结束
	wg sync.WaitGroup
)

// main 是所有 Go 程序的入口
func main()  {
	// 计数加2,表示要等待两个 goroutine
	wg.Add(2)

	//创建两个 goroutine
	go doWork("A")
	go doWork("B")

	// 给定 goroutine 执行的时间
	time.Sleep(1 * time.Second)

	// 该停止工作了, 安全地设置 shutdown 标志
	fmt.Println("Shutdown now")
	atomic.StoreInt64(&shutdown, 1)

	// 等待 goroutine 结束
	wg.Wait()
}

// doWork 用来模拟执行工作的 goroutine
// 检测之前的 shutdown 标志来决定是否提前终止
func doWork(name string)  {
	// 在函数退出是调用 Done 来通知 main 函数工作已经完成
	defer wg.Done()

	for {
		fmt.Printf("Doing %s Work \n", name)
		time.Sleep(250 * time.Millisecond)

		// 要停止工作了吗?
		if atomic.LoadInt64(&shutdown) == 1 {
			fmt.Printf("Shutting %s Down\n", name)
			break
		}
	}
}

输出:

Doing A Work 
Doing B Work 
Doing B Work 
Doing A Work 
Doing B Work 
Doing A Work 
Doing B Work 
Doing A Work 
Shutdown now
Shutting B Down
Shutting A Down

以上程序启动了两个goroutine, 在各自循环的每次迭代之后,会使用 LoadInt64 来检查 shutdown 变量的值,这个函数会安全地返回 shutdown 变量的一个副本,如果这个副本的值为1,goroutine 就会跳出循环并终止。

main 函数使用 StoreInt64 函数来安全地修改 shutdown 变量的值。如果哪个 doWork goroutine 试图在 main 函数调用 StoreInt64 的同时调用 LoadInt64函数,那么原子函数会将这些调用互相同步,保证这些操作都是安全的,不会进入竞争状态。

5. 互斥锁

另一种同步访问共享资源的方式是使用互斥锁(mutex)。互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以执行这个临界区代码。

// 这个示例程序展示如何使用互斥锁来定义一段需要同步访问的代码临界区资源的同步访问
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	// counter 是所有 goroutine 都要增加其值的变量
	counter int

	// wg 用来等待程序结束
	wg sync.WaitGroup

	// mutex 用来定义一段代码临界区
	mutex sync.Mutex
)

// main 是所有 Go 程序的入口
func main() {
	// 计数加2,表示要等待两个 goroutine
	wg.Add(2)

	// 创建两个 goroutine
	go incCounter(1)
	go incCounter(2)

	// 等待 goroutine 结束
	wg.Wait()
	fmt.Printf("Final counter: %d\n", counter)
}

// incCounter 使用互斥锁来同步并保证安全访问
// 增加包里 counter 变量的值
func incCounter(id int) {
	// 在函数退出时调用 Done 来通知 main 函数工作已经完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 同一时刻只允许一个 goroutine 进入这个临界区
		mutex.Lock()
		{
			// 捕获 counter 的值
			value := counter

			// 当前 goroutine 从线程退出,并放回到队列
			runtime.Gosched()

			// 增加本地 value 变量的值
			value ++

			// 将该值保存回 counter
			counter = value
		}
		mutex.Unlock()
		// 释放锁,允许其它正在等待的 goroutine 进入临界区
	}
}

输出:

Final counter: 4

在对 counter 变量操作的前后,分别用 Lock() 和 Unlock() 函数调用定义的临界区里将其保护起来,同一时刻只有一个 goroutine 可以进入临界区,之后,直到调用 Unlock() 函数之后,其它 goroutine 才能进入临界区。