关于Spark基础的一些小问题补充

2,289 阅读9分钟

前言

基于上两篇没提到的知识点的一些补充

三、Spark 内存计算框架

3.1 Spark的任务调度

  1. Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGSchedulerTaskScheduler
  2. 按照 RDD 的一系列操作顺序,来生成DAG有向无环图
  3. DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler
  4. TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行。
  5. 所有task运行完成,任务结束

3.2 spark 分区

在对RDD数据进行分区时,默认使用的是 HashPartitioner。该函数对key进行哈希,然后对分区总数取模,取模结果相同的就会被分到同一个partition中(这一招如果是文章一直跟过来的朋友们应该见我提过无数遍了)

如果嫌HashPartitioner功能单一,可以自定义partitioner,自定义partitioner大致分为3个步骤

  1. 继承 org.apache.spark.Partitioner
  2. 重写 numPartitions 方法
  3. 重写 getPartition 方法

3.3 小案例(使用Scala)

我们想根据 RDD 的key的长度进行分区,相同key的长度进入到同一个分区中(代码比较简单就不展开说明了)

3.3.1 main方法

//todo:使用自己实现的自定义分区
object TestPartitionerMain {

  def main(args: Array[String]): Unit = {
    //1、构建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("TestPartitionerMain").setMaster("local[2]")

    //2、构建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")

    //3、构建数据源
    val data: RDD[String] = sc.parallelize(List("hadoop","hdfs","hive","spark","flume","kafka","flink","azkaban"))

    //4、获取每一个元素的长度,封装成一个元组
    val wordLengthRDD: RDD[(String, Int)] = data.map(x=>(x,x.length))

    //5、对应上面的rdd数据进行自定义分区
    val result: RDD[(String, Int)] = wordLengthRDD.partitionBy(new MyPartitioner(3))

    //6、保存结果数据到文件
    result.saveAsTextFile("./data")

    sc.stop()

  }
}

3.3.2 自定义分区MyPartitioner

//自定义分区
class MyPartitioner(num:Int) extends Partitioner{
  //指定rdd的总的分区数
  override def numPartitions: Int = {
    num
  }

  //消息按照key的某种规则进入到指定的分区号中
  override def getPartition(key: Any): Int ={
    //这里的key就是单词
    val length: Int = key.toString.length

    length match {
      case 4 =>0
      case 5 =>1
      case 6 =>2
      case _ =>0
}

} }

3.4 spark的共享变量

3.4.1 spark的广播变量(broadcast variable)

Spark中分布式执行的代码需要传递到各个Executor的Task上运行。对于一些只读、固定的数据(比如从DB中读出的数据),每次都需要Driver广播到各个Task上,这样效率低下。

广播变量允许将变量广播给各个Executor。该Executor上的各个Task再从所在节点的BlockManager获取变量,而不是从Driver获取变量,以减少通信的成本,减少内存的占用,从而提升了效率。

3.4.2 广播变量的原理示意图

这个原理图很容易理解,其实就是这个word就是task们共同需要的数据,我们就将这个word下发到executor中,之后task获取这个word的数据的话,就直接找到内存中对应的引用即可。

3.4.3 广播变量使用

  1. 通过对一个类型T的对象调用 SparkContext.broadcast创建出一个Broadcast[T]对象。 任何可序列化的类型都可以这么实现
  2. 通过 value 属性访问该对象的值
  3. 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)

3,4.4 简单的代码示例

不使用广播变量代码示例

//这里的word单词为在每一个task中进行传输
val conf = new SparkConf().setMaster("local[2]").setAppName("brocast")
val rdd1=sc.textFile("/words.txt")
val word="spark"
val rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(word))
rdd2.foreach(x=>println(x))

使用广播变量代码示例

val conf = new SparkConf().setMaster("local[2]").setAppName("brocast")
val sc=new SparkContext(conf)
val rdd1=sc.textFile("/words.txt")
val word="spark"
//通过调用sparkContext对象的broadcast方法把数据广播出去
val broadCast = sc.broadcast(word)

//在executor中通过调用广播变量的value属性获取广播变量的值
val rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(broadCast.value))
rdd2.foreach(x=>println(x))

3.4.5 广播变量使用注意事项

  1. 不能将一个RDD使用广播变量广播出去

  2. 广播变量只能在Driver端定义,不能在Executor端定义

  3. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值

  4. 如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本

  5. 如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本

3.4.6 Spark 的 accumulator

累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于MapReduce,即分布式的改变,然后聚合这些改变。它一个常见用途是在调试时对作业执行过程中的事件进行计数。可以使用累加器来进行全局的计数。

