阅读 1019

带你入坑大数据(三) --- MapReduce介绍

前言


在上一篇文章中我们已经了解了HDFS的读写流程,HA高可用,联邦和Sequence Files方案,简单回顾一下HDFS的写流程

Client调用Distributed FileSystem的create方法,这个过程是远程调用了NameNode的create方法,此时NameNode就会做四件事情

  1. 检查自己是否正常运行

  2. 判断要写进HDFS的文件是否存在

  3. 检查client是否具有创建权限

  4. 对此次操作进行日志记录(edits log)

此时create方法会返回一个OutputStream,这个流还需啊哟和NameNode进行交互,调用NameNode的addBlock()方法,以得知这个block需要写在哪些数据节点上。

开始写数据时先写在一个chuck上,附带着一个4字节的checkSum,总共516字节,然后再把这些chuck写在一个更大的结构package中,在package被多个chuck写满之后,把package放到一个叫做data queue的队列中,之后所做的事情有两个

  1. data queue中的package往数据节点DataNode上传输,传输的顺序按照NameNode的addBlock()方法返回的列表依次传输

  2. 往DataNode上传输的同时也往确认队列ack queue上传输

  3. 针对DataNode中传输完成的数据做一个checkSum,并与原本打包前的checkSum做一个比较

  4. 校验成功,就从确认队列ack queue中删除该package,否则该package重新置入data queue重传

完成后通过心跳机制NameNode就可以得知副本已经创建完成,再调用addBlock()方法写之后的文件。

异常的情况就不再重新说明了,可以直接跳到第二篇进行查看

一、MapReduce编程模型


MapReduce是采用一种分而治之思想设计出来的分布式计算框架

在计算复杂或者计算量大的任务,单台服务器无法胜任时,可将其切分成一个个小的任务,小任务分别在不同的服务器上并行执行,最终再汇总每个小任务的结果即可

MapReduce由两个阶段组成,切分成小任务的Map阶段和汇总小任务的Reduce阶段,如下图,需要注意,三个小任务是可以并行执行的

1.1 Map阶段

map()函数的输入时键值对,输出的是一系列键值对,输出的结果时写入本地磁盘的

1.2 Reduce阶段

reduce()函数的输入时键值对(即map()函数的输出),输出是一系列键值对,最终写入HDFS

大体逻辑在下面的图非常清晰明了了,shuffle的过程之后再说明

二、MapReduce编程示例


永远都逃不过的词频统计,统计一篇文章中,各个单词出现的次数

2.1 原理图分析

从左到右,有一个文件,HDFS对它进行了分块存储,且每一个块我们也可以视为是一个分片(split),然后它提供一个kv对(0,Dear Bear River)过来,key为什么是0呢?那这里的0其实是偏移量,这个偏移量是会随着文件中的数据字节大小进行变化的。在当前例子中暂时我们还用不上,我们需要做的只是把作为value的Dear Bear River做一个拆分,然后进行统计,统计完成后开始读第二行的Dear Car,同样输出即可。

之后这个文件分成的3个块都统计好之后,再按照同一个单词汇聚到同一个节点进行统计的方式,得出结果即可

需要注意的问题

1.我们可以看到在上图存在着 4 个单词 4 个 reduce task,但是这个reduce task的个数是由开发人员自己决定的,只是一个SetReduceNum(4)的问题

2.为什么reduce可以得知究竟有多少个单词,提到shuffle时我们再说。

3.细心的你应该会发现shufflling过后的那些(Dear,1)有4个,可是key不应该只能存在一个么,这也是shuffle的时候要说的

2.2 mapper代码

public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        for (String word : words) {
            // 每个单词出现1次,作为中间结果输出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
复制代码

这里的LongWritable对应Java里面的Long类型,Text对应String类型,因为分布式框架中数据从一个节点到另一个节点时会存在序列化和反序列化的问题,所以Hadoop自身提供了一些带有序列化功能的类供我们使用,也就是平时我们看到的键值对是(Long,String),在这里就变成了(LongWritable,Text)而已。

之后就是覆写map()方法,实现单词分割,之后把每个单词作为key,以(word,1)这种状态输出出去。

想要查看这些API方法的话,可以去hadoop官网查看,这里我用的还是2.7.3,看过上一篇的同学应该也是知道了

这里有两个Mapper是因为第一个Mapper是老的Mapper,现在已经使用新的了。点击Method之后就可以看到刚刚使用的map()方法了

2.3 Reducer代码

public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
        key: hello
        value: List(1, 1, ...)
    */
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        int sum = 0;

        for (IntWritable count : values) {
            sum = sum + count.get();
        }
        context.write(key, new IntWritable(sum));// 输出最终结果
    };
}
复制代码

