spark中的pair rdd,看这一篇就够了

1,482 阅读11分钟

本文始发于个人公众号:TechFlow,原创不易,求个关注


今天是spark专题的第四篇文章,我们一起来看下Pair RDD。

定义

在之前的文章当中,我们已经熟悉了RDD的相关概念,也了解了RDD基本的转化操作和行动操作。今天我们来看一下RDD当中非常常见的PairRDD,也叫做键值对RDD,可以理解成KVRDD。

KV很好理解,就是key和value的组合,比如Python当中的dict或者是C++以及Java当中的map中的基本元素都是键值对。相比于之前基本的RDD,pariRDD可以支持更多的操作,相对来说更加灵活,可以完成更加复杂的功能。比如我们可以根据key进行聚合,或者是计算交集等。

所以本身pairRDD只不过是数据类型是KV结构的RDD而已,并没有太多的内涵,大家不需要担心。

Pair RDD转化操作

Pair RDD也是RDD,所以之前介绍的RDD的转化操作Pair RDD自然也可以使用。它们两者有些像是类继承的关系,RDD是父类,Pair RDD是实现了一些新特性的子类。子类可以调用父类当中所有的方法,但是父类却不能调用子类中的方法。

调用的时候需要注意,由于我们的Pair RDD中的数据格式是KV的二元组,所以我们传入的函数必须是针对二元组数据的,不然的话可能运算的结果会有问题。下面我们来列举一些最常用的转化操作。

为了方便演示,我们用一个固定的RDD来运行各种转化操作,来直观了解一下这些转化操作究竟起什么样的作用。

ex1 = sc.parallelize([[12], [34], [35]])

keys,values和sortByKey

这三个转化操作应该是最常用也是最简单的,简单到我们通过字面意思就可以猜出它们的意思。

我们先来看keys和values:

我们的RDD当中二元组当中的第一个元素会被当做key,第二个元素当做value,需要注意的是,它并不是一个map或者是dict,所以key和value都是可以重复的

sortByKey也很直观,我们从字面意思就看得出来是对RDD当中的数据根据key值进行排序,同样,我们也来看下结果:

mapValues和flatMapValues

mapValues不能直接使用,而必须要传入一个函数作为参数。它的意思是对所有的value执行这个函数,比如我们想把所有的value全部转变成字符串,我们可以这么操作:

flatMapValues的操作和我们的认知有些相反,我们都知道flatMap操作是可以将一个嵌套的数组打散,但是我们怎么对一个value打散嵌套呢?毕竟我们的value不一定就是一个数组,这就要说到我们传入的函数了,这个flatMap的操作其实是针对函数返回的结果的,也就是说函数会返回一个迭代器,然后打散的内容其实是这个迭代器当中的值。

我这么表述可能有些枯燥,我们来看一个例子就明白了:

不知道这个结果有没有出乎大家的意料,它的整个流程是这样的,我们调用flatMapValues运算之后返回一个迭代器,迭代器的内容是range(x, x+3)。其实是每一个key对应一个这样的迭代器,之后再将迭代器当中的内容打散,和key构成新的pair。

groupByKey,reduceByKey和foldByKey

这两个功能也比较接近,我们先说第一个,如果学过SQL的同学对于group by操作的含义应该非常熟悉。如果没有了解过也没有关系,group by可以简单理解成归并或者是分桶。也就是说将key值相同的value归并到一起,得到的结果是key-list的Pair RDD,也就是我们把key值相同的value放在了一个list当中。

我们也来看下例子:

我们调用完groupby之后得到的结果是一个对象,所以需要调用一下mapValues将它转成list才可以使用,否则的话是不能使用collect获取的。

reduceByKey和groupByKey类似,只不过groupByKey只是归并到一起,然而reduceByKey是传入reduce函数,执行reduce之后的结果。我们来看一个例子:

在这个例子当中我们执行了累加,把key值相同的value加在了一起。

foldByKey和fold的用法差别并不大,唯一不同的是我们加上了根据key值聚合的逻辑。如果我们把分区的初始值设置成0的话,那么它用起来和reduceByKey几乎没有区别:

我们只需要清楚foldByKey当中的初始值针对的是分区即可。

combineByKey

这个也是一个很核心并且不太容易理解的转化操作,我们先来看它的参数,它一共接受5个参数。我们一个一个来说,首先是第一个参数,是createCombiner

它的作用是初始化,将value根据我们的需要做初始化,比如将string类型的转化成int,或者是其他的操作。我们用记号可以写成是V => C,这里的V就是value,C是我们初始化之后的新值。

它会和value一起被当成新的pair传入第二个函数,所以第二个函数的接受参数是(C, V)的二元组。我们要做的是定义这个二元组的合并,所以第二个函数可以写成(C, V) => C。源码里的注释和网上的教程都是这么写的,但我觉得由于出现了两个C,可能会让人难以理解,我觉得可以写成(C, V) => D,比较好。

最后一个函数是将D进行合并,所以它可以写成是(D, D) => D。

