MIT6.824 Lab1 预热

1,140 阅读3分钟

假设有个字符串:

var str = "The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines."

需要统计里面每个字母出现的次数。最直观简单的做法就是利用一个 map,从开始到末尾读这个字符串,并把字母作为 key,出现的次数作为 value。Map 中包含 key 的时候,value + 1,Map 中没有 key 的时候默认 1。最后读完这个字符串就 OK。

var m = make(map[string]int)
temp := strings.Split(str, "")

for _, c := range temp {
    if !unicode.IsLetter([]rune(c)[0]) {
        continue
    }
    if count, ok := m[c]; ok {
        m[c] = count + 1
    } else {
        m[c] = 1
    }
}
[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]

在现实世界中,这个 str 可能非常巨大,所以有时候我们需要将源文本拆分成多个小的字符串,然后多个线程同时处理,每个线程计算得到当前的中间结果,最后合并到一起。

上述的过程在函数式编程中可以被抽象为 Map 和 Reduce 两个函数。其中 Map 函数是把一个数组的每个元素按照相同的逻辑处理之后返回的结果,Reduce 函数是把所有元素整合起来得到结果。通常这个两个函数的参数都是函数,Map 的返回值一般也是数组,Reduce 的返回值可能是各种类型。

为了在单机上实现出并发处理的效果,可以用 Go 自带的 goroutine 来实现。下面把拆分的工作省略,直接进入主题

接下来用 4 个 goroutine 同时处理这些 string,每个做 goroutine 利用 单机串行版 的逻辑,生产出一个小规模的中间内容。随后把每个中间内容都整合起来得到最终值。接下来需要考虑

  • Go 天生支持 CSP 编程模型,所以利用 channel 做通信没有问题
  • 是否有 data race
package main

import (
	"strings"
	"sync"
	"unicode"
)

type ResultMap struct {
	sync.Mutex
	result map[string]int
}

func main()  {
	str1 := "The MapReduce library in the user program first"
	str2 := "splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB)"
	str3 := "per piece (controllable by the user via an optional parameter)."
	str4 := "It then starts up many copies of the program on a cluster of machines."

	strs := []string {str1, str2, str3, str4}

	// 主线程需要阻塞直到所有的 reduce 都结束
	var waitGroup sync.WaitGroup
	waitGroup.Add(len(strs))

	c := make(chan map[string]int)

	res := new(ResultMap)
	res.result = make(map[string]int)

	for _, str := range strs {
		go doMap(str, c)
		go doReduce(c, res, &waitGroup)
	}

	waitGroup.Wait()

	sortPrintMap(res.result)

}

// 生产出对应的 kv 传递给 channel
func doMap(str string, c chan map[string]int) {
	temp := strings.Split(str, "")
	m := make(map[string]int)

	for _, c := range temp {
		if !unicode.IsLetter([]rune(c)[0]) {
			continue
		}
		if count, ok := m[c]; ok {
			m[c] = count + 1
		} else {
			m[c] = 1
		}
	}
	c <- m
}

// 合并
func doReduce(c chan map[string]int, res *ResultMap, group *sync.WaitGroup) {
	res.Lock()
	defer res.Unlock()
	for k, v := range <- c {
		if count, ok := res.result[k]; ok {
			res.result[k] = count + v
		} else {
			res.result[k] = v
		}
	}
	group.Done()
}

检查一下结果 (Map 的 key 本身是无序的,这里是排好序之后的)

[M:3 R:1 y:7 o:13 v:1 e:26 h:7 l:10 i:14 r:15 T:1 p:13 d:1 u:6 c:8 b:5 s:14 g:4 a:17 f:5 m:7 t:20 B:1 I:1 n:10]

结果无误之后,这个问题可以再深入

  • 上述的 reduce 和 map 是单机上的,之间的数据共享用了 channel,如果是物理隔离的场景下,如何用别的东西做数据共享?
  • 任何一个子任务都有可能因为各种原因挂掉,如何在某个子任务挂掉的情况下,系统的准确性不受影响,甚至能自愈?
  • 上述的 goroutine 在执行结束之后就会被调度器回收,但实际上因为 map 总是会比 reduce 先结束,那么后期的过程实际上可以有更多的 goroutine 可以参与到 reduce 任务中 r 如何实现这种调度让资源可以被更加充分的利用?