有了上一个2.2的基础,这个代码就不再展开说明了,就是把value进行累加,然后得出一个sum,key还是指单词,之后以(word,sum)这种状态输出出去。

补充:当value中的列表非常大时,会选择提高集群内存或者设置一些读句子时候的限制(自定义InputFormat类,MapReduce默认的是TextInputFormat)把数据大小给减少。

2.4 程序执行的main()方法

这里的main方法基本每一个都是直接拷贝过来然后填填set方法的参数直接用的

public class WordMain {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args.length != 2 || args == null) {
            System.out.println("please input Path!");
            System.exit(0);
        }

        Configuration configuration = new Configuration();
        // 生成一个job实例
        Job job = Job.getInstance(configuration, WordMain.class.getSimpleName());

        // 打jar包之后,找程序入口用
        job.setJarByClass(WordMain.class);

        // 通过job设置输入/输出格式
        // MR的默认输入格式就是TextInputFormat,所以注释掉也没问题
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);

        // 设置输入/输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 设置处理Map/Reduce阶段的类
        job.setMapperClass(WordMap.class);
        job.setReducerClass(WordReduce.class);
        //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型
        //job.setMapOutputKeyClass(.class)
        // 设置最终输出key/value的类型m
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 提交作业
        job.waitForCompletion(true);
    }
}
复制代码

运行的方式可以本地运行,可以集群运行,可以maven打包运行也可以,运行结果可以通过yarn查看,因为考虑到大家可能没时间去搭建一个集群玩这里就不贴图了,后面找机会分享一下简答的3节点的集群搭建。

2.5 combiner

map端的本地聚合,无论运行多少次combiner操作,都不会影响最终的结果

注意:不是所有MapReduce程序都适合使用,比如求average

WordCountMap与WordCountReduce代码不变
WordCountMain中,增加job.setCombinerClass(WordCountReduce.class);
复制代码

键值对一开始的时候是第一张图的样子,现在我们刚经过Mapping时会存在大量的键值对,它们会通过网络传到对应的Reducing那,如果都是按照(word,1)的格式传输过去,传输的数据量就变得非常巨大,所以这时候最好的方案是先在本地对某一个单词先做一个汇总,也就是combine操作,如图,两个(dear,1)变成了一个(Dear,2),2个(Car,1)变成了(Car,2)等···

2.6 shuffle过程

map task 输出的时候会输出到一个环形缓冲区中,每一个环形缓冲区是100M大小,随着数据的不断读写,让环形缓冲区的内存达到80%,这时候会造成溢出写磁盘,把这些文件写到磁盘中,而这个写到磁盘的操作会经历3个过程

首先是分区,默认情况下是利用key来进行分区操作,MapReduce框架专门提供了一个HashPartitioner用于进行分区操作

环形缓冲区的kv对在落入磁盘前都需要去调用一下getPartition()方法,此时我们可以看到,它使用了一个比较巧妙的方法:先是计算了一下这个key的hashcode,再模上一个reduce的个数,这种时候我们看上面的图,reduce的个数是4,那我们一个数字去模4,结果只会是4个,也就是0,1,2,3,所以这四个结果就会对应不同的缓冲区

剩下的就是reduce task来进行拉取数据,刚开始时会放到内存当中,放不下的时候也会溢出写到磁盘

当然如果一开始的时候有进行setCombine操作的话就会变成(Dear,4),在图中因为我们是举例说明,实际情况下每个分区都有很多不同的单词,在reduce操作时就会进行合并操作,即相同的key放在一起,然后按照字母顺序排序。

combine,merge,和最后的reduce task,这些功能都一样,只不过作用的阶段不同,方便提升性能。只要达到业务要求就行,有时候一个map就能解决需求,有时候需要map和reduce两个阶段。

