阅读 136

spark——rdd常用的转化和行动操作

本文始发于个人公众号:TechFlow,原创不易,求个关注


今天是spark第三篇文章,我们继续来看RDD的一些操作。

我们前文说道在spark当中RDD的操作可以分为两种,一种是转化操作(transformation),另一种是行动操作(action)。在转化操作当中,spark不会为我们计算结果,而是会生成一个新的RDD节点,记录下这个操作。只有在行动操作执行的时候,spark才会从头开始计算整个计算。

而转化操作又可以进一步分为针对元素的转化操作以及针对集合的转化操作。

针对元素的转化操作

针对元素的转化操作非常常用,其中最常用的就是map和flatmap。从名字上看这两者都是map操作,map操作我们都知道,在之前的MapReduce文章以及Python map、reduce用法的文章当中都有提及。简而言之就是可以将一个操作映射在每一个元素上。

比如假设我们有一个序列[1, 3, 4, 7],我们希望将当中每一个元素执行平方操作。我们当然可以用for循环执行,但是在spark当中更好的办法是使用map。

nums = sc.parallelize([1347])
spuare = nums.map(lambda x: x * x)
复制代码

我们知道map是一个转化操作,所以square仍然是一个RDD,我们直接将它输出不会得到结果,只会得到RDD的相关信息:

内部RDD的转化图是这样的:

我们想看结果就必须要执行行动操作,比如take,我们take一下查看一下结果:

和我们的预期一致,对于之前一直关注的同学来说map操作应该已经很熟悉了,那么这个flatmap又是什么呢?

差别就在这个flat,我们都知道flat是扁平的意思,所以flatmap就是说map执行之后的结果扁平化。说白了也就是说如果map执行之后的结果是一个数组的话,那么会将数组拆开,把里面的内容拿出来组合到一起。

我们一起来看一个例子:

texts = sc.parallelize(['now test''spark rdd'])
split = texts.map(lambda x: x.split(' '))
复制代码

由于我们执行map的对象是一个字符串,一个字符串执行split操作之后会得到一个字符串数组。如果我们执行map,得到的结果会是:

如果我们执行flatmap呢?我们也可以试一下:

对比一下,有没有注意到差别?

是了,map执行的结果是一个array的array,因为每一个string split之后就是一个array,我们把array拼接到一起自然是一个array的array。而flatMap会把这些array摊平之后放在一起,这也是两者最大的差别。

针对集合的转化操作

上面介绍了针对元素的转化操作,下面来看看针对集合的转化操作。

针对集合的操作大概有union,distinct,intersection和subtract这几种。我们可以先看下下图有一个直观地感受,之后我们再一一分析:

首先来看distinct,这个顾名思义,就是去除重复。和SQL当中的distinct是一样的,这个操作的输入是两个集合RDD,执行之后会生成一个新的RDD,这个RDD当中的所有元素都是unique的。有一点需要注意,执行distinct的开销很大,因为它会执行shuffle操作将所有的数据进行乱序,以确保每个元素只有一份。如果你不明白shuffle操作是什么意思,没有关系,我们在后序的文章当中会着重讲解。只需要记住它的开销很大就行了。

第二种操作是union,这个也很好理解,就是把两个RDD当中的所有元素合并。你可以把它当成是Python list当中的extend操作,同样和extend一样,它并不会做重复元素的检测,所以如果合并的两个集合当中有相同的元素并不会被过滤,而是会被保留。

第三个操作是intersection,它的意思是交集,也就是两个集合重叠的部分。这个应该蛮好理解的,我们看下下图:

下图当中蓝色的部分,也就是A和B两个集合的交集部分就是A.intersection(B)的结果,也就是两个集合当中共有的元素。同样,这个操作也会执行shuffle,所以开销一样很大,并且这个操作会去掉重复的元素。

最后一个是subtract,也就是差集,就是属于A不属于B的元素,同样我们可以用图来表示:

上图当中灰色阴影部分就是A和B两个集合的差集,同样,这个操作也会执行shuffle,非常耗时。

除了以上几种之外,还有cartesian,即笛卡尔积,sample抽样等集合操作,不过相对而言用的稍微少一些,这里就不过多介绍了,感兴趣的同学可以了解一下,也并不复杂。

行动操作

RDD中最常用的行动操作应该就是获取结果的操作了,毕竟我们算了半天就是为了拿结果,只获取RDD显然不是我们的目的。获取结果的RDD主要是take,top和collect,这三种没什么特别的用法,简单介绍一下。

其中collect是获取所有结果,会返回所有的元素。take和top都需要传入一个参数指定条数,take是从RDD中返回指定条数的结果,top是从RDD中返回最前面的若干条结果,top和take的用法完全一样,唯一的区别就是拿到的结果是否是最前面的。

除了这几个之外,还有一个很常用的action是count,这个应该也不用多说,计算数据条数的操作,count一下就可以知道有多少条数据了。

reduce

除了这些比较简单的之外,再介绍另外两个比较有意思的,首先,先来介绍reduce。reduce顾名思义就是MapReduce当中的reduce,它的用法和Python当中的reduce几乎完全一样,它接受一个函数来进行合并操作。我们来看个例子:

在这个例子当中,我们的reduce函数是将两个int执行加和,reduce机制会重复执行这个操作将所有的数据合并,所以最终得到的结果就是1 + 3 + 4 + 7 = 15.

