MR是大数据技术中的基本功。MR源码分析的目的是清楚一个MR Job
其中有哪些细节。通过一个分组取TopN的例子来串一遍所有细节,并且培养大数据计算编程手感
任务描述
如下数据,求出每个月温度最高的两天。注意:数据中有垃圾数据(重复日期的数据)
年月日 温度
2019-06-01 32
2019-06-02 36
2019-06-03 33
2019-05-01 32
2019-05-02 34
2019-05-03 34
2019-04-02 29
2019-04-02 24
2019-04-22 31
2019-04-22 30
解决思路
1.直接按年月为key,日期和温度作为value
- 这样实现起来很简单,map只用把年月弄成key,剩下的交给reduce来做。
- 由于我们没有干涉任务的任何配置项。分区是
hash
,自然相同年月的会在一起。排序比较器和分组比较器都是根据key
的逻辑来的。 - 在reduce中拿到的
values
是日期和温度。需要进一步设计逻辑来求解。
这种解法没有利用MR排序的特性。
2.充分利用MR排序的特性
- 将年月日温度作为key
- 切记mr非常灵活。
年月日作为key
可以再自定义排序比较器
- 自定义排序比较器,排序逻辑为
年月温度,温度倒序
- 如果不自定义
分组比较器
,这样到reduce的数据不成组,由此需要自定义分组比较器
,逻辑为:相同的年月就是一组。 - 这样进入reduce的数据 已经是相同年月,并且温度倒序的。
- 最后在reduce只需要做一件事儿,就是看迭代出的第二笔数据和第一笔日期是否相同,相同则是垃圾数据。
自定义key
思考:这里设置的排序逻辑和自定义排序比较器有什么关系呢?
public class MyKey implements WritableComparable<MyKey> {
private int year;
private int month;
private int day;
private int wd;
// 此处需要实现序列化相关代码
// 需要实现排序逻辑
}
自定义排序比较器
思考:和上面key的排序有什么关系
/**
* 比较逻辑,年月温度,且温度 倒序
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
MyKey i = (MyKey) a;
MyKey j = (MyKey) b;
int c1 = Integer.compare(i.getYear(), j.getYear());
if (c1 == 0) {
int c2 = Integer.compare(i.getMonth(), j.getMonth());
if (c2 == 0) {
// 这是倒序
return Integer.compare(j.getWd(), i.getWd());
}
return c2;
}
return c1;
}
自定义分区器
/**
* 分区逻辑,以年月来作为标准
*
* @author huoguangyao
*/
public class MyPartitioner extends Partitioner<MyKey, IntWritable> {
@Override
public int getPartition(MyKey myKey, IntWritable intWritable, int numPartitions) {
return Objects.hashCode(myKey.getYear() + myKey.getMonth());
}
}
自定义分组比较器
相同年月的到一个分组。这样相同年月的数据才能到达一个reduce中。
public class MyGroupingComparator extends WritableComparator {
MyGroupingComparator() {
super(MyKey.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
MyKey i = (MyKey) a;
MyKey j = (MyKey) b;
int c1 = Integer.compare(i.getYear(), j.getYear());
if (c1 == 0) {
return Integer.compare(i.getMonth(), j.getMonth());
}
return c1;
}
}
总结
需要对mr的每一步都非常了解。才能知道为什么要这么设计。kylin
也用到了mr
来做计算,而且这种编程思想贯穿整个大数据计算。