使用方式

  1. 通过在driver中调用 ==SparkContext.accumulator(initialValue) ==方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值initialValue 的类型。
  2. spark闭包(函数序列化)里的excutor代码可以使用累加器的 add 方法增加累加器的值。
  3. driver程序可以调用累加器的 value 属性来访问累加器的值。

3.5 Spark 程序的序列化问题

3.5.1 transformation操作为什么需要序列化

spark是分布式执行引擎,其核心抽象是弹性分布式数据集RDD,其代表了分布在不同节点的数据。Spark的计算是在executor上分布式执行的,故用户开发的关于RDD的map,flatMap,reduceByKey等transformation 操作(闭包)有如下执行过程:

  1. 代码中对象在driver本地序列化
  2. 对象序列化后传输到远程executor节点
  3. 远程executor节点反序列化对象
  4. 最终远程节点执行

故对象在执行中需要序列化通过网络传输,则必须经过序列化过程。

3.5.2 spark的任务序列化异常

在编写spark程序中,由于在map,foreachPartition等算子内部使用了外部定义的变量和函数,从而引发Task未序列化问题。然而spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如在filter算子根据外部指定的条件进行过滤,map根据相应的配置进行变换。

比如出现“org.apache.spark.SparkException: Task not serializable”这个错误:其原因就在于这些算子使用了外部的变量,但是这个变量不能序列化。当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现Task未序列化问题。

3.5.3 spark中解决序列化的办法

  1. 如果函数中使用了该类对象,该类要实现序列化
  2. 如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化
  3. 对于不能序列化的成员变量使用“@transient”标注,告诉编译器不需要序列化
  4. 也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率
  5. 可以把对象的创建直接在该函数中构建这样避免需要序列化

3.6 Spark on Yarn

可以把spark程序提交到yarn中去运行,此时spark任务所需要的计算资源由yarn中的老大ResourceManager去分配

官网资料地址:spark.apache.org/docs/2.3.3/… spark-env.sh

按照Spark应用程序中的driver分布方式不同,Spark on YARN有两种模式: yarn-client模式、yarn-cluster模式

3.6.1 yarn-cluster模式

yarn-cluster模式下提交任务示例

spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
/opt/spark/examples/jars/spark-examples_2.11-2.3.3.jar \
10

此时注意如果运行出现错误,可能是虚拟内存不足,可以添加参数

<!--容器是否会执行物理内存限制默认为True-->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>

<!--容器是否会执行虚拟内存限制    默认为True-->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

3.6.2 yarn-client模式

spark-submit --class org.apache.spark.examples.SparkPi
--master yarn
--deploy-mode client
--driver-memory 1g
--executor-memory 1g
--executor-cores 1
/opt/spark/examples/jars/spark-examples_2.11-2.3.3.jar
10

3.6.3 原理图

yarn-cluster:

yarn-client:

它们的区别:

yarn-cluster模式:spark程序的Driver程序在YARN中运行,运行结果不能在客户端显示,并且客户端可以在启动应用程序后消失应用的。最好运行那些将结果最终保存在外部存储介质(如HDFS、Redis、Mysql),客户端的终端显示的仅是作为YARN的job的简单运行状况。

yarn-client模式: spark程序的==Driver运行在Client上==,应用程序运行结果会在客户端显示,所有适合运行结果有输出的应用程序(如spark-shell)

最大的区别就是Driver端的位置不一样。

yarn-cluster: Driver端运行在yarn集群中,与ApplicationMaster进程在一起。

yarn-client: Driver端运行在提交任务的客户端,与ApplicationMaster进程没关系,经常用于测试

3.7 collect 算子操作的问题

collect算子操作的作用是作为一个action操作,会触发任务的运行,而且它会把RDD的数据进行收集之后,以数组的形式返回给Driver端

注意:

默认Driver端的内存大小为1G,由参数 spark.driver.memory 设置

如果某个rdd的数据量超过了Driver端默认的1G内存,对rdd调用collect操作,这里会出现Driver端的内存溢出,所有这个collect操作存在一定的风险,实际开发代码一般不会使用。

实际企业中一般都会把该参数调大,比如5G/10G等,使用 new SparkConf().set("spark.driver.memory","5G") 可以修改该参数

3.8 Spark任务中资源参数剖析

executor-memory:表示每一个executor进程需要的内存大小,它决定了后期操作数据的速度

total-executor-cores:表示任务运行需要总的cpu核数,它决定了任务并行运行的粒度

后期对于spark程序的优化,可以从这2个参数入手,无论你把哪一个参数调大,对程序运行的效率来说都会达到一定程度的提升 加大计算资源它是最直接、最有效果的优化手段。 在计算资源有限的情况下,可以考虑其他方面,比如说代码层面,JVM层面等

finally

···