GO语言基础篇(二十八)- Go并发实践之并发非阻塞缓存

494 阅读8分钟

这是我参与8月更文挑战的第 28 天,活动详情查看: 8月更文挑战

并发非阻塞缓存

本文会分享一个实现并发非阻塞的缓存示例,它可以解决在并发实战很常见但已有的库也不能很好解决的一个问题:函数记忆问题,即缓存函数的结果,达到多次调用但只需计算一次的效果。不仅是并发安全的,并且会避免简单的对整个缓存使用单个锁而带来的锁争夺问题

以下边的httpGetBody函数作为示例来演示函数记忆。它会发起一个HTTP的GET请求,并读取响应体。调用这个函数比较耗时,所以我们希望避免不必要的重复调用

func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }

    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

ReadAll返回两个结果,一个[]byte一个error,因为他们可以分别赋值给interface{}和error,所以我们可以直接返回这个结果,而不需要做额外的处理。下边是初始版本的缓存设计

package memo

import (
	"io/ioutil"
	"net/http"
)

func httpGetBody(url string) (interface{}, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}

	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}

//Memo缓存了调用Func的结果
type Memo struct {
	f Func
	cache map[string]result
}

//Func是用于记忆的函数类型
type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

//并不是并发安全的
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}

	return res.value, res.err
}

Memo实例包含了被记忆的函数f(类型为Func),以及缓存,类型为从字符串到result的一个映射表。每个result都是调用f产生的结果对:一个值和一个错误。在设计的推进过程中会展示Memo的几种变体,但所有变体都会遵守这些基本概念

下面的例子展示如何使用Memo。对于一串请求URL中的每个元素,首先调用Get,记录延时和它返回的数据长度

m := memo.New(httpGetBody)
for url := range incomingURLs() {
	start := time.Now()
	value, err := m.Get(url)
	if err != nil {
		log.Print(err)
	}
	fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}

我们可以使用 testing包来系统地调查一下记忆的效果(测试会在后边的几篇文章中分享)。从下面的测试结果来看,可以看到URL流有重复项,尽管每个URL第一次调用(*Memo).Get都会消耗数百毫秒的时间,但对这个URL的第二次请求在1us内就返回了同样的结果

$ go test -v gopl. io/ch9/memo1
=== RUN Test
<https://golang.org>,175.026418ms,7537bytes
<https://godoc.org172.686825ms>,6878bytes
<https://play.golang.org115.762377ms>,5767bytes
<http://gopl.io>,749.887242ms,2856bytes
<https://golang.org>,721ns,7537bytes
<https://godoc.org>,152ns,6878bytes
<https://play.golang.org205ns>,5767bytes
ittp: //gopl.io, 326ns, 2856 bytes
-- PASS: Test (1. 21s)
PASS
ok gopl. io/ch9/memo1 1.257s

这次测试中所有的Get都是串行运行的。因为HTTP请求用并发来改善的空间很大,所以我们修改测试来让所有请求并发进行。这个测试使用sync.WaitGroup来做到,等最后一个请求完成后再返回的效果

m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
	n.Add(1)
	go func(url string){
		start := time.Now()
		value, err := m.Get(url)
		if err != nil {
			log.Print(err)
		}
		fmt.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
		n.Done()
	}(url)
}
n.Wait()

这次的测试运行起来快很多,但是它并不是每一次都能正常运行。我们可能能注意到意料之外的缓存无效,以及缓存命中后返回错误的结果,甚至崩溃

更糟糕的是,有的时候它能正常运行,所以我们可能甚至都没有注意到它会有问题。但如果我们加上-race标志后再运行,那么竞态检测器(在前边的文章中有分享,可以点这里),经常会输出与下面类似的一份报告:

$ go test -run=TestConcurrent -race-v gopl.io/ch9/memo1
== RUN TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36
		runtime. mapassign1o
			~/go/src/runtime/hashmap. go: 411 +0x0
		gopl. io/ch9/memo1.(*Memo). Get(
			~/gobook2/src/gopl io/ch9/ memo1/memo. go: 32+0x205
Previous write by goroutine 35:
		runtime. mapassign1()
			~/go/src/runtime/hashmap. go: 411 +0x0
		gopl. io/ch9/memo1. (*Memo ). Get()
			~/gobook2/src/gopl io/ch9/memo1/memo. go:32 +0X205
...
Found 1 data race(s)
FAIL goplioch9/memo1 2.393s

上面提到的 memo.go:32告诉我们两个 goroutine在没使用同步的情况下更新了cache map。整个Get函数其实不是并发安全的,它存在数据竞态

func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}

	return res.value, res.err
}

让缓存并发安全最简单的方法就是用一个基于监控的同步机制。我们需要的是给Memo加一个互斥量,并在Get函数的开头获取互斥锁,在返回前释放互斥锁,这样两个cache相关的操作就发生在临界区域了

type Memo struct {
	f Func
	mu sync.Mutex
	cache map[string]result
}

//Get是并发安全的
func (memo *Momo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()

	return res.value, res.err
}

现在即使并发运行测试,竞态检测器也没有报警。但是这次对Memo的修改让我们之前对性能的优化失效了。由于每次调用f时都上锁,因此Get把我们希望并行的I/O操作串行化了。我们需要的是一个非阻塞的缓存,一个不会把他需要记忆的函数串行运行的缓存

