Distributed System | Spark RDD 论文总结

阅读 325
收藏 20
2017-03-02
原文链接:www.sczyh30.com

本篇文章是对Spark RDD论文的总结,中间会穿插一些Spark的内部实现总结,对应Spark版本为2.0。

RDD

Motivation

传统的分布式计算框架(如MapReduce)在执行计算任务时,中间结果通常会存于磁盘中,这样带来的IO消耗是非常大的,尤其是对于各种机器学习算法,它们需要复用上次计算的结果进行迭代,如果每次结果都存到磁盘上再从磁盘读取,耗时会很大。因此Spark这篇论文提出了一种新的分布式数据抽象 —— RDD

设计思想及特点

Resilient Distributed Dataset(RDD)是Apache Spark中数据的核心抽象,它是一种只读的、分区的数据记录集合。

RDD的特点:

  • Lazy evaluation,只在需要的时候才进行计算
  • RDD里面的数据是分区的,每一块数据都可能分布在集群内不同的节点上;支持并行计算
  • Resilient: 借助RDD lineage graph,Spark可以重新执行之前失败的计算任务而不用整体上重新计算,保证了容错性而且非常灵活,实现了fault-tolerance

那么如何操作、处理数据呢?Spark提供了一组函数式编程风格的API,可以很方便地对RDD进行操作、变换,就像操作集合一样。比如:

val rdd = sc.parallelize(1 to 100)
val result = rdd.map(_ + 10)
  .filter(_ > 15)
  .map(x => (x, 1))
  .reduceByKey(_+_)
  .collect

并且开发者可以根据需要自己编写相应的RDD以及RDD之间的操作,非常方便。可以这么理解,RDD就相当于抽象的数据表示,而operation就相当于一套DSL用于对RDD进行变换或者求值。

RDD的表示

Spark中的RDD主要包含五部分信息:

  • partitions(): partition集合
  • dependencies(): 当前RDD的dependency集合
  • iterator(split, context): 对每个partition进行计算或读取操作的函数
  • partitioner(): 分区方式,如HashPartitionerRangePartitioner
  • preferredLocations(split): 访问某个partition最快的节点

所有的RDD都继承抽象类RDD。几种常见的操作:

  • sc#textFile: 生成HadoopRDD,代表可以从HDFS中读取数据的RDD
  • sc#parallelize: 生成ParallelCollectionRDD,代表从Scala集合中生成的RDD
  • map, flatMap, filter: 生成MapPartitionsRDD,其partition与parent RDD一致,同时会对parent RDD中iterator函数返回的数据进行对应的操作(lazy)
  • union: 生成UnionRDDPartitionerAwareUnionRDD
  • reduceByKey, groupByKey: 生成ShuffledRDD,需要进行shuffle操作
  • cogroup, join: 生成CoGroupedRDD

Operations

Spark里面对RDD的操作分为两种:transformationaction

  • transformation是lazy的,仅仅会保存计算步骤并返回一个新的RDD,而不会立刻执行计算操作
  • action会依次执行计算操作并且得到结果

这些transformation和action在FP中应该是很常见的,如map, flatMap, filter, reduce, count, sum

对单个数据操作的transformation函数都在RDD抽象类内,而对tuple操作的transformation都在PairRDDFunctions包装类中。RDD可以通过implicit函数在符合类型要求的时候自动转换为PairRDDFunctions类,从而可以进行reduceByKey之类的操作。对应的implicit函数:

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}

Dependency

上面我们提到,RDD只会在需要的时候计算结果,调用那些transformation方法以后,对应的transformation信息只是被简单地存储起来,直到调用某个action才会真正地去执行计算。Spark中RDD之间是有联系的,RDD之间会形成依赖关系,也就是形成lineage graph(依赖图)。Dependency大致分两种:narrow dependency和wide dependency。

  • Narrow dependency(NarrowDependency): Parent RDD中的每个partition最多被child RDD中的一个partition使用,即一对一的关系。比如map, flatMap, filter等transformation都是narrow dependency
  • Wide dependency(ShuffleDependency):Parent RDD中的每个partition会被child RDD中的多个partition使用,即一对多的关系。比如join生成的RDD一般是wide dependency(不同的partitioner)

论文中的图例很直观地表示了RDD间的依赖关系:

Spark RDD Dependency

这样划分dependency的原因:

  1. Narrow dependency可以方便地以流水线的形式执行计算,即从头到尾一串chain下来。而wide dependency必须要等所有的parent RDD的结果都准备好以后再执行计算
  2. Narrow dependency失败以后,Spark只需要重新计算失败的parent RDD即可;而对于wide dependency来说,一失败可能导致某些分区丢失,必须整体重新进行计算

Shuffle

Spark中的shuffle操作与MapReduce中类似,在计算wide dependency对应的RDD的时候(即ShuffleMapStage)会触发。

首先来回顾一下为什么要进行shuffle操作。以reduceByKey操作为例,Spark要按照key把这些具有相同key的tuple聚集到一块然后进行计算操作。然而这些tuple可能在不同的partition中,甚至在不同的集群节点中,要想计算必须先把它们聚集起来。因此,Spark用一组map task来将每个分区写入到临时文件中,然后下一个stage端(reduce task)会根据编号获取临时文件,然后将partition中的tuple按照key聚集起来并且进行相应的操作。这里面还包括着排序操作(可能在map side也可能在reduce side进行)。

