每日一题:手写wordcount

18 阅读3分钟

wordcount是计算一个文章中单词出现的次数,也就是统计词频

这个问题的考察点有两个,第一是对wordcount这个流程的理解,先将单词拆分开,然后统计每一个单词出现的次数。第二点是考察算子使用的熟练度,和对算子理解的程度。

先贴最常见的实现方式:

def main(args: Array[String]): Unit = {
  //1.创建spark环境
  val conf=new SparkConf().setMaster("local").setAppName("wordcount")
  val sparkContext=new SparkContext(conf)
  //2.读取数据
  val textFile: RDD[String] = sparkContext.textFile("data/word.txt")
  //3.将每个词拆分开
  val words: RDD[String] = textFile.flatMap {
    line => {
      line.split(" ")
    }
  }
  wc6(words)
}
def wc1(words: RDD[String])={
  //4.给每个词创建元组,并且表示一下这个词出现1次
  val wordM: RDD[(String, Int)] = words.map {
    word => {
      (word, 1)
    }
  }
  //5.对次进行汇总聚合
  //reduceByKey算子的作用是根据元组的第一个字段进行聚合,并且对第二个字段进行累加
  //reduceByKey内部有预聚合,相比groupByKey性能有所提升
  val rz = wordM.reduceByKey(_ + _)
  //end.打印数据
  rz.foreach(println)
}

下面是不同的算子实现wordcount,有些算子还没有完善,算是复习了一遍相关算子的实现方式 使用的算子包含:textFile,map,flatmap,reduceByKey,groupBy,groupByKey,mapValues,countByKey,countByValue,reduce,aggregate等

object Spark_wc {

  def main(args: Array[String]): Unit = {
    //1.创建spark环境
    val conf=new SparkConf().setMaster("local").setAppName("wordcount")
    val sparkContext=new SparkContext(conf)
    //2.读取数据
    val textFile: RDD[String] = sparkContext.textFile("data/word.txt")
    //3.将每个词拆分开
    val words: RDD[String] = textFile.flatMap {
      line => {
        line.split(" ")
      }
    }
    wc6(words)
  }
  //第一种,最普通的实现wc的方式,reduceByKey(延伸出aggregateByKey,foldByKey,combineByKey)
  def wc1(words: RDD[String])={
    //4.给每个词创建元组,并且表示一下这个词出现1次
    val wordM: RDD[(String, Int)] = words.map {
      word => {
        (word, 1)
      }
    }
    //5.对次进行汇总聚合
    //reduceByKey算子的作用是根据元组的第一个字段进行聚合,并且对第二个字段进行累加
    //reduceByKey内部有预聚合,相比groupByKey性能有所提升
    val rz = wordM.reduceByKey(_ + _)
    //end.打印数据
    rz.foreach(println)
  }
  //第二种,groupBy+map
  def wc2(words: RDD[String])={
    //4.对数据进行转换
    val wordM: RDD[(String, Int)] = words.map {
      word => {
        (word, 1)
      }
    }
    //5.对元组数据根据第一个值进行分组
    val wordG = wordM.groupBy(_._1)
    //6.使用map算子,对每一个数据进行计算数据量的计算
    val rz = wordG.map {
      case (w, i) => { //模式匹配
        (w, i.size)
      }
    }
    //end.打印数据
    rz.foreach(println)
  }
  //第三种、groupByKey+mapValues
  def wc3(words: RDD[String])={
    //4.对数据进行转换
    val wordM: RDD[(String, Int)] = words.map {
      word => {
        (word, 1)
      }
    }
    //5.和groupBy的区别在于,groupByKey自动对将元组的第一个元素当作key
    val wordG = wordM.groupByKey()
    //6.使用mapValues算子,只对元组的第二个数据进行转换并汇总聚合
    val rz = wordG.mapValues(_.size)
    //end.打印数据
    rz.foreach(println)
  }
  //第四种、countByKey 存在shuffle,计算key出现的次数
  def wc4(words: RDD[String])={
    //4.对数据进行转换
    val wordM: RDD[(String, Int)] = words.map {
      word => {
        (word, 1)
      }
    }
    //5.计算key出现的次数
    val wordG = wordM.countByKey()
    //end.打印数据
    wordG.foreach(println)
  }
  //第五种、countByValue
  def wc5(words: RDD[String])={
    //4.使用countByValue算子直接计算每个单词出现的次数
    val wordG = words.countByValue()
    //end.打印数据
    wordG.foreach(println)
  }
  //第六种、reduce 实现起来有点复杂
  def wc6(words: RDD[String])={
    val strMap: RDD[mutable.Map[String, Int]] = words.map(s => {
      mutable.Map[String, Int]((s, 1))
    })
    strMap.collect().foreach(println)
    val rz: mutable.Map[String, Int] = strMap.reduce(
      (a, b) => {//a的第一个元素为原数据的第一个元素,后边的a为每次计算的结果数据,b为原数据从第二个数据开始的遍历数据
        b.foreach { //为什么对b进行foreach,因为b是一个map类型的数据,所以需要进行循环便利,虽然只有一个元素
          case (word, count) => {
            //找出含有的元素,并对数值进行相加
            val newCount: Int = a.getOrElse(word, 0) + count
            println(s"-----$a,$b--${a.getOrElse(word, 0)}---$word,$count")
            a.update(word, newCount)
            println(s"********$a" )
          }
        }
        a
      }
    )
    println(rz)
  }
  //第七种、aggregate 类似于reduce,只是区分分区内计算和分区间计算。这里分区内计算和分区间计算方式相同
  def wc7(words: RDD[String])={
    //4.对数据进行转换
    val wordM: RDD[(String, Int)] = words.map {
      word => {
        (word, 1)
      }
    }
    //5.aggregate,对前后两个数据进行计算
    val wordG: (String, Int) = wordM.aggregate {
      (a, b) => {
        ?
      }
    }
    //end.打印数据
    println(wordG)
  }