Linux环境Spark安装配置及使用

25,075 阅读51分钟

Linux环境Spark安装配置及使用

1. 认识Spark

(1) Spark介绍

  • 大数据计算引擎
  • 官网:spark.apache.org/
  • 官方介绍:Apache Spark™ is a unified analytics engine for large-scale data processing.(Apache Spark™是一个用于大规模数据处理的统一分析引擎。)
  • Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
  • Spark生态圈:
    • Spark Core:RDD(弹性分布式数据集)
    • Spark SQL
    • Spark Streaming
    • Spark MLLib:协同过滤,ALS,逻辑回归等等 --> 机器学习
    • Spark Graphx:图计算

(2) 为什么要学习Spark

  • Hadoop的MapReduce计算模型存在的问题:
    • MapReduce的核心是Shuffle(洗牌)。在整个Shuffle的过程中,至少会产生6次的I/O。

    • 中间结果输出:基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。另外,当一些查询(如:Hive)翻译到MapReduce任务时,往往会产生多个Stage(阶段),而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果,而I/O的效率往往较低,从而影响了MapReduce的运行速度。

  • Spark的最大特点:基于内存
  • Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,弥补MapReduce的不足。

(3) Spark的特点:快、易用、通用、兼容

  • ——与Hadoop的MapReduce相比,Spark基于内存的运算速度要快100倍以上,即使,Spark基于硬盘的运算也要快10倍。Spark实现了高效的DAG执行引擎,从而可以通过内存来高效处理数据流。
  • 易用——Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
  • 通用——Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。另外Spark还可以很好的融入Hadoop的体系结构中可以直接操作HDFS,并提供Hive on Spark、Pig on Spark的框架集成Hadoop。
  • 兼容——Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和ApacheMesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

2. Spark体系架构

  • Spark的运行方式
    • Yarn
    • Standalone:本机调试(demo)
  • Worker(从节点):每个服务器上,资源和任务的管理者,只负责管理一个节点。
  • 执行过程:
    • 一个Worker 有多个 Executor。 Executor是任务的执行者,按阶段(stage)划分任务。—> RDD
  • 客户端:Driver Program 提交任务到集群中。
    • spark-submit
    • spark-shell

3. Spark-2.1.0安装流程

(1) 准备工作

  • 具备java环境
  • 配置主机名
  • 配置免密码登录
  • 防火墙关闭

(2) 解压spark-2.1.0-bin-hadoop2.7.tgz安装包到目标目录下:

  • tar -zxvf .tar.gz -C 目标目录

(3) 为后续方便,重命名Spark文件夹:

  • mv spark-2.1.0-bin-hadoop2.7/ spark-2.1.0

(4) Spark目录介绍

  • bin —— Spark操作命令
  • conf —— 配置文件
  • data —— Spark测试文件
  • examples —— Spark示例程序
  • jars
  • LICENSE
  • licenses
  • NOTICE
  • python
  • R
  • README.md
  • RELEASE
  • sbin —— Spark集群命令
  • yarn —— Spark-yarn配置

(5) 修改配置文件:

  • <1>. 配置spark-env.sh:
    • 进入spark-2.1.0/conf路径,重命名配置文件:
      • mv spark-env.sh.template spark-env.sh
    • 修改spark-env.sh信息:
      • vi spark-env.sh
      • export JAVA_HOME=/opt/module/jdk1.8.0_144
        export SPARK_MASTER_HOST=bigdata01
        export SPARK_MASTER_PORT=7077
        
  • <2>. 配置slaves:
    • 进入spark-2.1.0/conf路径,重命名配置文件:
      • mv slaves.template slaves
    • 修改slaves信息:
      • vi slaves
      • bigdata02
        bigdata03
        

(6) 配置环境变量:

  • 修改配置文件:
    • vi /etc/profile
  • 增加以下内容:
    • export SPARK_HOME=spark安装路径
    • export PATH=$PATH:$SPARK_HOME/bin
    • export PATH=$PATH:$SPARK_HOME/sbin
  • 声明环境变量:
    • source /etc/profile

(6) 集群配置:

  • 拷贝配置好的spark到其他机器上
    • scp -r spark-2.1.0/ bigdata02:$PWD
    • scp -r spark-2.1.0/ bigdata03:$PWD

(7) 启动:

  • 启动主节点:
    • start-master.sh
  • 启动从节点:
    • start-slaves.sh
  • 启动shell:
    • spark-shell
  • 通过网页端查看:

(8) 关闭:

  • 关闭主节点:
    • stop-master.sh
  • 关闭从节点:
    • stop-slaves.sh

4. Spark HA的实现

(1) 基于文件系统的单点恢复

  • 主要用于开发或测试环境。

  • 当spark提供目录保存spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,一旦Master发生故障,就可以通过重新启动Master进程(sbin/start-master.sh),恢复已运行的spark Application和worker的注册信息。

  • 基于文件系统的单点恢复,主要是在spark-env.sh里对SPARK_DAEMON_JAVA_OPTS设置

    • 创建存放文件夹:mkdir /opt/module/spark-2.1.0/recovery
    • 修改配置信息:
      • vi spark-env.sh
      • 增加内容:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/opt/module/spark-2.1.0/recovery"

(2) 基于Zookeeper的Standby Masters

  • 适用于现实生产。

  • ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker,Driver和Application的信息都已经持久化到ZooKeeper,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示:

  • 修改配置信息:

    • vi spark-env.sh
    • 增加内容:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"
    • 注释掉:export SPARK_MASTER_HOSTexport SPARK_MASTER_PORT
  • 发送新的配置文件到集群其余节点:

    • scp spark-env.sh bigdata02:$PWD
    • scp spark-env.sh bigdata03:$PWD

5. 执行Spark的任务

(1) spark-submit

  • 用于提交Spark的任务(任务即相关jar包)
  • e.g.: 蒙特卡洛求PI(圆周率)
    • 原理:如下图所示,随机向正方形内落点,通过统计正方形内所有点数和落入圆内的点数来计算占比,得出正方形与圆的面积近似比值,进而近似出PI值。
    • 命令:
      • spark-submit --master spark://XXXX:7077 (指明master地址) --class org.apache.spark.examples.SparkPi (指明主程序的名字) /XXXX/spark/examples/jars/spark-examples_2.11-2.1.0.jar(指明jar包地址) 100(指明运行次数)

(2) spark-shell

  • 相当于REPL,作为一个独立的Application运行
  • spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。
  • 参数说明:
    • --master spark://XXXX:7077 指定Master的地址
    • --executor-memory 2g 指定每个worker可用内存为2G
    • --total-executor-cores 2 指定整个集群使用的cup核数为2个
  • Spark Session 是 2.0 以后提供的,利用 SparkSession 可以访问spark所有组件
  • 两种运行模式:
    • <1>. 本地模式
      • 启动:spark-shell(后面不接任何参数)
    • <2>. 集群模式
      • 启动:spark-shell --master spark://XXXX:7077(指明master地址)
  • e.g.: 编写WordCount程序
    • <1>. 处理本地文件,把结果打印到屏幕上
      • 启动:spark-shell
      • 传入文件:sc.textFile("/XXXX/WordCount.txt")(本地文件路径).flatMap(_.split(" "))(按照空格分割).map((_,1))(单词遍历).reduceByKey(_+_)(单词计数).collect
    • <2>. 处理HDFS文件,结果保存在hdfs上
      • 启动:spark-shell --master spark://XXXX:7077(指
      • sc.textFile("hdfs://XXXX:9000/sp_wc.txt").flatMap(.split(" ")).map((,1)).reduceByKey(+).saveAsTextFile("hdfs://XXXX:9000/output/spark/WordCount")

(3) 单步运行WordCount -> RDD

  • 启动shell:spark-shell
  •   scala> val rdd1 = sc.textFile("/root/sp_wc.txt")
      rdd1: org.apache.spark.rdd.RDD[String] = /root/sp_wc.txt MapPartitionsRDD[1] at textFile at <console>:24
      
      scala> rdd1.collect
      res0: Array[String] = Array(I love Scala, I love Skark, 2019/5/8)
      
      scala> val rdd2 = rdd1.flatMap(_.split(" "))
      rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
      
      scala> rdd2.collect
      res1: Array[String] = Array(I, love, Scala, I, love, Skark, 2019/5/8)
      
      scala> val rdd3 = rdd2.map((_,1))
      rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
      
      scala> rdd3.collect
      res2: Array[(String, Int)] = Array((I,1), (love,1), (Scala,1), (I,1), (love,1), (Skark,1), (2019/5/8,1))
      
      scala> val rdd4 = rdd3.reduceByKey(_+_)
      rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
      
      scala> rdd4.collect
      res3: Array[(String, Int)] = Array((2019/5/8,1), (love,2), (I,2), (Skark,1), (Scala,1))
    