fold

除了reduce之外还有一个叫做fold的action,它和reduce完全一样,唯一不同的是它可以自定义一个初始值,并且是针对分区的,我们还拿上面的例子举例:

直接看这个例子可能有点懵逼,简单解释一下就明白了,其实不复杂。我们注意到我们在使用parallelize创造数据的时候多加了一个参数2,这个2表示分区数。简单可以理解成数组[1, 3, 4, 7]会被分成两部分,但是我们直接collect的话还是原值。

现在我们使用fold,传入了两个参数,除了一个函数之外还传入了一个初始值2。所以整个计算过程是这样的:

对于第一个分区的答案是1 + 3 + 2 = 6,对于第二个分区的答案是4 + 7 + 2 = 13,最后将两个分区合并:6 + 13 + 2 = 21。

也就是说我们对于每个分区的结果赋予了一个起始值,并且对分区合并之后的结果又赋予了一个起始值。

aggregate

老实讲这个action是最难理解的,因为它比较反常。首先,对于reduce和fold来说都有一个要求就是返回值的类型必须和rdd的数据类型相同。比如数据的类型是int,那么返回的结果也要是int。

但是对于有些场景这个是不适用的,比如我们想求平均,我们需要知道term的和,也需要知道term出现的次数,所以我们需要返回两个值。这个时候我们初始化的值应该是0, 0,也就是对于加和与计数而言都是从0开始的,接着我们需要传入两个函数,比如写成这样:

nums.aggregate((00), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))
复制代码

看到这行代码会懵逼是必然的,不用担心,我们一点一点解释。

首先是第一个lambda函数,这里的x不是一个值而是两个值,或者说是一个二元组,也就是我们最后返回的结果,在我们的返回预期里,第一个返回的数是nums的和,第二个返回的数是nums当中数的个数。而这里的y则是nums输入的结果,显然nums输入的结果只有一个int,所以这里的y是一维的。那么我们要求和当然是用x[0] + y,也就是说把y的值加在第一维上,第二维自然是加一,因为我们每读取一个数就应该加一。

这点还比较容易理解,第二个函数可能有些费劲,第二个函数和第一个不同,它不是用在处理nums的数据的,而是用来处理分区的。当我们执行aggregate的时候,spark并不是单线程执行的,它会将nums中的数据拆分成许多分区,每个分区得到结果之后需要合并,合并的时候会调用这个函数。

和第一个函数类似,第一个x是最终结果,而y则是其他分区运算结束需要合并进来的值。所以这里的y是二维的,第一维是某个分区的和,第二维是某个分区当中元素的数量,那么我们当然要把它都加在x上。

上图展示了两个分区的时候的计算过程,其中lambda1就是我们传入的第一个匿名函数,同理,lambda2就是我们传入的第二个匿名函数。我想结合图应该很容易看明白。

行动操作除了这几个之外还有一些,由于篇幅原因我们先不赘述了,在后序的文章当中如果有出现,我们会再进行详细解释的。初学者学习spark比较抗拒的一个主要原因就是觉得太过复杂,就连操作还区分什么转化操作和行动操作。其实这一切都是为了惰性求值从而优化性能。这样我们就可以把若干个操作合并在一起执行,从而减少消耗的计算资源,对于分布式计算框架而言,性能是非常重要的指标,理解了这一点,spark为什么会做出这样的设计也就很容易理解了。

不仅spark如此,TensorFlow等深度学习框架也是如此,本质上许多看似反直觉的设计都是有更深层的原因的,理解了之后其实也很容易猜到,凡是拿到最终结果的操作往往都是行动操作,如果只是一些计算,那么十有八九是转化操作。

持久化操作

Spark当中的RDD是惰性求值的,有的时候我们会希望多次使用同一个RDD。如果我们只是简单地调用行动操作,那么spark会多次重复计算RDD和它对应的所有数据以及其他依赖,这显然会带来大量开销。我们很自然地会希望对于我们经常使用的RDD可以缓存起来,在我们需要的时候随时拿来用,而不是每次用到的时候都需要重新跑。

为了解决这个问题,spark当中提供了持久化的操作。所谓的持久化可以简单理解成缓存起来。用法也很简单,我们只需要对RDD进行persist即可:

texts = sc.parallelize(['now test''hello world'])
split = texts.split(lambda x: x.split(' '))
split.persist()
复制代码

调用完持久化之后,RDD会被缓存进内存或磁盘当中,我们需要的时候可以随时调出来使用,就不用把前面的整个流程全部跑一遍了。并且spark当中支持多种级别的持久化操作,我们可以通过StorageLevel的变量来控制。我们来看下这个StorageLevel的取值:

我们根据需要选择对应的缓存级别即可。当然既然有持久化自然就有反持久化,对于一些已经不再需要缓存的RDD,我们可以调用unpersist将它们从缓存当中去除。

今天的内容虽然看起来各种操作五花八门,但是有些并不是经常用到,我们只需要大概有个印象,具体操作的细节可以等用到的时候再做仔细的研究。希望大家都能忽略这些并不重要的细节,抓住核心的本质。

今天的文章就是这些,如果觉得有所收获,请顺手点个关注或者转发吧,你们的举手之劳对我来说很重要。

关注下面的标签,发现更多相似文章
评论