wordcount是计算一个文章中单词出现的次数,也就是统计词频
这个问题的考察点有两个,第一是对wordcount这个流程的理解,先将单词拆分开,然后统计每一个单词出现的次数。第二点是考察算子使用的熟练度,和对算子理解的程度。
先贴最常见的实现方式:
def main(args: Array[String]): Unit = {
//1.创建spark环境
val conf=new SparkConf().setMaster("local").setAppName("wordcount")
val sparkContext=new SparkContext(conf)
//2.读取数据
val textFile: RDD[String] = sparkContext.textFile("data/word.txt")
//3.将每个词拆分开
val words: RDD[String] = textFile.flatMap {
line => {
line.split(" ")
}
}
wc6(words)
}
def wc1(words: RDD[String])={
//4.给每个词创建元组,并且表示一下这个词出现1次
val wordM: RDD[(String, Int)] = words.map {
word => {
(word, 1)
}
}
//5.对次进行汇总聚合
//reduceByKey算子的作用是根据元组的第一个字段进行聚合,并且对第二个字段进行累加
//reduceByKey内部有预聚合,相比groupByKey性能有所提升
val rz = wordM.reduceByKey(_ + _)
//end.打印数据
rz.foreach(println)
}
下面是不同的算子实现wordcount,有些算子还没有完善,算是复习了一遍相关算子的实现方式 使用的算子包含:textFile,map,flatmap,reduceByKey,groupBy,groupByKey,mapValues,countByKey,countByValue,reduce,aggregate等
object Spark_wc {
def main(args: Array[String]): Unit = {
//1.创建spark环境
val conf=new SparkConf().setMaster("local").setAppName("wordcount")
val sparkContext=new SparkContext(conf)
//2.读取数据
val textFile: RDD[String] = sparkContext.textFile("data/word.txt")
//3.将每个词拆分开
val words: RDD[String] = textFile.flatMap {
line => {
line.split(" ")
}
}
wc6(words)
}
//第一种,最普通的实现wc的方式,reduceByKey(延伸出aggregateByKey,foldByKey,combineByKey)
def wc1(words: RDD[String])={
//4.给每个词创建元组,并且表示一下这个词出现1次
val wordM: RDD[(String, Int)] = words.map {
word => {
(word, 1)
}
}
//5.对次进行汇总聚合
//reduceByKey算子的作用是根据元组的第一个字段进行聚合,并且对第二个字段进行累加
//reduceByKey内部有预聚合,相比groupByKey性能有所提升
val rz = wordM.reduceByKey(_ + _)
//end.打印数据
rz.foreach(println)
}
//第二种,groupBy+map
def wc2(words: RDD[String])={
//4.对数据进行转换
val wordM: RDD[(String, Int)] = words.map {
word => {
(word, 1)
}
}
//5.对元组数据根据第一个值进行分组
val wordG = wordM.groupBy(_._1)
//6.使用map算子,对每一个数据进行计算数据量的计算
val rz = wordG.map {
case (w, i) => { //模式匹配
(w, i.size)
}
}
//end.打印数据
rz.foreach(println)
}
//第三种、groupByKey+mapValues
def wc3(words: RDD[String])={
//4.对数据进行转换
val wordM: RDD[(String, Int)] = words.map {
word => {
(word, 1)
}
}
//5.和groupBy的区别在于,groupByKey自动对将元组的第一个元素当作key
val wordG = wordM.groupByKey()
//6.使用mapValues算子,只对元组的第二个数据进行转换并汇总聚合
val rz = wordG.mapValues(_.size)
//end.打印数据
rz.foreach(println)
}
//第四种、countByKey 存在shuffle,计算key出现的次数
def wc4(words: RDD[String])={
//4.对数据进行转换
val wordM: RDD[(String, Int)] = words.map {
word => {
(word, 1)
}
}
//5.计算key出现的次数
val wordG = wordM.countByKey()
//end.打印数据
wordG.foreach(println)
}
//第五种、countByValue
def wc5(words: RDD[String])={
//4.使用countByValue算子直接计算每个单词出现的次数
val wordG = words.countByValue()
//end.打印数据
wordG.foreach(println)
}
//第六种、reduce 实现起来有点复杂
def wc6(words: RDD[String])={
val strMap: RDD[mutable.Map[String, Int]] = words.map(s => {
mutable.Map[String, Int]((s, 1))
})
strMap.collect().foreach(println)
val rz: mutable.Map[String, Int] = strMap.reduce(
(a, b) => {//a的第一个元素为原数据的第一个元素,后边的a为每次计算的结果数据,b为原数据从第二个数据开始的遍历数据
b.foreach { //为什么对b进行foreach,因为b是一个map类型的数据,所以需要进行循环便利,虽然只有一个元素
case (word, count) => {
//找出含有的元素,并对数值进行相加
val newCount: Int = a.getOrElse(word, 0) + count
println(s"-----$a,$b--${a.getOrElse(word, 0)}---$word,$count")
a.update(word, newCount)
println(s"********$a" )
}
}
a
}
)
println(rz)
}
//第七种、aggregate 类似于reduce,只是区分分区内计算和分区间计算。这里分区内计算和分区间计算方式相同
def wc7(words: RDD[String])={
//4.对数据进行转换
val wordM: RDD[(String, Int)] = words.map {
word => {
(word, 1)
}
}
//5.aggregate,对前后两个数据进行计算
val wordG: (String, Int) = wordM.aggregate {
(a, b) => {
?
}
}
//end.打印数据
println(wordG)
}