(4) 在IDE中运行WorkCount

  • <1>. scala版本
    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      
      object WordCount {
        
        def main(args: Array[String]): Unit = {
          
          //创建一个Spark配置文件
          val conf = new SparkConf().setAppName("Scala WordCount").setMaster("local")
          
          //创建Spark对象
          val sc = new SparkContext(conf)
          
          val result = sc.textFile(args(0))
            .flatMap(_.split(" "))
            .map((_, 1))
            .reduceByKey(_ + _)
            .saveAsTextFile(args(1))
      
          sc.stop()
        }
      }
      
  • <2>. Java版本
    • import java.util.Arrays;
      import java.util.Iterator;
      import java.util.List;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.PairFunction;
      
      import parquet.format.PageHeader;
      import scala.Tuple2;
      
      public class WordCount {
      
      	public static void main(String[] args) {
      		// TODO Auto-generated method stub
      
      		SparkConf conf = new SparkConf()
      				.setAppName("JavaWordCount")
      				.setMaster("local") ;
      
      		//新建SparkContext对象
      		JavaSparkContext sc = new JavaSparkContext(conf) ;
      		
      		//读入数据
      		JavaRDD<String> lines = sc.textFile("hdfs://XXXX:9000/WordCount.txt") ;
      		
      		//分词 第一个参数表示读进来的话 第二个参数表示 返回值
      		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      		
      		@Override
      		public Iterator<String> call(String input) throws Exception {
      			
      			return Arrays.asList(input.split(" ")).iterator() ;
      		    }
      		}) ;
      		
      		//每个单词记一次数 
      		/*
      		* String, String, Integer
      		* input   <key      value>
      		*/
      		JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      		
      		@Override
      		public Tuple2<String, Integer> call(String input) throws Exception {
      			
      			return new Tuple2<String, Integer>(input, 1) ;
      		}
      		}) ;
      		
      		//执行reduce操作
      		/*
      		* Integer, Integer, Integer
      		* nteger arg0, Integer arg1 返回值
      		*/
      		JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      		
      			@Override
      			public Integer call(Integer arg0, Integer arg1) throws Exception {
      				// TODO Auto-generated method stub
      				return arg0 + arg1 ;
      			}
      		}) ;
      		
      		//打印结果
      		List<Tuple2<String, Integer>> output = counts.collect() ;
      		
      		for (Tuple2<String, Integer> tuple :output) {
      			System.out.println(tuple._1 + " : " + tuple._2) ;
      		}
      		
      		sc.stop() ;
      		
      		}
      }
      

(5) WordCount程序处理过程

(6) Spark提交任务的流程

6. Spark的算子

(1) RDD基础

  • <1>. 什么是RDD
    • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
  • <2>. RDD的属性(源码中的一段话)
    • **一组分片(Partition)。**即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
    • **一个计算每个分区的函数。**Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
    • **RDD之间的依赖关系。**RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
    • **一个Partitioner,即RDD的分片函数。**当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
    • **一个列表。**存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
  • <3>. RDD的创建方式
    • 通过外部的数据文件创建,如HDFS:
      • val rdd1 = sc.textFile(“hdfs://XXXX:9000/data.txt”)
    • 通过sc.parallelize进行创建:
      • val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    • DD的类型:Transformation和Action
  • <4>. RDD的基本原理

(2) Transformation

  • RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

(3) Action

(4) RDD的缓存机制

  • RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
  • 通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
  • 缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
    • Demo示例:
    • 通过UI进行监控:

(5) RDD的Checkpoint(检查点)机制:容错机制

  • 检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。
  • 设置checkpoint的目录,可以是本地的文件夹、也可以是HDFS。一般是在具有容错能力,高可靠的文件系统上(比如HDFS, S3等)设置一个检查点路径,用于保存检查点数据。
  • 分别举例说明:
    • <1>. 本地目录
    • 注意:这种模式,需要将spark-shell运行在本地模式上
    • <2>. HDFS的目录
    • 注意:这种模式,需要将spark-shell运行在集群模式上

(6) RDD的依赖关系和Spark任务中的Stage

  • RDD的依赖关系

    • RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

    • 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

      • 总结:窄依赖我们形象的比喻为独生子女
    • 宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

      • 总结:窄依赖我们形象的比喻为超生
  • Spark任务中的Stage

    • DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据

(7) RDD基础练习

  • 练习1:

  •   //通过并行化生成rdd
      val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
      //对rdd1里的每一个元素乘2然后排序
      val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
      //过滤出大于等于十的元素
      val rdd3 = rdd2.filter(_ >= 10)
      //将元素以数组的方式在客户端显示
      rdd3.collect
    
  • 练习2:

  •   val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
      //将rdd1里面的每一个元素先切分在压平
      val rdd2 = rdd1.flatMap(_.split(' '))
      rdd2.collect
    
  • 练习3:

  •   val rdd1 = sc.parallelize(List(5, 6, 4, 3))
      val rdd2 = sc.parallelize(List(1, 2, 3, 4))
      //求并集
      val rdd3 = rdd1.union(rdd2)
      //求交集
      val rdd4 = rdd1.intersection(rdd2)
      //去重
      rdd3.distinct.collect
      rdd4.collect
    
  • 练习4:

  •   val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
      val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      //求jion
      val rdd3 = rdd1.join(rdd2)
      rdd3.collect
      //求并集
      val rdd4 = rdd1 union rdd2
      //按key进行分组
      rdd4.groupByKey
      rdd4.collect
    
  • 练习5:

  •   val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
      val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      //cogroup
      val rdd3 = rdd1.cogroup(rdd2)
      //注意cogroup与groupByKey的区别
      rdd3.collect
    
  • 练习6:

  •   val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
      //reduce聚合
      val rdd2 = rdd1.reduce(_ + _)
      rdd2.collect
    
  • 练习7:

  •  val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
      val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
      val rdd3 = rdd1.union(rdd2)
      //按key进行聚合
      val rdd4 = rdd3.reduceByKey(_ + _)
      rdd4.collect
      //按value的降序排序
      val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
      rdd5.collect 
    

    7. Spark RDD的高级算子

(1) mapPartitionsWithIndex

  • 把每个partition中的分区号和对应的值拿出来
    • def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
  • f中函数参数:
    • 第一个参数是Int,代表分区号
    • 第二个Iterator[T]代表分区中的元素
  • e.g.: 将每个分区中的元素和分区号打印出来
    • val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    • 创建一个函数返回RDD中的每个分区号和元素:
      • def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
            iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
        }
        
    • 调用:rdd1.mapPartitionsWithIndex(func1).collect