之后每一个reduce task的结果都会写到HDFS的一个文件里。当map task完成后,后面说yarn的时候会有一个appMaster,做一个轮询的确认,确认完成后再通知reduce task从本地磁盘拉取,有比较多的具体知识需要后续跟进时才会在最后形成一个比较清晰的概念,这也是非常正常的。

2.7 二次排序

MapReduce中根据key进行分区排序和分组,如果现在需要自定义key类型,并自定义key的排序规则,如何实现(结合代码讲解)

public class Person implements WritableComparable<Person> {
    private String name;
    private int age;
    private int salary;

    public Person() {
    }

    public Person(String name, int age, int salary) {
        //super();
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getSalary() {
        return salary;
    }

    public void setSalary(int salary) {
        this.salary = salary;
    }

    @Override
    public String toString() {
        return this.salary + "  " + this.age + "    " + this.name;
    }

    //先比较salary,高的排序在前;若相同,age小的在前
    public int compareTo(Person o) {
        int compareResult1= this.salary - o.salary;
        if(compareResult1 != 0) {
            return -compareResult1;
        } else {
            return this.age - o.age;
        }
    }

    //序列化,将NewKey转化成使用流传送的二进制
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeInt(salary);
    }

    //使用in读字段的顺序,要与write方法中写的顺序保持一致
    public void readFields(DataInput dataInput) throws IOException {
        //read string
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt();
        this.salary = dataInput.readInt();
    }
}
复制代码

讲解内容··

2.8 数据倾斜

数据倾斜是数据中的常见情况。数据中不可避免地会出现离群值(outlier),并导致数据倾斜。这些离群值会显著地拖慢MapReduce的执行。常见的数据倾斜有以下几类:

  1. 数据频率倾斜——某一个区域的数据量要远远大于其他区域。(reduce倾斜)
  2. 数据大小倾斜——部分记录的大小远远大于平均值。(map倾斜)

在map端和reduce端都有可能发生数据倾斜。在map端的数据倾斜会让多样化的数据集的处理效率更低。在reduce端的数据倾斜常常来源于MapReduce的默认分区器。

数据倾斜会导致map和reduce的任务执行时间大为延长,也会让需要缓存数据集的操作消耗更多的内存资源。

2.8.1 如何诊断是否存在数据倾斜

  1. 关注由map的输出数据中的数据频率倾斜的问题。
  2. 如何诊断map输出中哪些键存在数据倾斜?
    • 在reduce方法中加入记录map输出键的详细情况的功能

    • 在发现了倾斜数据的存在之后,就很有必要诊断造成数据倾斜的那些键。有一个简便方法就是在代码里实现追踪每个键的最大值。为了减少追踪量,可以设置数据量阀值,只追踪那些数据量大于阀值的键,并输出到日志中。

8.2 减缓Reduce端数据倾斜

  1. Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况,也就是部分输出键的数据量远远大于其它的输出键

  2. 如何减小reduce端数据倾斜的性能损失?

① 抽样和范围分区
Hadoop默认的分区器是基于map输出键的哈希值分区。这仅在数据分布比较均匀时比较好。在有数据倾斜时就很有问题。

使用分区器需要首先了解数据的特性。**TotalOrderPartitioner**中,可以通过对原始数据进行抽样得到的结果集来预设分区边界值。TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。
复制代码
② 自定义分区
另一个抽样和范围分区的替代方案是基于输出键的背景知识进行自定义分区。例如,如果map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。
复制代码
③ Combine
使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。在可能的情况下,combine的目的就是聚合并精简数据。在技术48种介绍了combine。
复制代码
④ Map端连接和半连接
如果连接的数据集太大而不能在map端的连接中使用。那么可以考虑第4章和第7章中介绍的超大数据集的连接优化方案。
复制代码
⑤ 数据大小倾斜的自定义策略
在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响,乃至导致OutOfMemoryError异常。处理这种情况并不容易。可以参考以下方法。

- 设置mapred.linerecordreader.maxlength来限制RecordReader读取的最大长度。RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。默认长度没有上限。
- 通过org.apache.hadoop.contrib.utils.join设置缓存的数据集的记录数上限。在reduce中默认的缓存记录数上限是100条。
- 考虑使用有损数据结构压缩数据,如Bloom过滤器。
复制代码

finally

MR的没有分篇,篇幅很大,希望大家能够耐心看完。

根据顺序下一篇是Yarn,走完大数据的这个流程。