pyspark简单实践与原理

3,147 阅读15分钟
原文链接: yuerblog.cc

本文记录pyspark的基础用法和重要理解,希望对大家也很有帮助。

安装

因为我们是学习用途,所以不需要专门下载spark服务端进行搭建。

通过直接为python安装pyspark包即可,它内涵了与spark通讯的SDK以及单机spark模拟环境:

pip3 install pyspark

如果你有多个python环境,那么需要注意配置一下环境变量,告知spark服务端(在这里是内置的服务端)使用正确的python版本:

export PYSPARK_PYTHON=/usr/local/bin/python3

基本概念

RDD

代表一个数据集合,我们写spark程序时并不会加载到我们本地,它只是一个逻辑概念。

spark可以从本地磁盘、HDFS等环境中生成一个RDD,我们认为此时RDD对象只是一个标识,并不是真的要去把HDFS文件读到哪里。

2种算子

写spark程序会用到2类函数,称为2种算子:

  • transformation:例如map、filter等,调用时并不会直接触发集群计算,此时客户端只是在构建你的计算拓扑关系,形成一个多轮迭代的调用树。
  • action:例如save、collect、count、sum等,调用时会触发真正的集群计算,计算的拓扑是此前若干transformation调用绘制而来的。

缓存cache

spark直到action调用时才会进行整个拓扑的调度计算。

spark RDD的核心原理是不可变性,而我们知道RDD作为分布式数据集一般规模很大,所以spark并不是我们想象的那样先计算出第一轮transformation的输出RDD保存起来,然后再作为第二轮transformation的输入,这样会占用大量的内存资源,基本不可能实现。

简单的说,spark是流式计算,RDD的计算在不同的算子之间流式的传输,并不是先算完前一轮再计算下一轮。

因此,如果我们的某个RDD需要在多个计算分支里被重复用到,那么就可以考虑主动要求RDD缓存。一旦被要求缓存,那么当RDD被第一次计算的过程中,就会把RDD内的记录真正的保存到内存/磁盘上,以便其他计算分支复用。

所以,大家注意cache应该在action之前被调用,这样在真正计算时会顺带帮我们把RDD缓存在某个地方。

故障恢复

我感觉cache很容易与spark的故障恢复混淆。

spark在迭代我们的transformation计算拓扑时,如果中途机器异常等原因导致某一轮RDD损坏,那么spark就从最近一个上游的RDD重新计算出被损坏的RDD,从而成恢复工作。

我们使用cache并不是为了帮助spark故障恢复,故障恢复是spark内置的能力,使用cache的唯一理由应该是在多个计算分支里复用结果集。

检查点checkpoint

个人理解是一个更可靠的缓存,依旧是为了复用RDD。

因为cache是通过内存与磁盘混合的模式缓存RDD数据,但是一旦个别机器故障还是得重算。

因此,spark可以调用checkpoint来让RDD直接保存到HDFS上,依靠HFDS保障高可靠性。

checkpoint也是一个transformation,不会调用时立即执行,但是checkpoint默认会重算一次RDD并输出HDFS。

spark官方建议,如果要用checkpoint,建议配合cache使用,这样当action执行的时候,会先将结果写入cache,然后checkpoint会直接再把cache的内容写入HDFS,避免了checkpoint二次计算RDD的损耗。

partition分区

如果RDD加载的是一个HDFS文件,那么计算一个大文件时就需要考虑并行化。

因此,RDD需要打散,也就是分区partition,这样每个partition可以并行计算。

客户端允许我们指定RDD分成几个partition,例如HDFS这种分布式文件系统底层是block分块存储的,本身就能支持数据的切割。

shuffle打散

搞过map-reduce的同学应该了解,shuffle是map与reduce之间的一个数据聚合、排序、传输过程。

对spark的多个transformation算子之间也是一样的,每个partition计算后的结果如果不能继续在partition内完成下一轮计算,那么就需要shuffle传输到其他计算节点做进一步聚合(例如map操作)。