(2) aggregate

  • 先对局部聚合,再对全局聚合
  • e.g.: val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
    • 查看每个分区中的元素:

      • scala> rdd1.mapPartitionsWithIndex(fun1).collect
          	res4: Array[String] = Array(
          	[partId : 0 , value = 1 ], [partId : 0 , value = 2 ], 
          	[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
        
    • 将每个分区中的最大值求和,注意初始值是0:

      • scala> rdd2.aggregate(0)(max(_,_),_+_)
          	res6: Int = 7
        
      • 如果初始值时候100,则结果为300:
      • scala> rdd2.aggregate(100)(max(_,_),_+_)
          	res8: Int = 300
            ```
        
    • 如果是求和,注意初始值是0:

      • scala> rdd2.aggregate(0)(_+_,_+_)
          	res9: Int = 15
        
      • 如果初始值是10,则结果是45
      • scala> rdd2.aggregate(10)(_+_,_+_)
          	res10: Int = 45  
        
    • e.g. —— 字符串:

      • val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
      • 修改一下刚才的查看分区元素的函数
        • def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
              iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
          }
          
        • 两个分区中的元素:
          • [partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
            [partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
            
        • 运行结果:
    • e.g.:

      • val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
        rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
        
      • 结果可能是24,也可能是42

      • val rdd4 = sc.parallelize(List("12","23","345",""),2)
        rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
        
      • 结果是10,也可能是01

      • 原因:注意有个初始值"",其长度0,然后0.toString变成字符串

      • val rdd5 = sc.parallelize(List("12","23","","345"),2)
        rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
        
      • 结果是11,原因同上。

(3) aggregateByKey

  • 准备数据:

    • val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
      def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
        iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
      }
      
  • 两个分区中的元素:

  • e.g.:

    • 将每个分区中的动物最多的个数求和
    • scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
      res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
      
    • 将每种动物个数求和
    • scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
      res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
      
    • 这个例子也可以使用:reduceByKey
    • scala> pairRDD.reduceByKey(_+_).collect
      res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
      

(4) coalesce与repartition

  • 都是将RDD中的分区进行重分区。
  • 区别:
    • coalesce默认不会进行shuffle(false);
    • repartition会进行shuffle(true),会将数据真正通过网络进行重分区。
  • e.g.:
    • def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
         iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
      }
       
      val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
       
      下面两句话是等价的:
      val rdd2 = rdd1.repartition(3)
      val rdd3 = rdd1.coalesce(3,true) -> 如果是false,查看RDD的length依然是2
      

(5) 其他高级算子

8. Spark 基础编程案例

(1) 求网站的访问量

  • Tomcat的访问日志如下:

  • 需求:找到访问量最高的两个网页,要求显示网页名称和访问量

  • 步骤分析:

    • <1>. 对网页的访问量求和
    • <2>. 降序排序
  • 代码:

    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      
      object TomcatLogCount {
        
        def main(args: Array[String]): Unit = {
          
          val conf = new SparkConf().setMaster("local").setAppName("TomcatLogCount")
          val sc = new SparkContext(conf)
          
          /*
           * 读入日志并解析
           * 
           * 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
           * */
          
          val rdd1 = sc.textFile(" ").map(
              line => {
                //解析字符串,得到jsp的名字
                //1. 解析两个引号间的字符串
                val index1 = line.indexOf("\"")
                val index2 = line.lastIndexOf("\"")
                //line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
                val line1 = line.substring(index1 + 1, index2)
                
                val index3 = line1.indexOf(" ")
                val index4 = line1.lastIndexOf(" ")
                //line2 = /MyDemoWeb/oracle.jsp
                val line2 = line1.substring(index3 + 1, index4)
                
                //得到jsp的名字  oracle.jsp
                val jspName = line2.substring(line2.lastIndexOf("/"))
                
                (jspName, 1)
              }
              )
          //统计每个jsp的次数
          val rdd2 = rdd1.reduceByKey(_+_)
          
          //使用Value排序
          val rdd3 = rdd2.sortBy(_._2, false)
          
          //得到次数最多的两个jsp
          rdd3.take(2).foreach(println)
        
          sc.stop()
        }
      }
      

(2) 创建自定义分区

  • 根据jsp文件的名字,将各自的访问日志放入到不同的分区文件中,如下:
    • 生成的分区文件

    • 如:part-00000文件中的内容:只包含了web.jsp的访问日志

  • 代码:
    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import scala.collection.mutable.HashMap
      
      
      object TomcatLogPartitioner {
        
        def main(args: Array[String]): Unit = {
          
          val conf = new SparkConf().setMaster("local").setAppName("TomcatLogPartitioner")
          val sc = new SparkContext(conf)
          
          /*
           * 读入日志并解析
           * 
           * 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
           * */
          
          val rdd1 = sc.textFile(" ").map(
              line => {
                //解析字符串,得到jsp的名字
                //1. 解析两个引号间的字符串
                val index1 = line.indexOf("\"")
                val index2 = line.lastIndexOf("\"")
                //line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
                val line1 = line.substring(index1 + 1, index2)
                
                val index3 = line1.indexOf(" ")
                val index4 = line1.lastIndexOf(" ")
                //line2 = /MyDemoWeb/oracle.jsp
                val line2 = line1.substring(index3 + 1, index4)
                
                //得到jsp的名字  oracle.jsp
                val jspName = line2.substring(line2.lastIndexOf("/"))
                
                (jspName, line)
              }
              )
              
              //得到不重复的jsp名字
              val rdd2 = rdd1.map(_._1).distinct().collect()
              
              //创建分区规则
              val wepPartitioner = new WepPartitioner(rdd2)
              val rdd3 = rdd1.partitionBy(wepPartitioner)
              
              //输出rdd3
              rdd3.saveAsTextFile(" ")
          
        }
        
        //定义分区规则
        class WepPartitioner(jspList : Array[String]) extends Partitioner {
          
          /*
           * 定义集合来保存分区条件:
           * String 代表jsp的名字
           * Int 代表序号
           * */ 
           
          val partitionMap = new HashMap[String, Int]()
          //初始分区号
          val partID = 0
          //填值
          for (jsp <- jspList) {
            patitionMap.put(jsp, partID)
            partID += 1
          }
          
          //返回分区个数
          def numPartitioners : Int = partitionMap.size
          
          //根据jsp,返回对应的分区
            def getPartition(key : Any) : Int = partitionMap.getOrElse(key.toString(), 0)
            
        }
        
      }
      

(3) 使用JDBCRDD 访问数据库

  • JdbcRDD参数说明:

  • 从上面的参数说明可以看出,JdbcRDD有以下两个缺点:

    • <1>. 执行的SQL必须有两个参数,并类型都是Long
    • <2>. 得到的结果是ResultSet,即:只支持select操作
  • 代码:

    • import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import java.sql.Connection
      import java.sql.DriverManager
      import java.sql.PreparedStatement
      
      /*
       * 把Spark结果存放到mysql数据库中
       *
       */
      
      object TomcatLogCountToMysql {
        def main(args: Array[String]): Unit = {
          //创建SparkContext
          val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCountToMysql")
      
          val sc = new SparkContext(conf)
      
          /*
           *
           * 读入日志 解析:
           *
           * 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
           */
      
          val rdd1 = sc.textFile("H:\\tmp_files\\localhost_access_log.txt")
            .map(
              line => {
                //解析字符串,得到jsp的名字
                //1、解析两个引号之间的字符串
                val index1 = line.indexOf("\"")
                val index2 = line.lastIndexOf("\"")
                val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1
      
                //得到两个空格的位置
                val index3 = line1.indexOf(" ")
                val index4 = line1.lastIndexOf(" ")
                val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp
      
                //得到jsp的名字
                val jspName = line2.substring(line2.lastIndexOf("/")) // oracle.jsp
      
                (jspName, 1)
              })
      
          //
          //    try {
          //      /*
          //       * create table mydata(jsname varchar(50),countNumber Int)
          //       *
          //       * foreach  没有返回值,在本需求中,只需要写数据库,不需要返回新的RDD,所以用foreach即可
          //       *
          //       *
          //       * 运行 Task not serializable
          //       */
          //      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
          //      pst = conn.prepareStatement("insert into mydata values(?,?)")
          //
          //      rdd1.foreach(f => {
          //        pst.setString(1, f._1)
          //        pst.setInt(2, f._2)
          //
          //        pst.executeUpdate()
          //      })
          //    } catch {
          //      case t: Throwable => t.printStackTrace()
          //    } finally {
          //      if (pst != null) pst.close()
          //      if (conn != null) conn.close()
          //    }
          //
          //    sc.stop()
          //    //存入数据库
          //    var conn: Connection = null
          //    var pst: PreparedStatement = null
      
          //    //第一种修改方法
          //    /*
          //     * 修改思路:
          //     * conn pst 让每一个节点都是用到,需要在不同的节点上传输,实现sericalizable接口
          //     */
          //    try {
          //      rdd1.foreach(f => {
          //        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
          //        pst = conn.prepareStatement("insert into mydata values(?,?)")
          //
          //        pst.setString(1, f._1)
          //        pst.setInt(2, f._2)
          //
          //        pst.executeUpdate()
          //      })
          //    } catch {
          //      case t: Throwable => t.printStackTrace()
          //    } finally {
          //      if (pst != null) pst.close()
          //      if (conn != null) conn.close()
          //    }
          //
          //    sc.stop()
      
          /*
           * 第一种修改方式,功能上可以实现,但每条数据都会创建连接,对数据库造成很大压力
           *
           * 针对分区来操作:一个分区,建立一个连接即可
           */
          rdd1.foreachPartition(saveToMysql)
          sc.stop()
      
        }
      
        def saveToMysql(it: Iterator[(String, Int)]) = {
          var conn: Connection = null
          var pst: PreparedStatement = null
      
          try {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
            pst = conn.prepareStatement("insert into mydata values(?,?)")
      
            it.foreach(f => {
      
              pst.setString(1, f._1)
              pst.setInt(2, f._2)
      
              pst.executeUpdate()
            })
          } catch {
            case t: Throwable => t.printStackTrace()
          } finally {
            if (pst != null) pst.close()
            if (conn != null) conn.close()
          }
        }
      
      }
      

      9. 认识 Spark SQL

(1) 什么是Spark SQL

  • Spark SQL is Apache Spark's module for working with structured data.(Spark SQL 是spark 的一个模块,用来处理 结构化的数据。<不能处理非结构化的数据>)
  • Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

(2) 为什么要学习Spark SQL

  • Hive是将HQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,但是MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快,同时Spark SQL也支持从Hive中读取数据,Hive 2.x 执行引擎可以使用Spark。

(3) Spark SQL的特点:

  • <1>. 容易集成
    • 不需要单独安装。
  • <2>. 统一的数据访问方式
    • 结构化数据(JDBC、JSon、Hive、parquer文件)都可以作为Spark SQL 的数据源。
      • 对接多种数据源,且使用方式类似。
  • <3>. 兼容Hive
    • 把Hive中的数据,读取到Spark SQL中运行。
  • <4>. 支持标准的数据连接(JDBC)

10. Spark SQL 基础

(1) 基本概念:Datasets和DataFrames

  • <1>. DataFrame

    • DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,

    • 例如:

      • 结构化数据文件
      • Hive中的表
      • 外部数据库或现有RDDs
    • DataFrame API支持的语言有Scala,Java,Python和R。

    • 从上图可以看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

  • <2>. Datasets

    • Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。Dataset API 支持Scala和Java,Python不支持Dataset API。

(2) DataFrames

  • <1>. 创建 DataFrames

    • a. 通过Case Class创建DataFrames
      • ① 定义case class(相当于表的结构:Schema)
        • case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
        • 注意:由于mgr和comm列中包含null值,简单起见,将对应的case class类型定义为String
      • ② 将HDFS上的数据读入RDD,并将RDD与case Class关联
        • val lines = sc.textFile("/XXXX/emp.csv").map(_.split(","))
      • ③ 将RDD转换成DataFrames
        • val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
      • ④ 通过DataFrames查询数据
        • val df1 = allEmp.toDF
        • df1.show
    • b. 使用SparkSession
      • 什么是SparkSession
        • Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
        • 在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。   - 创建StructType,来定义Schema结构信息
        • 注意:需要import org.apache.spark.sql.types._import org.apache.spark.sql.Row
        •  import org.apache.spark.sql.types._
              
          		val myschema = StructType(
          		List(
          		StructField("empno",DataTypes.IntegerType),
          		StructField("ename",DataTypes.StringType),
          		StructField("job",DataTypes.StringType),
          		StructField("mgr",DataTypes.IntegerType),
          		StructField("hiredate",DataTypes.StringType),
          		StructField("sal",DataTypes.IntegerType),
          		StructField("comm",DataTypes.IntegerType),
          		StructField("deptno",DataTypes.IntegerType),
          		))
          		
          		val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
          	
          		import org.apache.spark.sql.Row
          		
          		val df2 = spark.createDataFrame(allEmp,myschema)
          
    • c. 使用JSon文件来创建DataFame
      • val df3 = spark.read    读文件,默认是Parquet文件
          	val df3 = spark.read.json("/XXXX/people.json")    读json文件
          	
          	df3.show
          	
          	val df4 = spark.read.format("json").load("/XXXX/people.json")
        
  • <2>. DataFrame 操作

    • DataFrame操作也称为无类型的Dataset操作

    • a. DSL语句

      • 查询所有的员工姓名
      • 查询所有的员工姓名和薪水,并给薪水加100块钱
      • 查询工资大于2000的员工
      • 求每个部门的员工人数
      • 参考:spark.apache.org/docs/2.1.0/…
    • b. SQL语句

      • **注意:**不能直接执行SQL,需要生成一个视图,再执行sql。
      • ① 将DataFrame注册成表(视图):df.createOrReplaceTempView("emp")
      • ② 执行查询:
        • spark.sql("select * from emp").show
        • spark.sql("select * from emp where deptno=10").show
        • spark.sql("select deptno,sum(sal) from emp group by deptno").show

(3) Spark SQL 中的视图

  • 视图是一个虚表,不存储数据。
  • 两种类型:
    • <1>. 普通视图(本地视图)——createOrReplaceTempView

      • 只在当前Session中有效。
    • <2>. 全局视图: ——createGlobalTempView

      • 在Spark SQL中,如果想拥有一个临时的view,并想在不同的Session中共享,而且在application的运行周期内可用,那么就需要创建一个全局的临时view。并记得使用的时候加上global_temp作为前缀来引用它,因为全局的临时view是绑定到系统保留的数据库global_temp上。
    • e.g.: ``` 创建一个新session,读取不到emp视图 spark.newSession.sql("select * from emp")

      以下两种方式均可读到 全局视图 中的数据:
      df1.createGlobalTempView("emp1")
      spark.newSession.sql("select * from global_temp.emp1").show
      
      spark.sql("select * from global_temp.emp1").show
      
      
      

(4) 创建Datasets

  • DataFrame的引入,可以让Spark更好的处理结构数据的计算,但其中一个主要的问题是:缺乏编译时类型安全。为了解决这个问题,Spark采用新的Dataset API (DataFrame API的类型扩展)。
  • Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)
  • 创建DataSet:
    • <1>. 使用序列
      • ① 定义case class:
        • case class MyData(a:Int,b:String)
      • ② 生成序列并创建DataSet:
        • val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
      • ③ 查看结果
        • ds.show
    • <2>. 使用JSON数据
      • ① 定义case class:
        • case class Person(name: String, gender: String)
      • ② 通过JSON数据生成DataFrame:
        • val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))
      • ③ 将DataFrame转成DataSet:
        • df.as[Person].show
        • df.as[Person].collect
    • <3>. 使用HDFS数据
      • ① 读取HDFS数据,并创建DataSet:
        • val linesDS = spark.read.text("hdfs://XXXX:9000/XXXX/data.txt").as[String]
      • ② 对DataSet进行操作:分词后,查询长度大于3的单词
        • val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
          words.show
          words.collect
          
      • ③ 执行WordCount程序
        • val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
          result.show
          排序:result.orderBy($"value").show
          