到这里我们看似好像明白了它的原理,但是又好像有很多问号,总觉得哪里有些不太对劲。我想了很久,才找到了问题的根源,出在哪里呢,在于合并。有没有发现第二个函数和第三个函数都是用来合并的,为什么我们要合并两次,它们之间的区别是什么?如果这个问题没搞明白,那么对于它的使用一定是错误的,我个人觉得这个问题才是这个转化操作的核心,没讲清楚这个问题的博客都是不够清楚的。

其实这两次合并的逻辑大同小异,但是合并的范围不一样,第一次合并是针对分区的,第二次合并是针对key的。因为在spark当中数据可能不止存放在一个分区内,所以我们要合并两次,第一次先将分区内部的数据整合在一起,第二次再跨分区合并。由于不同分区的数据可能相隔很远,所以会导致网络传输的时间过长,所以我们希望传输的数据尽量小,这才有了groupby两次的原因。

我们再来看一个例子:

在这个例子当中我们计算了每个单词出现的平均个数,我们一点一点来看。首先,我们第一个函数将value转化成了(1, value)的元组,元组的第0号元素表示出现该单词的文档数,第1号元素表示文档内出现的次数。所以第二个函数,也就是在分组内聚合的函数,我们对于出现的文档数只需要加一即可,对于出现的次数要进行累加。因为这一次聚合的对象都是(1, value)类型的元素,也就是没有聚合之前的结果。

在第三个函数当中,我们对于出现的总数也进行累加,是因为这一个函数处理的结果是各个分区已经聚合一次的结果了。比如apple在一个分区内出现在了两个文档内,一共出现了20次,在一个分区出现在了三个文档中,一共出现了30次,那么显然我们一共出现在了5个文档中,一共出现了50次。

由于我们要计算平均,所以我们要用出现的总次数除以出现的文档数。最后经过map之后由于我们得到的还是一个二元组,我们不能直接collect,需要用collectAsMap。

我们把上面这个例子用图来展示,会很容易理解:

连接操作

在spark当中,除了基础的转化操作之外,spark还提供了额外的连接操作给pair RDD。通过连接,我们可以很方便地像是操作集合一样操作RDD。操作的方法也非常简单,和SQL当中操作数据表的形式很像,就是join操作。join操作又可以分为join(inner join)、left join和right join。

如果你熟悉SQL的话,想必这三者的区别应该非常清楚,它和SQL当中的join是一样的。如果不熟悉也没有关系,解释起来并不复杂。在join的时候我们往往是用一张表去join另外一张表,就好像两个数相减,我们用一个数减去另外一个数一样。比如A.join(B),我们把A叫做左表,B叫做右表。所谓的join,就是把两张表当中某一个字段或者是某些字段值相同的行连接在一起。

比如一张表是学生表,一张表是出勤表。我们两张表用学生的id一关联,就得到了学生的出勤记录。但是既然是集合关联,就会出现数据关联不上的情况。比如某个学生没有出勤,或者是出勤表里记错了学生id。对于数据关联不上的情况,我们的处理方式有四种。第一种是全都丢弃,关联不上的数据就不要了。第二种是全部保留,关联不上的字段就记为NULL。第三种是左表关联不上的保留,右表丢弃。第四种是右表保留,左表丢弃。

下图展示了这四种join,非常形象。

我们看几个实际的例子来体会一下。

首先创建数据集:

ex1 = sc.parallelize([['frank'30], ['bob'9], ['silly'3]])
ex2 = sc.parallelize([['frank'80], ['bob'12], ['marry'22], ['frank'21], ['bob'22]])

接着,我们分别运行这四种join,观察一下join之后的结果。

从结果当中我们可以看到,如果两个数据集当中都存在多条key值相同的数据,spark会将它们两两相乘匹配在一起。

行动操作

最后,我们看下pair RDD的行动操作。pair RDD同样是rdd,所以普通rdd适用的行动操作,同样适用于pair rdd。但是除此之外,spark还为它开发了独有的行动操作。

countByKey

countByKey这个操作顾名思义就是根据Key值计算每个Key值出现的条数,它等价于count groupby的SQL语句。我们来看个具体的例子:

collectAsMap

这个也很好理解,其实就是讲最后的结果以map的形式输出

从返回的结果可以看到,输出的是一个dict类型。也就是Python当中的"map"。

lookup

这个单词看起来比较少见,其实它代表的是根据key值查找对应的value的意思。也就是常用的get函数,我们传入一个key值,会自动返回key值对应的所有的value。如果有多个value,则会返回list。

总结

到这里,所有的pair RDD相关的操作就算是介绍完了。pair rdd在我们日常的使用当中出现的频率非常高,利用它可以非常方便地实现一些比较复杂的操作。

另外,今天的这篇文章内容不少,想要完全吃透,需要一点功夫。这不是看一篇文章就可以实现的,但是也没有关系,我们初学的时候只需要对这些api和使用方法有一个大概的印象即可,具体的使用细节可以等用到的时候再去查阅相关的资料。

今天的文章就是这些,如果觉得有所收获,请顺手点个关注或者转发吧,你们的举手之劳对我来说很重要。