在编写spark的时候,需要特别注意减少shuffle的次数,减少shuffle的传输量,尽量在当前partition内完成多轮计算,尽量将需要shuffle的量级缩小。

实践

测试数据

我有一个本地文件作为最初的RDD输入,其内容如下:

cat input.txt
a b c
d a e
f g h c b

初始化框架

# -*- coding: utf-8 -*-
 
## Spark Application - execute with spark-submit
 
## Imports
from pyspark import SparkConf, SparkContext, StorageLevel
 
## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def main(sc):
    pass
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local")
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

  • SparkConf:配置客户端,主要是服务端地址(local表示模拟spark集群模式)和本应用的名字
  • SparkContext:传入配置,是我们操作spark的入口对象

我们接下来要写的代码都放在一个叫做main的函数里(不是spark要求的),目前还是一个空函数,也就是没有任何计算任务。

广播变量

# -*- coding: utf-8 -*-
 
## Spark Application - execute with spark-submit
 
## Imports
from pyspark import SparkConf, SparkContext, StorageLevel
 
## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local")
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

spark支持将一些共享的配置类信息分发到集群中,通过使用broadcast广播变量即可实现。

它的参数是任意python对象,返回值是我们后续访问的操作标识,后面会演示如何在计算代码中访问广播变量。

全局计数变量

## Closure Functions
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)

通过accumulator可以制造一个全局共享计数的变量,可以用于分布式累加计数的用途。

我的目标是统计不同单词的数量,后续我们会看到具体的用法。

创建RDD

我们指定从本地磁盘创建RDD,注意这个操作并不会立即读取文件,仅仅是一个标识而已。

## Closure Functions
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)

执行flatMap

我要声明第一个transformation算子,把文件中每一行按空格分词,产生一个(单词,1)的RDD集合,表示单词出现了1次。

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
    
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))

flatMap要求处理方法返回的是数组,也就是从1行变成N个单词。

我们通过rdd的toDebugString可以调试rdd,观察它的拓扑关系:

(8) PythonRDD[2] at RDD at PythonRDD.scala:49 []
 |  /Users/liangdong/Documents/github/spark/py-demo/input.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  /Users/liangdong/Documents/github/spark/py-demo/input.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

可见,当前的RDD叫做MapPartitionRDD,来自于HadoopRDD,也就是我们上一轮textFile创建的RDD。

缓存RDD

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存

调用cache可以缓存该rdd,因为我后续会有2个计算分支都用到这个集合,为了复用避免可能的重复计算,我显式的指定cache。

cache是纯内存缓存,底层利用persist方法实现,我们可以直接调用persist指定其他的缓存模式,避免内存不够用的情况出现,在此不做演示。

执行count

我们知道count是action算子,立即会发起分布式计算。

## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)

我们打印出了整个文件中,总共有多少个单词。

总单词数量: 11

reduce聚合

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
def count(left_val, right_val):
    return left_val + right_val
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)
 
    # 在每个partition内做聚合
    per_word_count = words_rdd.reduceByKey(count, 4)
 
    print("单词统计:", per_word_count.collect())

按words_rdd中第一列作为key聚合,并进行reduce累计,可以得到每个单词的出现次数。

单词统计: [('b', 2), ('c', 2), ('g', 1), ('a', 2), ('e', 1), ('d', 1), ('h', 1), ('f', 1)]

reduceByKey类似于map-reduce中的map+reduce,spark会在mapper端进行一次combine,在reducer端再进行一次merge,也就是默认会对shuffle做优化。

绝大多数transformation算子,都可以传入一个numParitions的参数,即经过计算后产生的RDD应该使用几个分区,我们应该特别关注这个事情。

另外特别说明,正是因为我们执行了collect算子,reduceByKey的计算才得以进行,否则我们仍旧是在拓扑的描述阶段。

同时,我们复用到了被cache的words_rdd,这对于大数据量计算特别有意义。