(5) Datasets 的操作案例

  • <1>. 使用emp.json 生成DataFrame:
    • val empDF = spark.read.json("/XXXX/emp.json")
      查询工资大于3000的员工
      empDF.where($"sal" >= 3000).show
      
  • <2>. 创建case class:
    • case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
  • <3>. 生成DataSets并查询数据:
    •  val empDS = empDF.as[Emp]
      
       查询工资大于3000的员工
       empDS.filter(_.sal > 3000).show
      
       查看10号部门的员工
       empDS.filter(_.deptno == 10).show
      
  • <4>. 多表查询:
    • a. 创建部门表:
      •   val deptRDD=sc.textFile("/XXXX/dept.csv").map(_.split(","))
          case class Dept(deptno:Int,dname:String,loc:String)
          val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
        
    • b. 创建员工表:
      • case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
        val empRDD = sc.textFile("/XXXX/emp.csv").map(_.split(","))
        val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
        
    • c. 执行多表查询:等值链接
      • val result = deptDS.join(empDS,"deptno")
        
        另一种写法:注意有三个等号
        val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
        joinWith和join的区别是连接后的新Dataset的schema会不一样
        
  • <5>. 查看执行计划:
    • result.explain

11. Spark SQL 进阶

(1) 使用数据源

  • <1>. 什么是parquet文件
    • Parquet是列式存储格式的一种文件类型,列式存储有以下的核心:
      • 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
      • 压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。
      • 只读取需要的列,支持向量运算,能够获取更好的扫描性能。
      • Parquet格式是Spark SQL的默认数据源,可通过spark.sql.sources.default配置
  • <2>. 使用Load/Save函数
    • load函数是加载数据,save是存储数据。
    • e.g.:
      •   读取 users.parquet 文件(Spark自带的示例文件)
          val userDF = spark.read.load("/root/users.parquet")
          查看结构:
          userDF.printSchema
          查看内容:
          userDF.show
          
          读取json文件:
          val userDF = spark.read.load("/root/emp.json") ——>报错
          正确方法:
          val userDF = spark.read.format("json").load("/root/emp.json")
          val userDF = spark.read.json("/root/emp.json")
        
          保存parquet文件到本地路径:
          userDF.select($"name",$"favorite_color").write.save("/root/parquet")
          
          读取刚写入的文件:
          val userDF1 = spark.read.load("/root/parquet/part-00000-888d505a-7d51-4a50-aaf5-2bbdb56e67a1.snappy.parquet") --> 不推荐
          
          生产:(直接读目录)
          val userDF2 = spark.read.load("/usr/local/tmp_files/parquet")
        
    • 关于save函数:
      • 调用save函数的时候,可以指定存储模式,追加、覆盖等等

      • 可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:

      • userDF2.write.save("/root/parquet") ——>报错
          	
        save的时候覆盖:
        userDF2.write.mode("overwrite").save("/root/parquet")
        将结果保存成表:
        userDF2.select($"name").write.saveAsTable("table1")
        查看数据:
        spark.sql("select * from table2").show
        
        也可以进行分区、分桶等操作:partitionBy、bucketBy
        
  • <3>. Parquet文件
    • Parquet是一个列格式而且用于多个数据处理系统中。Spark SQL提供支持对于Parquet文件的读写,也就是自动保存原始数据的schema。当写Parquet文件时,所有的列被自动转化为nullable,因为兼容性的缘故。
    • e.g.:
      • 读入json格式的数据,将其转换成parquet格式,并创建相应的表来使用SQL进行查询。(把数据读进来,再写出去,就是Parquet文件)
        
        读入文件:
        val empDF = spark.read.json("/root/emp.json")
        
        写出文件:
        empDF.write.mode("overwrite").save("/root/parquet")
        empDF.write.mode("overwrite").parquet("/root/parquet")
          	
        建表查询:
        val emp1 = spark.read.parquet("/root/parquet")
        emp1.createOrReplaceTempView("emp1")
        spark.sql("select * from emp1").show
        
  • <4>. Schema的合并:
    • Parquet支持Schema evolution(Schema演变,即:合并)。用户可以先定义一个简单的Schema,然后逐渐的向Schema中增加列描述。通过这种方式,用户可以获取多个有不同Schema但相互兼容的Parquet文件。
    • e.g.:
      • 通过RDD来创建DataFrame:
        val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF("single","double") ——>"single","double"  是表结构
        df1.show
        df1.write.mode("overwrite").save("/root/test_table/key=1")
        val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF("single","triple")
        df2.show
        df2.write.mode("overwrite").save("/root/test_table/key=2")
          	
        合并两个部分:
        val df3 = spark.read.parquet("/root/tmp_files/test_table")
        val df3 = spark.read.option("mergeSchema",true).parquet("/root/tmp_files/test_table")
        
  • <5>. JSON Datasets
    • Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。
    • 需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:
    • 读取Json文件,生成DataFrame:
      val peopleDF = spark.read.json("/usr/local/tmp_files/people.json")
      打印Schema结构信息:
      peopleDF.printSchema
      创建临时视图:
      peopleDF.createOrReplaceTempView("peopleView")
      执行查询:
      spark.sql("select * from peopleView").show
      Spark SQL 支持统一的访问接口。对于不同的数据源,读取进来,生成DataFrame后,操作完全一样。
      
  • <6>. 使用JDBC
    • Spark SQL同样支持通过JDBC读取其他数据库的数据作为数据源。
    • Spark加载MySQL:
      • spark-shell --master spark://XXXX:7077 --jars /XXXX/.jar --driver-class-path /XXXX/.jar
    • Spark连接MySQL:
      • 方法一:
        •   val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://XXXX:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("driver","com.mysql.cj.jdbc.Driver").option("dbtable","emp").load
          
            mysqlDF.show```
          
      • 方式二:定义一个Properties类
        •   import java.util.Properties
            val mysqlProps = new Properties()
            mysqlProps.setProperty("user","root")
            mysqlProps.setProperty("password","123456")
            
            val mysqlDF1 = spark.read.jdbc("jdbc:mysql://XXXX:3306/company?serverTimezone=UTC&characterEncoding=utf-8","emp",mysqlProps)
            
            mysqlDF1.show	```
          
  • <7>. 使用Hive Table
    • a. 准备工作:
      • 搭建好Hive的环境(需要Hadoop)
      • 配置Spark SQL支持Hive:
        • 将以下文件拷贝到$SPARK_HOME/conf的目录下,即可
          • $HIVE_HOME/conf/hive-site.xml
          • $HADOOP_CONF_DIR/core-site.xml
          • $HADOOP_CONF_DIR/hdfs-site.xml
        • 重启Spark
    • b. 使用Spark Shell操作Hive
      • 启动Hadoop、Hive
      • 启动Spark
        • 启动spark-sql的时候,需要使用--jars指定mysql的驱动程序
      • 创建表:spark.sql("create table spark.emp1(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ','")
      • 导入数据:spark.sql("load data local inpath '/root/emp.csv' overwrite into table spark.emp1")
      • 查询数据:spark.sql("select * from spark.emp1").show

