用go实现"古老的"mapreduce

1,382 阅读7分钟

最近抽空开始做6.824 Schedule: Spring 2018的相关练习,一个国外的分布式系统的相关课题任务,然后重新看了一下古老的 "mapreduce"相关论文论文 mapreduce Simplified Data Processing on Large Clusters,结合之前一些hadoop的相关知识,这里使用Go实现了mapreduce中的关键部分。我想简单的介绍一下mapreduce,然后解析实现过程,同时结合自己的想法,对mapreduce分布式离线计算框架有一个简单的整理,思考。

简要介绍

mapreduce最开始是2004(所有说"古老")年被Google提出的,大多数人应该是在hadoop广泛普及才开始接触。

简单来说mapreduce就是离线处理大规模数据的分布式计算框架,用户只需要定义一个处理k,v的map函数(该map函数会自动将计算结果保存到临时文件中)和reduce(处理合并临时文件中所有key中记录)函数即可。所以从用户角度来讲是非常简单的,使用类mapreduce框架就可以拥有处理海量数据的计算能力,他不需要去关心数据分区分片,任务的多机合理调度,任务失败重试,计算集群资源管理等等分布式系统中会遇到的问题。

这里先简单梳理一下mapreduce的逻辑。


上图可以简单看一下官方的流程图,有几个流程,从左到右的逻辑是,首先是将数据文件切分,然后分布到每个worker执行(相当于分布式执行map函数),再然后产生中间结果集文件,最后reduce函数聚合所有中间文件的结果集,然后其中需要Master节点去做居中调度,分配map Worker和reduce Worker。

下面简要介绍map, reduce, schedule的实现流程


编程模型

对于开发者的编程模型是 map和reduce,处理一组k,v数据,输出一组k,v数据, 类mapreduce框架会提供map和reduce函数接口。

关键实现逻辑

具体代码提交放在链接中github-mapreduce

Master 任务分配

master将任务调度到合适的worker,这里其实是整个计算系统的核心模块,相当于是整个系统的大脑,在这里其实需要做很多事,有很多细节,也有很多选择,比如首先非常重要的实现一个master-worker的服务发现过程,让worker注册到master, 能让master调度worker, 然后是需要实现worker-master之间的通信协议,可以通过使用dubbo, grpc, thrift,再然后worker的任务调度,这里可以增加一个像hadoop yarn, apache mesos的这样的资源管理器去合理管理调度资源,将核实的任务放在合适的计算资源上,然后像worker做心跳保活,处理worker的异常调度,最后非常重要的是做master的高可用,这里无论是在google还是hadoop感觉都不是做得特别好,主要原因应该一方面master源数据没有自动存储,然后也没有像master controller这样的管理工具去管理master容错的软件吧。

这里没有那么复杂,所有的worker和master都在一个节点,通过socket进行通信,调用,单进程多协程,相当于是伪分布式的场景,然后这里只实现了任务的调度,将每个doMap, doReduce 任务调度到合适的worker(这里的worker已经注册到了registerChan中),然后这里会有个局部变量allTask知道需要处理的总任务数,然后利用sync.WaitGroup控制携程函数的执行结束,然后将allTask中的任务通过使用Go func启动单独的协程去执行,当出现任务调度失败,将task重新放入到allTask,最后当等待所有的worker func执行完毕后,退出函数。

func schedule(
            jobName string, 
            mapFiles []string, 
            nReduce int, 
            phase jobPhase, 
            registerChan chan string) {

    var ntasks int
    var n_other int 

    //判断map reduce函数类型

    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }


    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    fmt.Printf("Schedule: %v done\n", phase)

    var wg sync.WaitGroup
    var allTask = make(chan int)

     // 加载task, 等待所有task完成
    go func() {

        for i := 0; i < ntasks; i++ {
            wg.Add(1)
            allTask <- i
        }

        wg.Wait()
        close(allTask)

    }()

     // 调度所有的task

     for i := range allTask {

        // 获取合适的worker
        worker := <- registerChan

        go func(worker string, i int) {

            file := ""

            if phase == mapPhase {

                file = mapFiles[i]
            }
            doTaskArgs := DoTaskArgs{

                JobName: jobName,
                File :  file,
                Phase : phase,
                TaskNumber:  i,
                NumOtherPhase: n_other,
            }

            // 执行worker
            if call(worker, "Worker.DoTask", &doTaskArgs, nil ) {
                wg.Done()

                // 执行成功重新放回worker

                registerChan <- worker

            } else {

                    // 放回执行失败的task

                allTask <- i
            }

        }(worker, i)

    }


    fmt.Printf("Schedule: %v done\n", phase)

}

Domap实现:

doMap实现逻辑其实比较简单,就是使用ioutil.ReadFile将就本worker的数据读出来(会有一个文件切片功能,这里没有介绍,就是将需要计算的数据通过分片的方式传递给worker, 在hadoop和gfs中默认是64M,这和分布式文件系统hdfs, gfs的具体内部实现逻辑有关系),然后执行用户定义的map func函数,将执行后的结果保存到中间文件中tmpFiles中, 这里保存的格式是json, 注意这里的环境假设是,所有worker都一个文件系统中,实际处理情况会在gfs,hdfs中。