在下面一个版本的Get实现中,主调goroutine会分两次获取锁:第一次用于查询,第 二次用于在查询无返回结果时进行更新。在两次之间,其他goroutine也可以使用缓存

func (memo *Momo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	
	if !ok {
		res.value, res.err = memo.f(key)
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}

	return res.value, res.err
}

性能再度得到提升,但我们注意到某些URL被获取了两次。在两个或者多个goroutine 几乎同时调用Get来获取同一个URL时就会出现这个问题。两个goroutine都首先查询缓存,发现缓存中没有需要的数据,然后调用那个慢函数f,最后又都用获得的结果来更新 map,其中一个结果会被另外一个覆盖

在理想情况下我们应该避免这种额外的处理。这个功能有时称为**重复抑制。**在下面的Memo版本中,map的每个元素是一个指向entry结构的指针。除了与之前一样包含一个已经记住的函数f调用结果之外,每个entry还新加了一个通道ready。 在设置entry的result字段后,通道会关闭,正在等待的goroutine会收到广播,然后就可以从entry读取结果了

type entry struct {
	res result
	ready chan struct{}
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
	f Func
	mu sync.Mutex
	cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// 对key第一次访问,这个goroutine负责计算数据和广播数据
		//已准备完毕的数据
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(key)

		close(e.ready)
	} else {
		//对这个key的重复访问
		memo.mu.Unlock()
		<-e.ready
	}

	return e.res.value, e.res.err
}

现在调用Get会先获取保护cache map的互斥锁,再从map中查询一个指向已有entry的指针,如果没有找到,就分配并插入一个新的entry,最后释放锁。如果要查询的entry存在,那么它的值可能还没准备好(另外一个goroutine有可能还在调用慢函数f),所以主调goroutine就需要等待entry准备好才能读取entry中的result数据,具体的实现方法就是从ready通道读取数据,这个操作会一直阻塞到通道关闭

如果要查询的entry不存在,那么当前的goroutine就需要新插入一个没有准备好的entry到map里,并负责调用慢函数f,更新entry,最后向其他正在等待的goroutine广播数据已准备完毕的消息

注意,entry中的变量e.res.value和e.res.err被多个goroutine共享。创建entry的 goroutine设置了这两个变量的值,其他goroutine在收到数据准备完毕的广播后开始读这 两个变量。尽管变量被多个goroutine访问,但此处不需要加上互斥锁。ready通道的关闭先于其他goroutine收到广播事件,所以第一个goroutine的变量写入事件也先于后续多个 goroutine的读取事件。在这个情况下数据竞态不存在

这里的并发、重复抑制、非阻塞缓存就完成了

上面的Memo代码使用一个互斥量来保护被多个调用Get的goroutine访问的map变量。 接下来会对比另外一种设计,在新的设计中,map变量限制在一个监控goroutine中,而Get 的调用者则不得不改为发送消息

Func、result、entry的声明与前边一致。尽管Get的调用者通过这个通道来与监控goroutine通信,但是Memo类型现在包含一 个通道requests。该通道的元素类型是request。通过这种数据结构,Get的调用者向监控goroutine发送被记忆函数的参数(key),以及一个通道response,结果在准备好后就通过response通道发回。这个通道仅会传输一个值

//request是一条请求消息,key需要用Func调用
type request struct {
	key string
	response chan<- result
}

type Memo struct {
	requests chan result
}

func New(f Func) *Memo {
	memo := &Memo{requests: make(chan result)}
	go memo.serve(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() {
	close(memo.requests)
}

上面的Get方法创建了一个响应(response)通道,放在了请求里边,然后把它发送给监控goroutine,再马上从响应通道中读取

如下所示,cache变量被限制在监控goroutine (即(*Memo) .server)中。监控goroutine从request通道中循环读取,直到该通道被Close方法关闭。对于每个请求,它先查询缓存,如果没找到则创建并插入一个新的entry

func (memo *Memo) server(f Func)  {
	cache := make(map[string]*entry)
	for req:=range memo.requests {
		e := cache[req.key]
		if e == nil {
			//对这个key的第一次请求
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key)//调用f(key)
		}
		go e.deliver(req.response)
	}
}

func (e *entry) call(f Func, key string) {
	//执行函数
	e.res.value, e.res.err = f(key)
	close(e.ready)
}

func (e *entry) deliver(response chan<- result)  {
	//等该数据已准备完毕
	<-e.ready
	//向客户端发生结果
	response <- e.res
}

与基于互斥锁的版本类似,对于指定键的一次请求,负责在该键上调用函数f,保存结果到entry中,最后通过关闭ready通道来广播准备完毕状态。这个流程通过(*entry). call来实现

对同一个键的后续请求会在map中找到已有的entry,然后等待结果准备好,最后通过响应通道把结果发回给调用Get的客户端goroutine。其中call和deliver方法都需要在独立的goroutine中运行,以确保监控goroutine能持续处理新请求

上面的例子展示了可以使用两种方案来构建并发结构:共享变量并上锁,或者通信顺序进程,这两者也都不复杂

在给定的情况下也许很难判定哪种方案更好,但了解这两种方案的对照关系是很有价值的。有时候从一种方案切换到另外一种能够让代码更简单