(2) 在IDE中开发Spark SQL

  • <1>. 创建DataFrame
    • a. StructType方式
      •   package Demo
        
          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.types.StructType
          import org.apache.spark.sql.types.StructField
          import org.apache.spark.sql.types.IntegerType
          import org.apache.spark.sql.types.StringType
          import org.apache.spark.sql.Row
          import org.apache.log4j.Logger
          import org.apache.log4j.Level
          
          /*创建DataFrame StructType方式*/
          object Demo01 {
          
            def main(args: Array[String]): Unit = {
          
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          		Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
              
              // 创建Spark Session对象
              val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
          
              // 从指定的地址创建RDD对象
              /*1    Tom    12
          		 *2    Mary	  13
          		 *3    Lily	  15
               * */
              val personRDD = spark.sparkContext.textFile("/Users/apple/Documents/student.txt").map(_.split("\t"))
          
              // 通过StructType方式指定Schema
              val schema = StructType(
                List(
                  StructField("id", IntegerType),
                  StructField("name", StringType),
                  StructField("age", IntegerType)))
          
              // 将RDD映射到rowRDD上,映射到Schema上
              val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))
              val personDataFrame = spark.createDataFrame(rowRDD, schema)
          
              // 注册视图
              personDataFrame.createOrReplaceTempView("t_person")
          
              //执行SQL语句
              val df = spark.sql("select * from t_person order by age desc")
          
              df.show()
        
          spark.stop()
        }
        
      }
    • b. case Class方式
      •   package Demo
        
          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.types.StructType
          import org.apache.spark.sql.types.StructField
          import org.apache.spark.sql.types.IntegerType
          import org.apache.spark.sql.types.StringType
          import org.apache.spark.sql.Row
          import org.apache.log4j.Logger
          import org.apache.log4j.Level
          
          /*使用case Class来创建DataFrame*/
          object Demo02 {
          
            def main(args: Array[String]): Unit = {
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
              Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
          
              //创建Spark Session对象
              val spark = SparkSession.builder().master("local").appName("Demo2").getOrCreate()
          
              //从指定的地址创建RDD对象
              val lineRDD = spark.sparkContext.textFile("").map(_.split("\t"))
          
              //把数据与case class做匹配
              val studentRDD = lineRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
          
              //生成DataFrame
              import spark.sqlContext.implicits._
              val studentDF = studentRDD.toDF
          
              //注册视图 执行SQL
              studentDF.createOrReplaceTempView("student")
          
              spark.sql("select * from student").show
          
              spark.stop()
            }
          }
          
          //定义case class
          case class Student(stuId: Int, stuName: String, stuAge: Int)
        
  • <2>. 写入MySQL
    • package Demo
      
      
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types.StructType
      import org.apache.spark.sql.types.StructField
      import org.apache.spark.sql.types.IntegerType
      import org.apache.spark.sql.types.StringType
      import org.apache.spark.sql.Row
      import org.apache.log4j.Logger
      import org.apache.log4j.Level
      import java.util.Properties
      
      /*写入MySQL*/
      object Demo03 {
        
        def main(args: Array[String]): Unit = {
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
      		Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //创建Spark Session对象
          val spark = SparkSession.builder().master("local").appName("Demo3").getOrCreate()
      
          //从指定的地址创建RDD对象
          val lineRDD = spark.sparkContext.textFile("").map(_.split("\t"))
          
          //通过StructType方式指定Schema
          val schema = StructType(
            List(
              //字段与MySQL表中字段对应一致  
              StructField("personID", IntegerType),
              StructField("personName", StringType),
              StructField("personAge", IntegerType)))
              
         //将RDD映射到rowRDD上,映射到Schema上
         val rowRDD = lineRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
         val personDataFrame = spark.createDataFrame(rowRDD, schema)
         
         personDataFrame.createOrReplaceTempView("myperson")
         
         val result = spark.sql("select * from myperson")
         
         result.show()
         
         //把结果存入到mysql中
         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "123456")
         //append追加模式
         result.write.mode("append").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
         
         spark.stop()
        
        }
      }
      
  • <3>. 使用Spark SQL 读取Hive中的数据,将计算结果存入mysql
    • package Demo
      
      import org.apache.spark.sql.SparkSession
      import java.util.Properties
      
      /*使用Spark SQL 读取Hive中的数据,将计算结果存入mysql*/
      //命令:./bin/spark-submit --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --class day0410.Demo4 /usr/local/tmp_files/Demo4.jar 
      object Demo4 {
        def main(args: Array[String]): Unit = {
      
          //创建SparkSession
          val spark = SparkSession.builder().appName("Demo4").enableHiveSupport().getOrCreate()
      
          //执行SQL
          val result = spark.sql("select deptno,count(1) from company.emp group by deptno")
      
          //将结果保存到mysql中
          val props = new Properties()
          props.setProperty("user", "root")
          props.setProperty("password", "123456")
      
          result.write.mode("append").jdbc("jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "emp_stat", props)
      
          spark.stop()
        }
      }
      

(3) Spark SQL 性能优化

  • <1>. 在内存中缓存数据

    • 直接读取内存的值来提高性能。
    • 通过spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")来从内存中去除table。
    • e.g.:
      •   操作mysql,启动spark shell 时,需要:
          ./bin/spark-shell --master spark://node3:7077 --jars /usr/local/tmp_files/mysql-connector-java-8.0.11.jar --driver-class-path /usr/local/tmp_files/mysql-connector-java-8.0.11.jar
          
          val mysqlDF = spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://192.168.109.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","emp").load
          
          mysqlDF.show
          mysqlDF.createOrReplaceTempView("emp")
          
          spark.sqlContext.cacheTable("emp")   ----> 标识这张表可以被缓存,数据还没有真正被缓存
          spark.sql("select * from emp").show  ----> 依然读取mysql
          spark.sql("select * from emp").show  ----> 从缓存中读取数据
          
          spark.sqlContext.clearCache
          
          清空缓存后,执行查询,会触发查询mysql数据库。
        
  • <2>. 性能优化相关参数

    • a. 将数据缓存到内存中的相关优化参数:

      • spark.sql.inMemoryColumnarStorage.compressed

        • 默认为 true
        • Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式。
      • spark.sql.inMemoryColumnarStorage.batchSize

        • 默认值:10000
        • 缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险。
    • b. 其他性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)

      • spark.sql.files.maxPartitionBytes
        • 默认值:128 MB
        • 读取文件时单个分区可容纳的最大字节数
      • spark.sql.files.openCostInBytes
        • 默认值:4M
        • 打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。
      • spark.sql.autoBroadcastJoinThreshold
        • 默认值:10M
        • 用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
      • spark.sql.shuffle.partitions
        • 默认值:200
        • 用于配置 join 或聚合操作混洗(shuffle)数据时使用的分区数。

