RDD编程
RDD基础
RDD,分布式对象集合。每个RDD被分为多个分区,这些分区运行在集群的不同节点上。
创建RDD
- 从外部读取一个数据集。
lines = sc.textFile('file:///G:\spark\README.md') # file表示从本地读取文件,默认为HDFS读取。
- 在驱动器程序中分发驱动器程序中的对象集合。
lines = sc.parallelize(['pandas', 'I like pandas'])
RDD操作
RDD支持两种操作:转化和行动。
转化操作(transformation)
RDD的转化操作是返回新的RDD的操作。
lines = sc.textFile('file:///G:\spark\README.md')
pythonlines = lines.filter(lambda line: "Python" in line)
上面代码中的filter即为转化操作,生成新的pythonlines。
但所有的转化操作,都只会在遇见第一次行动操作后,才会真正执行。这种数据处理方式为:惰性求值。
Spark会使用谱系图记录不同RDD之间的依赖关系,按照这些依赖关系来计算或恢复RDD。
行动操作(action)
行动操作是对数据集进行实际计算的操作。通过行动操作会把最终求得的结果返回驱动器程序或者写入外部存储系统中。
print('lines has' + lines.count() +'concerning lines')
print(pythonlines.first() + 'is thr first line in pythonlines')
上述代码中count和first都数据行动操作。通过行动操作,lines读取和pythonlines的生成才会真正执行。
惰性求值
前文提到,RDD的转化操作均为惰性求值。
惰性求值意味着:当对RDD进行转化操作时,操作不会立即执行,二是在内部记录下所要执行的操作的信息,生成谱系图。
换个角度看,RDD可以看作是通过转化操作构建出来的,记录如何计算数据的指令列表。 而不是存放着特定数据的数据集。
向Spark传递函数
范例:
word = rdd.filter(lambda s: "Python" in line)
def containsPython
return "Python" in line
word = rdd.filter(containsPython)
特别注意
下列传递函数方法容易造成意想不到的开销
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def isMatch(self, s):
return self.query in s
def getMatchesFunctions(self, rdd):
# 问题:在"self.isMatch"中引入了整个self
return rdd.filter(self.isMatch)
def getMatchesMemberReference(self, rdd):
# 问题: 在"self.query"中引入了整个self
return rdd.filter(lambda x: self.query in x)
当传递的对象是某个对象的成员,或包含了对某个对象中一个字段的引用时,Spark会把整个对象发送到节点,这可能会导致传递的东西远超预想。
修改方案:只把需要的字段从对象中拿出放在一个局部变量,然后传递该局部变量
class WordFunctions(object):
....
def getMatchesNoReference(self, rdd):
query = self.query
return rdd.filter(lambda x: query in x)
常见的转化操作和行动操作
基本RDD
接收任意数据类型的RDD
- 针对各个元素的转化操作
| 函数名 | 功能说明 |
|---|---|
| map | map接收一个函数,将函数作用于RDD中每个元素,将函数返回结果作为结果RDD中对应元素的值 |
| filter | 接收一个函数,将RDD中满足该函数的元素放入新的RDD中返回 |
| flatMap | RDD中的单个元素生成多个元素,并生成RDD,而这个RDD的所有元素可以被迭代器访问。 |
# flatMap()使用示例:分词
lines = sc.textFile(['hello world', 'hi'])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # 返回hello
- map和flatMap区别
- 伪集合操作
| 函数名 | 功能说明 |
|---|---|
| union | 返回一个包含两个RDD所有元素的RDD,重复的元素不会被删除 |
| intersection | 返回两个RDD中都存在的元素(即求交集) |
| subtract | 接收另一个RDD作为参数,返回一个只存在于第一个RDD而不存在于接收RDD中的元素组成的RDD。 |
| distinct | 接收一个RDD,去重 |
| cartesian | 求两个RDD的笛卡尔积 |
3. 行动操作
| 函数名 | 说明 |
|---|---|
| RDD.reduce(func) | reduce接收一个函数操作,对RDD中的所有同一类型元素执行该函数。 |
| RDD.fold(value, func) | fold接收一个起始值value,和一个函数,对RDD每个元素执行该函数,起始值为value,在value基础上进行操作。 |
| aggregate(value, func1, fun2) | aggregate接收一个期待返回的value,func1是对RDD中元素进行相应操作的函数,考虑到每个节点是在本地进行累加,提供第二个func2将累加器两两合并。 |
其他行动操作
持久化(缓存)
当需要多次使用同一个RDD时,如果每次都对RDD调用行动操作,Spark会重复计算RDD及其依赖。
避免多次计算同一个RDD,可以让Spark对数据进行持久化。
Python中会始终序列化要持久存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM堆空间中。而将数据写到磁盘或者堆外存储时,也是使用序列化后的数据。
缓存级别
RDD还有一种叫做unpersist()的方法,可以将RDD从缓存中移除。