func doMap(jobName string,  
    mapTask int, 
    inFile string,
    nReduce int, 
    mapF func(filename string, contents string) []KeyValue,
)

    // 1 读取合适的文件,执行用户定义的map函数

    body, err := ioutil.ReadFile(inFile)
    if err != nil {
        fmt.Errorf("doMap read file err %s", err)
        return
    }

    // 2 执行用户定义的map函数 

    resultKv := mapF(inFile ,string(body))

    if len(resultKv) == 0 {
        fmt.Println("doMap not need to create tmp file")
        return
    }
    tmpFiles := make([]*os.File, nReduce)
    tmpFileEcoder := make([]*json.Encoder, nReduce)

    // 3 预生成需要的中间结果文件,获取文件指针

    for i:= 0; i < nReduce; i++ {
        reduceTmpFileName := reduceName(jobName, mapTask, i)
        fmt.Println(reduceTmpFileName)
        stat, err := os.Stat(reduceTmpFileName)
        if err != nil {
            if os.IsExist(err) {
                os.Remove(reduceTmpFileName)
            }
        }
        if stat != nil {
            if stat.IsDir() {
                os.RemoveAll(reduceTmpFileName)
            }
        }
        reduceFile, err := os.Create(reduceTmpFileName)
        if err != nil {
            fmt.Errorf("doMap create tmp %s file err %s ", reduceTmpFileName, err)
            return
        }
        //tmpFiles := append(tmpFiles, reduceFile)
        tmpFiles[i] = reduceFile
        enc := json.NewEncoder(reduceFile)
        //tmpFileEcoder = append(tmpFileEcoder, enc)
        tmpFileEcoder[i] = enc

    }


    // 4 将map函数执行的resultKv,放入到中间文件中

    for _, kv := range resultKv {


        // 通过hash定位到 k 和对应的v 保存的合适的数据文件中

        hashIndex := ihash(kv.Key) % nReduce
        if tmpFileEcoder[hashIndex] == nil {
            fmt.Errorf("doMap tmpFile ecoder index err")
            continue
        }

        err := tmpFileEcoder[hashIndex].Encode(&kv)
        if err != nil {
            fmt.Errorf("doMap write tmp file %s err %s", tmpFiles[hashIndex].Name(), err)
        }
    }

    // 5 关闭文件描述符

    for _, tmpFile := range tmpFiles {
        tmpFile.Close()
    }
}

DoReduce实现:

doReduce逻辑也不是太难,就是将当所有map执行结束后,执行reduce函数(这里会用map函数所生成的中间文件),相当于是合并排序结果集,然后输出到输出文件。

func doReduce(
    jobName string, 
    MapReduce job,
    reduceTask int, 
    outFile string, 
    nMap int, 
    reduceF func(key string, values []string) string,
) {


    resultMap := make(map[string][]string)

    var keys []string

    // 1 读取所有的中间文件

    for i:= 0; i < nMap; i++{

        reduceTmpFileName := reduceName(jobName, i, reduceTask)
        reduceTmpfile, err := os.Open(reduceTmpFileName)
        if err != nil {
            fmt.Errorf("doReduce read tmp file error %s", reduceTmpFileName)
            continue
        }

        var kv KeyValue

        // 2 解析每个临时文件
        decode := json.NewDecoder(reduceTmpfile)
        err = decode.Decode(&kv)

        for err == nil {
            if _, ok := resultMap[kv.Key]; !ok {
                keys = append(keys, kv.Key)
            }
            resultMap[kv.Key] = append(resultMap[kv.Key], kv.Value)
            err = decode.Decode(&kv)
        }

        // 3 排序 key

        sort.Strings(keys)

        out, err := os.Create(outFile)
        if err != nil {
            fmt.Errorf("doReduce create output file %s failed", err)
            return
        }
        enc := json.NewEncoder(out)

        // 4 输出所有的结果到reduce文件中

        for _, key := range keys {
            if err = enc.Encode(KeyValue{key, reduceF(key, resultMap[key])}); err != nil {
                fmt.Errorf("write key: %s to file %s failed", key, outFile)
            }
        }
        out.Close()
    }


}

想法:

感觉其实几年前就进入云计算大数据的时代,然后进入移动互谅网,然后最近两年新冒出来的,物联网,新零售的概念,这意味着终端设备越来越多,同时也会产生越来越多的数据,我们在数据中找到有价值的信息会越来越难,所以感觉数据处理,数据分析的比重会是在互联网中会越来越高。

那么对应到技术上,可能以前最开始想到数据处理框架就是mapreduce, 现在可能很多人会觉得mapreduce这种框架略显老式, 我其实觉得还是使用场景的问题,像带有离线计算,批处理,离线存储这样需求,hadoop体系还是可以继续胜任,然后对于实时性要求更高的应用场景,会采用流式计算技术,像storm, yahoo s4等方案,现在开源社区也对于数据处理已经也孵化出了更多程序的框架,spark, flink的程序体系等,对于我们来讲在技术选项上也能有更多方案。


最后:

其实只实现了mapreduce比较简单的功能,有一些逻辑没有做介绍,具体可以看看上边连接中github的代码,也可以尝试做做6.824 Schedule: Spring 2018相关练习,会对理解分布式系统有更多的帮助, 有了一个基础的框架结构,可以往上边增加很多功能,实现一个标准的mapreduce库吧。