12. 认识 Spark Streaming

(1) Spark Streaming 简介

  • 流式计算框架(类似于Storm)
  • 常用的实时计算引擎(流式计算)
    • <1>. Apache Storm:真正的流式计算
    • <2>. Spark Streaming :严格上来说不是真正的流式计算(实时计算),把连续的流式数据,当成不连续的RDD,本质是一个离散计算(不连续)
    • <3>. Apache Flink:真正的流式计算,与Spark Streaming相反, 把离散的数据,当成流式数据来处理
    • <4>. JStorm
  • Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.(易于构建灵活的、高容错的流式系统)
  • Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,还可以在数据流上应用Spark提供的机器学习和图处理算法。

(2) Spark Streaming 的特点

  • <1>. 易用,已经集成到Spark中
  • <2>. 容错性:底层RDD,RDD本身具有容错机制
  • <3>. 支持多种语言:Java Scala Python

(3) Spark Streaming的内部结构

  • 在内部,它的工作原理如下。Spark Streaming接收实时输入数据流,并将数据切分成批,然后由Spark引擎对其进行处理,最后生成“批”形式的结果流。
  • Spark Streaming将连续的数据流抽象为discretizedstream或DStream。在内部DStream 由一个RDD序列表示。

13. Spark Streaming 基础

(1) Spark Streaming 官方示例

  • <1>. 介绍:
    • 向Spark Streaming中发送字符串,Spark 接收到以后进行计数
  • <2>. 准备工作:
    • netcat网络工具(yum install nc.x86_64
    • **注意:**总核数大于等于2,一个核心用于接收数据,另一个用于处理数据
  • <3>. 操作:
    • 启动同一Linux系统的两个窗口,一个负责输入,一个负责监听
    • 窗口1:nc -l 1234-l监听模式;1234端口号)
    • 窗口2:run-example streaming.NetworkWordCount localhost 1234
    • 在窗口1输入文本信息,窗口2监听并进行计数统计

(2) 自写 Spark Streaming 官方示例

  • MyNetworkWordCount.scala
    • /**
        *
        * @ClassName: MyNetworkWordCount
        * @Description
        * @Author: YBCarry
        * @Date2019-05-13 20:49
        * @Version: V1.0
        *
        **/
      import org.apache.spark.streaming.StreamingContext
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.Seconds
      import org.apache.spark.storage.StorageLevel
      import org.apache.log4j.Logger
      import org.apache.log4j.Level
      import org.apache.spark.internal.Logging
      /*
       * 自写流式计算程序
       *
       * 知识点:
       * 1、创建一个StreamingContext对象  -->  核心:创建一个DStream
       * 2、DStream的表现形式:就是一个RDD
       * 3、使用DStream把连续的数据流变成不连续的RDD
       *
       * spark Streaming 最核心的内容
       */
      object MyNetworkWordCount {
        def main(args: Array[String]): Unit = {
      
      
          //创建一个Streaming Context对象
          //local[2] 表示开启了两个线程
          val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
          //Seconds(3) 表示采样时间间隔
          val ssc = new StreamingContext(conf, Seconds(3))
      
          //创建DStream 从netcat服务器上接收数据
          val lines = ssc.socketTextStream("172.16.194.128", 1234, StorageLevel.MEMORY_ONLY)
      
          //lines中包含了netcat服务器发送过来的数据
          //分词操作
          val words = lines.flatMap(_.split(" "))
      
          //计数
          val wordPair = words.transform(x => x.map(x => (x, 1)))
      
          //打印结果
          wordPair.print()
      
          //启动StreamingContext 进行计算
          ssc.start()
      
          //等待任务结束
          ssc.awaitTermination()
      
        }
      }
      

14. Spark Streaming 进阶

(1) StreamingContext对象详解

  • 初始化StreamingContext:
    • 方式一:从SparkConf对象中创建:
    • 方式二:从一个现有的SparkContext实例中创建
  • 程序中的几点说明:
    • appName参数是应用程序在集群UI上显示的名称。
    • master是Spark,Mesos或YARN集群的URL,或者一个特殊的“local [*]”字符串来让程序以本地模式运行。
    • 当在集群上运行程序时,不需要在程序中硬编码master参数,而是使用spark-submit提交应用程序并将master的URL以脚本参数的形式传入。但是,对于本地测试和单元测试,您可以通过“local[*]”来运行Spark Streaming程序(请确保本地系统中的cpu核心数够用)。
    • StreamingContext会内在的创建一个SparkContext的实例(所有Spark功能的起始点),你可以通过ssc.sparkContext访问到这个实例。
    • 批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。
  • 注意:
    • 一旦一个StreamingContextt开始运作,就不能设置或添加新的流计算。
    • 一旦一个上下文被停止,它将无法重新启动。
    • 同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。
    • StreamingContext上的stop()方法也会停止SparkContext。 要仅停止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false。
    • 只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重用来创建多个StreamingContext。