Shuffle是Spark的主要性能瓶颈之一(涉及磁盘IO,数据序列化和网络IO),其优化一直是个难题。

  • Shuffle write(map task): SortShuffleWriter#write
  • Shuffle read(reduce task): ShuffleRDD#compute

Persistence

Checkpointing

Checkpoint的目的是保存那些计算耗时较长的RDD数据(long lineage chains),执行Checkpoint的时候会新提交一个Job,因此最好先persistcheckpoint

Cache/Persist

cachepersist用于缓存一些经常使用的RDD结果(但是不能太大)。

  • persist方法的主要作用是改变StorageLevel以在compute的时候通过BlockManager进行相应的持久化操作
  • cache方法相当于设置存储级别为MEMORY_ONLY

Job Scheduling

简单来说,Spark会将提交的计算划分为不同的stages,形成一个有向无环图(DAG)。Spark的调度器会按照DAG的次序依次进行计算每个stage,最终得到计算结果。执行计算的几个重要的类或接口如下:

  • DAGScheduler
  • ActiveJob
  • Stage
  • Task
  • TaskScheduler
  • SchedulerBackend

这里面最为重要的就是 DAGScheduler 了,它会将逻辑执行计划(即RDD lineage)转化为物理执行计划(stage/task)。之前我们提到过,当开发者对某个RDD执行action的时候,Spark才会执行真正的计算过程。当开发者执行action的时候,SparkContext会将当前的逻辑执行计划传给DAGSchedulerDAGScheduler会根据给定的逻辑执行计划生成一个Job(对应ActiveJob类)并提交。每执行一个acton都会生成一个ActiveJob

提交Job的过程中,DAGScheduler会进行stage的划分。Spark里是按照shuffle操作来划分stage的,也就是说stage之间都是wide dependency,每个stage之内的dependency都是narrow dependency。这样划分的好处是尽可能地把多个narrow dependency的RDD放到同一个stage之内以便于进行pipeline计算,而wide dependency中child RDD必须等待所有的parent RDD计算完成并且shuffle以后才能接着计算,因此这样划分stage是最合适的。

划分好的stages会形成一个DAG,DAGScheduler会根据DAG中的顺序先提交parent stages(如果存在的话),再提交当前stage,以此类推,最先提交的是没有parent stage的stage。从执行角度来讲,一个stage的parent stages执行完以后,该stage才可以被执行。最后一个stage是产生最终结果的stage,对应ResultStage,而其余的stage都是ShuffleMapStage。下面是论文中stage划分的一个图例,非常直观:

DAG of stages

提交stage的时候,Spark会根据stage的类型生成一组对应类型的Task(ResultTaskShuffleMapTask),然后将这些Task包装成TaskSet提交到TaskScheduler中。一个Task对应某个RDD中的某一个partition,即一个Task只负责某个partition的计算:

val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
      }
  }
} catch {
  // 此处代码略...
}

TaskScheduler会向执行任务的后端(SchedulerBackend,可以是Local, Mesos, Hadoop YARN或者其它集群管理组件)发送ReviveOffers消息,对应的执行后端接收到消息以后会将Task封装成TaskRunner(Runnable接口的实例),然后提交到底层的Executor中,并行执行计算任务。

Executor中的线程池定义如下:

private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
  val threadFactory = namedThreadFactory(prefix)
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

可以看到底层执行task的线程池实际上是JUC中的CachedThreadPool,按需创建新线程,同时会复用线程池中已经建好的线程。

最后用一幅图总结一下Job, Stage和Task的关系(图来自 Mastering Apache Spark 2.0):

Stage, Job and Task in Spark

整个Spark Context执行task的步骤图:

Memory Management

Spark中RDD的存储方式有两种:in memory和on disk,默认是in memory的。进行分布式计算的时候通常会读入大量的数据,并且通常还需要重用这些数据,如果简单地把内存管理交给GC的话,很容易导致回收失败从而cause full GC,影响性能。

Spark 1.5开始不再通过GC管理内存。Spark 1.5实现了一个内存管理器用于手动管理内存(Project Tungsten),底层通过Unsafe类来直接分配和回收内存。

另外,分布式计算系统的GC方面还可以参考OSDI 2016的一篇论文: Yak: A High-Performance Big-Data-Friendly Garbage Collector

PageRank实例

下面在Spark中跑一个PageRank来观察一下生成的Stage DAG。PageRank的公式比较简单:

Pa geRan k(p i )=1− d N +d∑ p j ∈M(p i ) Pa geRank(pj ) L (pj)

这里我们选择damping factor=0.85,初始的rank值为1.0;PageRank算法可以用马尔科夫矩阵进行优化,但是这里迭代次数较小,可以直接进行迭代计算。对应代码:

val iters = 10
val data = sc.textFile("data.txt")
val links = data.map { s =>
  val parts = s.split("\\s+")
  (parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()

对应的Stage DAG:

DAG of stages in PageRank Algorithm

其中Stage 3中的RDD dependencies如下:

One stage in PageRank Algorithm


References

本文标题:Distributed System | Spark RDD 论文总结

文章作者:sczyh30

发布时间:2016年09月27日

原始链接:www.sczyh30.com/posts/Distr…

许可协议: "知识共享-保持署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

评论