访问共享数据

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
def count(left_val, right_val):
    return left_val + right_val
 
def addUnit(item):
    all_words_count.add(1)
 
    return (item[0], "{}{}".format(item[1], broadcast_vals.value["unit"]))
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    global broadcast_vals
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    global all_words_count
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)
 
    # 在每个partition内做聚合
    per_word_count = words_rdd.reduceByKey(count, 4)
 
    print("单词统计:", per_word_count.collect())
 
    # 给统计结果增加"计数单位"
    detail_word_count = per_word_count.map(addUnit)
 
    print("详细统计:", detail_word_count.collect())
 
    print("去重单词数量:", all_words_count.value)

我们接着对per_word_count进行一次map操作。

map是对RDD中每个元素执行一个计算函数,返回值替换了原有的元素。

我们在addUnit函数中,访问broadcast变量中的Unit,将其追加到单词的出现次数后面。另外,我们基于accumulator变量,累计了不同单词的出现次数。

单词统计: [('b', 2), ('c', 2), ('g', 1), ('a', 2), ('e', 1), ('d', 1), ('h', 1), ('f', 1)]
详细统计: [('b', '2次'), ('c', '2次'), ('g', '1次'), ('a', '2次'), ('e', '1次'), ('d', '1次'), ('h', '1次'), ('f', '1次')]
去重单词数量: 8

排序并输出到文件

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
def count(left_val, right_val):
    return left_val + right_val
 
def addUnit(item):
    all_words_count.add(1)
 
    return (item[0], "{}{}".format(item[1], broadcast_vals.value["unit"]))
 
def sort_key(item):
    return item[1]
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    global broadcast_vals
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    global all_words_count
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)
 
    # 在每个partition内做聚合
    per_word_count = words_rdd.reduceByKey(count, 4)
 
    print("单词统计:", per_word_count.collect())
 
    # 给统计结果增加"计数单位"
    detail_word_count = per_word_count.map(addUnit)
 
    print("详细统计:", detail_word_count.collect())
 
    print("去重单词数量:", all_words_count.value)
 
    # 按出现次数排序
    sorted_rdd = per_word_count.sortBy(sort_key, False)
 
    # 写到文件中
    sorted_rdd.saveAsTextFile("/Users/liangdong/Documents/github/spark/py-demo/output")

现在,我们调用sortBy来对per_word_count做排序,第一个参数返回要排序的字段依据,第二个参数指定降序。

最后我们把集合写到文件中,一般来说是HDFS上的一个文件路径。

output/
total 16
-rw-r--r--  1 liangdong  staff   0  7  3 13:11 _SUCCESS
-rw-r--r--  1 liangdong  staff   0  7  3 13:11 part-00000
-rw-r--r--  1 liangdong  staff  27  7  3 13:11 part-00001
-rw-r--r--  1 liangdong  staff   0  7  3 13:11 part-00002
-rw-r--r--  1 liangdong  staff  45  7  3 13:11 part-00003
liangdongs-MacBook-Pro:py-demo liangdong$ cat output/part-00001
('b', 2)
('c', 2)
('a', 2)
liangdongs-MacBook-Pro:py-demo liangdong$ cat output/part-00003
('g', 1)
('e', 1)
('d', 1)
('h', 1)
('f', 1)

总结

上述仅仅是spark的批量计算编程方法。

实际spark还支持streaming流式计算,后续有时间会简单的了解一下。

因为spark编程和开发单机程序没有什么两样,但是实际却是在分布式大数据集上的分布式并行计算,所以spark有很多Machine Leanring的库被开发出来,轻松帮你解决海量数据的机器学习问题。

另外,spark体现的编程思想值得一提,即:计算向存储移动。

因为java和python都支持对象的序列化,所以可以把计算代码序列化后发往存储节点参与分布式运算,这是为什么大多数大数据开源平台都仅支持jvm系语言的重要原因。

参考