(2) 离散流(DStreams):Discretized Streams

  • 把连续的数据变成不连续的RDD
  • 因为DStream的特性,导致,Spark Streaming不是真正的流式计算
  • DiscretizedStream或DStream 是Spark Streaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经处理的数据流。在内部,DStream由一系列连续的RDD表示,如下图:
  • 举例分析:在之前的NetworkWordCount的例子中,我们将一行行文本组成的流转换为单词流,具体做法为:将flatMap操作应用于名为lines的DStream中的每个RDD上,以生成words DStream的RDD。如下图所示:
  • 但是DStream和RDD也有区别,下面画图说明:

(3) 转换操作(transformation)

  • transform(func)

    • 通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
    • 举例:在NetworkWordCount中,也可以使用transform来生成元组对
  • updateStateByKey(func)

    • 操作允许不断用新信息更新它的同时保持任意状态。
    • 定义状态:状态可以是任何的数据类型
    • 定义状态更新函数:怎样利用更新前的状态和从输入流里面获取的新值更新状态
  • 重写NetworkWordCount程序,累计每个单词出现的频率(注意:累计)

    • TotalNetworkWordCount.scala
    •     package test.Network
      
          import org.apache.log4j.{Level, Logger}
          import org.apache.spark.SparkConf
          import org.apache.spark.storage.StorageLevel
          import org.apache.spark.streaming.{Seconds, StreamingContext}
          
          /**
            *
            * @ClassName: TotalNetworkWordCount
            * @Description: 实现累加操作
            * @Author: YBCarry
            * @Date2019-05-15 16:05
            * @Version: V1.0
            *
            **/
          object TotalNetworkWordCount {
          
            def main(args: Array[String]): Unit = {
          
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
              Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
          
              //创建一个Streaming Context对象
              //local[2] 表示开启了两个线程
              val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
              //Seconds(3) 表示采样时间间隔
              val ssc = new StreamingContext(conf, Seconds(3))
          
              //设置检查点目录,保存之前都的状态信息
              ssc.checkpoint("")
          
              //创建DStream
              val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)
          
              //分割
              val words = lines.flatMap(_.split(" "))
          
              //计数
        //    val wordPair = words.map((_, 1))
              val wordPair = words.transform( x => x.map(x => (x, 1)))
          
              //定义一个值函数 ;累加计数
              /*
              * 接收两个参数
              * currentValues —— 当前值
              * previousValue ——历史值
              * */
              val addFunc = (currentValues : Seq[Int], previousValues : Option[Int]) => {
          
                //累加当前的序列
                val currrentTotal = currentValues.sum
                //累加历史值
                Some(currrentTotal + previousValues.getOrElse(0))
              }
          
              //累加运算
              val total = wordPair.updateStateByKey(addFunc)
          
              total.print()
          
              ssc.start()
      
          ssc.awaitTermination()
        }
      }
      
    
    

(4) 窗口操作

  • Spark Streaming还提供了窗口计算功能,允许在数据的滑动窗口上应用转换操作。下图说明了滑动窗口的工作方式:

  • 如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowed DStream的RDD。在上面的例子中,操作应用于最近3个时间单位的数据,并以2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。

    • 窗口长度(windowlength) - 窗口的时间长度(上图的示例中为:3)。
    • 滑动间隔(slidinginterval) - 两次相邻的窗口操作的间隔(即每次滑动的时间长度)(上图示例中为:2)。
    • 这两个参数必须是源DStream的批间隔的倍数(上图示例中为:1)。
  • e.g.: 假设对之前的单词计数的示例进行扩展,每10秒钟对过去30秒的数据进行wordcount。则在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey操作。这是通过使用reduceByKeyAndWindow操作完成的。

  •   package test.NetworkByWindow
    
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      /**
        *
        * @ClassName: NetworkWordCountByWindow
        * @Description: 每10秒读取过去30秒的数据
        * @Author: YBCarry
        * @Date2019-05-15 17:00
        * @Version: V1.0
        *
        **/
      object NetworkWordCountByWindow {
      
        def main(args: Array[String]): Unit = {
      
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //创建一个Streaming Context对象
          //local[2] 表示开启了两个线程
          val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
          //Seconds(3) 表示采样时间间隔
          val ssc = new StreamingContext(conf, Seconds(3))
      
          //设置检查点目录,保存之前都的状态信息
          ssc.checkpoint("")
      
          //创建DStream
          val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)
      
          //分割 每个单词计数
          val words = lines.flatMap(_.split(" ")).map((_, 1))
      
          /*
          * 窗口操作
          * 参数说明:要进行的操作  窗口的大小(30s)  窗口移动距离(12s) ——> 采样时间(3)的整数倍 
          * */
          val result = words.reduceByKeyAndWindow((x : Int, y : Int) => (x + y), Seconds(30), Seconds(12))
        }
      
      }
    

15. Spark 数据源

(1) 输入DStreams和接收器

  • 输入DStreams表示从数据源获取输入数据流的DStreams。在NetworkWordCount例子中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。
  • 输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源:
    • 基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akka的actor等
    • 高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。
  • 下面通过具体的案例,详细说明:

(2) 基本源

  • <1>. 文件流:通过监控文件系统的变化,若有新文件添加,则将它读入并作为数据流

    • 注意:
      • ① 这些文件具有相同的格式
      • ② 这些文件通过原子移动或重命名文件的方式在dataDirectory创建
      • ③ 如果在文件中追加内容,这些追加的新数据也不会被读取。
    • Spark Streaming监控一个文件夹,如果有变化,则把变化采集过来
    • import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      /**
        *
        * @ClassName: FileStreaming
        * @Description
        * @Author: YBCarry
        * @Date2019-05-16 09:24
        * @Version: V1.0
        *
        **/
      object FileStreaming {
      
        def main(args: Array[String]): Unit = {
      
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //创建一个Streaming Context对象
          //local[2] 表示开启了两个线程
          val conf = new SparkConf().setAppName("MyFileStreaming").setMaster("local[2]")
          //Seconds(3) 表示采样时间间隔
          val ssc = new StreamingContext(conf, Seconds(10))
      
          //监控目录,读取产生的新文件
          val lines = ssc.textFileStream("\\Users\\apple\\学习\\SparkFiles")
      
          lines.print()
      
          ssc.start()
          ssc.awaitTermination()
      
        }
      
      }
      
    • 注意:需要在原文件中编辑,然后拷贝一份。
  • <2>. RDD队列流

    • 使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream,用于调试Spark Streaming应用程序。
    • package test.RDDQueue
      
      
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkConf
      import org.apache.spark.rdd.RDD
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      import scala.collection.mutable.Queue
      
      /**
        *
        * @ClassName: RDDQueueStream
        * @Description: RDD队列流
        * @Author: YBCarry
        * @Date2019-05-16 10:48
        * @Version: V1.0
        *
        **/
      object RDDQueueStream {
      
        def main(args: Array[String]): Unit = {
      
          Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
          Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
      
          //创建一个Streaming Context对象
          //local[2] 表示开启了两个线程
          val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
          //Seconds(3) 表示采样时间间隔
          val ssc = new StreamingContext(conf, Seconds(3))
      
          //创建队列 RDD[Int]
          val rddQueue = new Queue[RDD[Int]]()
      
          //向队列里添加数据 (创建数据源)
          for (i <- 1 to 3) {
      
            rddQueue += ssc.sparkContext.makeRDD(1 to 10)
      
            //便于观察
            Thread.sleep(1000)
          }
      
          //从队列中接收数据,创建DStream
          val inputDStream = ssc.queueStream(rddQueue)
      
          //处理数据
          val result = inputDStream.map(x => (x, x * 2))
          result.print()
      
          ssc.start()
          ssc.awaitTermination()
      
        }
      
      }
      
  • <3>. 套接字流:通过监听Socket端口来接收数据

