MapReduce-分组取TopN

1,421 阅读3分钟

MR是大数据技术中的基本功。MR源码分析的目的是清楚一个MR Job其中有哪些细节。通过一个分组取TopN的例子来串一遍所有细节,并且培养大数据计算编程手感

任务描述

如下数据,求出每个月温度最高的两天。注意:数据中有垃圾数据(重复日期的数据)

年月日 温度
2019-06-01 32
2019-06-02 36
2019-06-03 33
2019-05-01 32
2019-05-02 34
2019-05-03 34
2019-04-02 29
2019-04-02 24
2019-04-22 31
2019-04-22 30

解决思路

1.直接按年月为key,日期和温度作为value

  • 这样实现起来很简单,map只用把年月弄成key,剩下的交给reduce来做。
  • 由于我们没有干涉任务的任何配置项。分区是hash,自然相同年月的会在一起。排序比较器和分组比较器都是根据key的逻辑来的。
  • 在reduce中拿到的values是日期和温度。需要进一步设计逻辑来求解。

这种解法没有利用MR排序的特性。

2.充分利用MR排序的特性

  • 将年月日温度作为key
  • 切记mr非常灵活。年月日作为key可以再自定义排序比较器
  • 自定义排序比较器,排序逻辑为年月温度,温度倒序
  • 如果不自定义分组比较器,这样到reduce的数据不成组,由此需要自定义分组比较器,逻辑为:相同的年月就是一组。
  • 这样进入reduce的数据 已经是相同年月,并且温度倒序的
  • 最后在reduce只需要做一件事儿,就是看迭代出的第二笔数据和第一笔日期是否相同,相同则是垃圾数据。

自定义key

思考:这里设置的排序逻辑和自定义排序比较器有什么关系呢?

public class MyKey implements WritableComparable<MyKey> {
    private int year;
    private int month;
    private int day;
    private int wd;
    // 此处需要实现序列化相关代码
    // 需要实现排序逻辑
}

自定义排序比较器

思考:和上面key的排序有什么关系

/**
 * 比较逻辑,年月温度,且温度 倒序
 */
@Override
public int compare(WritableComparable a, WritableComparable b) {
    MyKey i = (MyKey) a;
    MyKey j = (MyKey) b;
    int c1 = Integer.compare(i.getYear(), j.getYear());
    if (c1 == 0) {
        int c2 = Integer.compare(i.getMonth(), j.getMonth());
        if (c2 == 0) {
            // 这是倒序
            return Integer.compare(j.getWd(), i.getWd());
        }
        return c2;
    }
    return c1;
}

自定义分区器

/**
 * 分区逻辑,以年月来作为标准
 *
 * @author huoguangyao
 */
public class MyPartitioner extends Partitioner<MyKey, IntWritable> {
    @Override
    public int getPartition(MyKey myKey, IntWritable intWritable, int numPartitions) {
        return Objects.hashCode(myKey.getYear() + myKey.getMonth());
    }
}

自定义分组比较器

相同年月的到一个分组。这样相同年月的数据才能到达一个reduce中。

public class MyGroupingComparator extends WritableComparator {
    MyGroupingComparator() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey i = (MyKey) a;
        MyKey j = (MyKey) b;
        int c1 = Integer.compare(i.getYear(), j.getYear());
        if (c1 == 0) {
            return Integer.compare(i.getMonth(), j.getMonth());
        }
        return c1;
    }
}

总结

需要对mr的每一步都非常了解。才能知道为什么要这么设计。kylin也用到了mr来做计算,而且这种编程思想贯穿整个大数据计算。