(3) 高级源

  • <1>. Spark Streaming接收Flume数据
    • a. 基于Flume的Push模式:
      • Flume被用于在Flume agents之间推送数据,在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用。Flume可以将数据推送到改receiver。
    • 以下为配置步骤:
      • **第一步:**Flume的配置文件
        • MyFlumeStream01.conf
        • #定义agent名, source、channel、sink的名称
          a4.sources = r1
          a4.channels = c1
          a4.sinks = k1
          
          #具体定义source
          a4.sources.r1.type = spooldir
          a4.sources.r1.spoolDir = /usr/local/tmp_files/logs
          
          #具体定义channel
          a4.channels.c1.type = memory
          a4.channels.c1.capacity = 10000
          a4.channels.c1.transactionCapacity = 100
          
          #具体定义sink
          a4.sinks = k1
          a4.sinks.k1.type = avro
          a4.sinks.k1.channel = c1
          a4.sinks.k1.hostname = bigdata01
          a4.sinks.k1.port = 1234
          
          #组装source、channel、sink
          a4.sources.r1.channels = c1
          a4.sinks.k1.channel = c1
          
      • **第二步:**Spark Streaming程序
        • package test.Flume
          
          import org.apache.log4j.{Level, Logger}
          import org.apache.spark.SparkConf
          import org.apache.spark.streaming.flume.FlumeUtils
          import org.apache.spark.streaming.{Seconds, StreamingContext}
          
          /**
            *
            * @ClassName: MyFlumeStream
            * @Description: flume将数据推送给Spark Streaming 使用push
            * @Author: YBCarry
            * @Date2019-05-16 14:01
            * @Version: V1.0
            *
            **/
          object MyFlumeStream01 {
          
            def main(args: Array[String]): Unit = {
          
              Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
              Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
          
              //创建一个Streaming Context对象
              //local[2] 表示开启了两个线程
              val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
              //Seconds(3) 表示采样时间间隔
              val ssc = new StreamingContext(conf, Seconds(3))
          
              //对接Flume
              //创建一个Flumeevent从flume中接收puch来的数据(也是DStream)
              //flume将数据push到localhost:1234,Spark Stream在这里监听
              val flumeEventDStream = FlumeUtils.createStream(ssc, "bigdata01", 1234)
          
              //将Flumeevent中的事件转换成字符串
              val lineDStream = flumeEventDStream.map(e => {
                new String(e.event.getBody.array)
              })
          
              //输出结果
              lineDStream.print()
          
              ssc.start()
              ssc.awaitTermination()
          
             }
          }
          
      • **第三步:**测试
        • 启动Flume
          • flume-ng agent -n a4 -f Spark/MyFlumeStream01.conf -c conf -Dflume.root.logger=INFO,console
        • 启动Spark Streaming程序
        • 拷贝日志文件到/root/training/logs目录
        • 观察输出,采集到数据
    • b. 基于Custom Sink的Pull模式
      • 不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink。Flume将数据推送到sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。
      • 这样,与第一种模式相比,保证了很好的健壮性和容错能力,这种模式需要为Flume配置一个正常的sink。
      • 以下为配置步骤:
        • **第一步:**Flume的配置文件

          • FlumeLogPull.conf
          •   a1.channels = c1
              a1.sinks = k1
              a1.sources = r1
              
              a1.sources.r1.type = spooldir
              a1.sources.r1.spoolDir = /usr/local/tmp_files/logs
              
              a1.channels.c1.type = memory
              a1.channels.c1.capacity = 100000
              a1.channels.c1.transactionCapacity = 100000
              
              a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
              a1.sinks.k1.channel = c1
              a1.sinks.k1.hostname = bigdata01
              a1.sinks.k1.port = 1234
              
              #组装source、channel、sink
              a1.sources.r1.channels = c1
              a1.sinks.k1.channel = c1
            
            
        • **第二步:**Spark Streaming程序

          package test.Flume

          import org.apache.spark.streaming.StreamingContext import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.streaming.flume.FlumeUtils

          /** *

          • @ClassName: FlumePutSink
          • @Description: 测试pull方式 使用Spark sink
          • @Author: YBCarry
          • @Date2019-05-16 15:23
          • @Version: V1.0

          **/ object FlumeLogPull {

          def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //创建一个Streaming Context对象 //local[2] 表示开启了两个线程 val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]") //Seconds(3) 表示采样时间间隔 val ssc = new StreamingContext(conf,Seconds(3))

          //创建FlumeEvent的DStream,采用pull的方式
          val flumeEvent = FlumeUtils.createPollingStream(ssc, "172.16.194.128",1234, StorageLevel.MEMORY_ONLY)
          
          //将FlumeEvent的事件准换成字符串
          val lineDStream = flumeEvent.map( e => {
            new String(e.event.getBody.array)
          })
          
          //输出结果
          lineDStream.print()
          
          ssc.start()
          ssc.awaitTermination()
          }
          

          }

        • **第三步:**需要的jar包

          • spark-streaming-flume-sink_2.11-2.1.0.jar拷贝到Flume的lib目录下。
        • **第四步:**测试

          • 启动Flume
          • 启动Spark Streaming程序
          • 将测试数据拷贝到/root/training/logs
          • 观察输出

16. Spark 性能优化

(1) 概述

  • Spark的计算本质是分布式计算,所以,Spark程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者内存。如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个瓶颈。Java虚拟机会定期进行垃圾回收,此时会追踪所有Java对象,并且在垃圾回收时,找到那些已经不再使用的对象。
  • 核心:清理旧对象,给新对象腾出空间。垃圾回收的性能开销,是与内存中的对象数量成正比。

(2) spark内存分配

(3) Spark GC原理

(4) 减少批数据的执行时间

  • 在Spark中有几个优化可以减少批处理的时间:
    • <1>. 减少批数据的执行时间
      • 在Spark中有几个优化可以减少批处理的时间:
        • ① 数据接收的并行水平
          • 通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。
        • ② 数据处理的并行水平
          • 如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism。
        • ③ 数据序列化
          • 可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:
            • 输入数据
            • 由流操作生成的持久RDD
          • 在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。

(5) 设置正确的批容量

  • 为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。
  • 根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。
  • 找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。

(6) 内存调优

  • 介绍几个比较推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。
    • **Default persistence level of DStreams:**和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。
    • **Clearing persistent RDDs:**默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
    • **Concurrent garbage collector:**使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。

(7) shuffle原理

  • <1>. 优化前
  • <2>. 优化后

17. Spark MLlib库

(1) 概述

  • MLlib is Apache Spark's scalable machine learning library.(MLlib 是 Spark 支持 Scala 的可以扩展的机器学习库。)
  • Spark在机器学习方面具有得天独厚的有事,有以下几个原因:
    • <1>. 机器学习算法一般都有多个步骤迭代计算,需要在多次迭代后,获得足够小的误差或者收敛才会停止。
      • e.g.:
      •   double wucha = 1.0
          while ( wucha >= 0.00001 ) {
        	  建模  wucha -= 某个值
          }
          
          模型计算完毕
        
      • 当迭代使用Hadoop的MapReduce计算框架时,每次都要读写硬盘以及任务启动工作,导致很大的IO开销。而Spark基于内存的计算模型天生擅长迭代计算,只有在必要时,才会读写硬盘,所以Spark是机器学习比较理想的平台。
    • <2>. 通信角度
      • Hadoop的MapReduce计算框架通过heartbeat方式来进行通信和传递数据,执行速度慢。
        • Spark有高效的Akka和Netty通信系统,通信效率高。
  • SPark MLlib 是Spark 对常用的机器学习算法的实现库,同时包括相关测试和数据生成器。

(2) 什么是机器学习

  • <1>. 定义
    • A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E。
    • 机器学习(Machine Learning, ML)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。(通过算法使计算机能够模拟人类的判别能力)
    • **三个关键词:**算法、经验、模型评价
    • **应用:**金融反欺诈、语音识别、自然语言处理、翻译、模式识别、智能控制等等。
  • <2>. 机器学习工作流程
    • 在数据的基础上,通过算法构建出模型,并进行评价
      • 如果达到要求,则用该模型测试其他数据
      • 如果不达到要求,要调整算法来重新建立模型,再次进行评估
      • 循环往复,直到获得满意的经验
  • <3>. 基于大数据的机器学习
    • 传统的机器学习算法,由于技术和单机存储的限制,依赖于数据抽样,只能在少量数据上使用。所以存在的问题是很难做好随机,从而导致学习的模型不准确。
    • 在大数据上进行机器学习,可以直接处理全量数据并进行大量迭代计算。Spark本身计算优势,适合机器学习。此外spark-shell、pyspark都可以提供及时查询工具。

(3) MLlib

  • MLlib是Spark机器学习库,简化机器学习的工程实践工作,方便扩展到更大规模。集成了通用的学习算法:分类、回归、聚类、协同过滤、降维等等。另外,MLlib本身在Spark中,数据清洗、SQL、建模放在一起。