因本人刚开始写博客,学识经验有限,如有不正之处望读者指正,不胜感激;也望借此平台留下学习笔记以温故而知新。这一篇文章主要是最近阅读的Spark快速大数据分析一书的简短笔记摘要,新手入门值得推荐。 

第一章

Spark Core 中包含了对弹性分布式数据集(resilient distributed dataset,简称RDD)的API 定义。RDD 表示分布在多个计算节点上可以并行操作的元素集合,是Spark 主要的编程抽象。

Spark 的各个组件

Spark快速大数据分析_笔记_1-5

第三章

在Spark 中,对数据的所有操作不外乎创建RDD、转化已有RDD 以及调用RDD 操作进行求值。

创建RDD 最简单的方式就是把程序中一个已有的集合传给SparkContext 的parallelize()方法。RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的RDD 的操作,比如map() 和filter(),而行动操作则是向驱动器程序返回结果或
把结果写入外部系统的操作,会触发实际的计算,比如count() 和first()。

通过转化操作,从已有的RDD 中派生出新的RDD,Spark 会使用谱系图(lineage graph)来记录这些不同RDD 之间的依赖关系。

Spark快速大数据分析_笔记_1-5

在Python 中,有三种方式来把函数传递给Spark。除了lambda 表达式,也可以传递顶层函数或是定义的局部函数。

看一个简单的例子,用map() 对RDD 中的所有数求平方

Spark快速大数据分析_笔记_1-5

flatMap() 的一个简单用途是把输入的字符串切分为单词

Spark快速大数据分析_笔记_1-5

union(other),会返回一个包含两个RDD 中所有元素的RDD。如果输入的RDD 中有重复数据,Spark 的union() 操作也会包含这些重复数据。

intersection(other) 方法,返回两个RDD 中都有的元素。

subtract(other) 函数接收另一个RDD 作为参数,返回一个由只存在于第一个RDD 中而不存在于第二个RDD 中的所有元素组成的RDD。

计算两个RDD 的笛卡儿积,cartesian(other) 转化操作会返回所有可能的(a, b) 对,其中a 是源RDD 中的元素,而b 则来自另一个RDD。

列出基本的RDD转化操作:

Spark快速大数据分析_笔记_1-5

列出针对两个RDD的转化操作:

Spark快速大数据分析_笔记_1-5

使用reduce(),可以很方便地计算出RDD中所有元素的总和、元素的个数,以及其他类型的聚合操作

Spark快速大数据分析_笔记_1-5

fold() 和reduce() 都要求函数的返回值类型需要和我们所操作的RDD 中的元素类型相同。aggregate() 函数则把我们从返回值类型必须与所操作的RDD 类型相同的限制中解放出来。

用aggregate() 来计算RDD 的平均值,

Spark快速大数据分析_笔记_1-5

take(n) 返回RDD 中的n 个元素,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。

takeSample(withReplacement, num,seed) 函数可以让我们从数据中获取一个采样,并指定是否替换。

表3-4总结了这些行动操作。

Spark快速大数据分析_笔记_1-5

有些函数只能用于特定类型的RDD,比如mean() 和variance() 只能用在数值RDD 上,而join() 只能用在键值对RDD 上。

在Python 中,会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM 堆空间中。

RDD 还有一个方法叫作unpersist(),调用该方法可以手动把持久化的RDD 从缓存中移除。

第四章

键值对RDD 是Spark 中许多操作所需要的常见数据类型。本章介绍如何操作键值对RDD。键值对RDD 通常用来进行聚合计算。

构建键值对RDD 的方法

Spark快速大数据分析_笔记_1-5

当用Scala 和Python 从一个内存中的数据集创建pair RDD 时,只需要对这个由二元组组成的集合调用SparkContext.parallelize() 方法。

表4-1 和表4-2 总结了对pair RDD 的一些转化操作,表4-1:Pair RDD的转化操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)

Spark快速大数据分析_笔记_1-5

Spark快速大数据分析_笔记_1-5

表4-2:针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})

Spark快速大数据分析_笔记_1-5

Spark快速大数据分析_笔记_1-5

可以拿前一节中的pair RDD,筛选掉长度超过20 个字符的行,

Spark快速大数据分析_笔记_1-5

只想访问pair RDD 的值部分,这时操作二元组很麻烦。由于这是一种常见的使用模式,因此Spark 提供了mapValues(func) 函数,功能类似于map{case (x, y): (x,func(y))}

使用reduceByKey() 和mapValues() 来计算每个键的对应值的均值

Spark快速大数据分析_笔记_1-5

Spark快速大数据分析_笔记_1-5

使用reduceByKey() 对所有的单词进行计数。

Spark快速大数据分析_笔记_1-5

事实上,可以对第一个RDD 使用countByValue() 函数,以更快地实现单词计数:input.flatMap(x => x.split(" ")).countByValue()。

combineByKey() 有多个参数分别对应聚合操作的各个阶段,因而非常适合用来解释聚合操作各个阶段的功能划分。

Spark快速大数据分析_笔记_1-5

每个RDD 都有固定数目的分区,分区数决定了在RDD 上执行操作时的并行度。

本章讨论的大多数操作符都能接收第二个参数,这个参数用来指定分组结果或聚合结果的RDD 的分区数

Spark快速大数据分析_笔记_1-5

连接数据可能是pair RDD 最常用的操作之一。连接方式多种多样:右外连接、左外连接、交叉连接以及内连接。

普通的join 操作符表示内连接。rightOuterJoin() 几乎与leftOuterJoin() 完全一样,只不过预期结果中的键必须出现在第二个RDD 中,而二元组中的可缺失的部分则来自于源RDD 而非第二个RDD。

将RDD 倒序排列,因此sortByKey() 函数接收一个叫作ascending 的参数,表示我们是否想要让结果按升序排序(默认值为true)。

Spark快速大数据分析_笔记_1-5

Pair RDD提供了一些额外的行动操作,可以让我们充分利用数据的键值对特性

表4-3:Pair RDD的行动操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)

Spark快速大数据分析_笔记_1-5

和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程序可以通过控制RDD 分区方式来减少通信开销

在Python 中,不能将HashPartitioner 对象传给partitionBy,而只需要把需要的分区数传递过去(例如rdd.partitionBy(100))。

这里列出了所有会为生成的结果RDD 设好分区方式的操作:cogroup()、groupWith()、join()、lef tOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort()、mapValues()(如果父RDD 有分区方式的话)、flatMapValues()(如果父RDD 有分区方式的话),以及filter()(如果父RDD 有分区方式的话)。其他所有的操作生成的结果都不会存在特定的分区方式。

PageRank 是一种从RDD 分区中获益的更复杂的算法,PageRank算法是以Google 的拉里· 佩吉(Larry Page)的名字命名的,用来根据外部文档指向一个文档的链接,对集合中每个文档的重要程度赋一个度量值。该算法可以用于对网页进行排序,当然,也可以用于排序科技文章或社交网络中有影响的用户。

算法会维护两个数据集:一个由(pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID, rank) 元素组成,包含每个页面的当前排序值。

给出了使用Spark 实现PageRank 的代码。

Spark快速大数据分析_笔记_1-5

为了最大化分区相关优化的潜在作用,你应该在无需改变元素的键时尽量使用mapValues() 或flatMapValues()。

在Python 中,不需要扩展Partitioner 类,而是把一个特定的哈希函数作为一个额外的参数传给RDD.partitionBy() 函数,

Spark快速大数据分析_笔记_1-5

第五章

Spark 支持很多种输入输出源

本章会介绍以下三类常见的数据源。

• 文件格式与文件系统:包括文本文件、JSON、SequenceFile,以及protocol buffer。

• Spark SQL中的结构化数据源

• 数据库与键值存储

表5-1:Spark支持的一些常见格式

Spark快速大数据分析_笔记_1-5

只需要使用文件路径作为参数调用SparkContext 中的textFile() 函数,就可以读取一个文本文件

Spark快速大数据分析_笔记_1-5

saveAsTextFile() 方法接收一个路径,并将RDD 中的内容都输入到路径对应的文件中

Spark快速大数据分析_笔记_1-5

读取JSON将数据作为文本文件读取,然后对JSON 数据进行解析,这样的方法可以在所有支持的编程语言中使用

Spark快速大数据分析_笔记_1-5

读取CSV

如果恰好你的CSV 的所有数据字段均没有包含换行符,可以使用textFile() 读取并解析数据

Spark快速大数据分析_笔记_1-5

Spark快速大数据分析_笔记_1-5

如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析各段

Spark快速大数据分析_笔记_1-5

使用的CSV 库要输出到文件或者输出器,所以可以使用StringWriter 或StringIO来将结果放到RDD 中

Spark快速大数据分析_笔记_1-5

读取SequenceFile

Spark快速大数据分析_笔记_1-5

保存SequenceFile

Spark快速大数据分析_笔记_1-5

对象文件在Python 中无法使用,不过Python 中的RDD 和SparkContext 支持saveAsPickleFile()和pickleFile() 方法作为替代

Spark 支持从本地文件系统中读取文件

Spark快速大数据分析_笔记_1-5

在Spark 中使用HDFS 只需要将输入输出路径指定为hdfs://master:port/path 就够了。

结构化数据指的是有结构信息的数据——也就是所有的数据记录都具有一致字段结构的集合。Spark SQL 支持多种结构化数据源作为输入,而且由于Spark SQL 知道数据的结构信息,它还可以从这些数据源中只读出所需字段。

Apache Hive 是Hadoop 上的一种常见的结构化数据源。Hive 可以在HDFS 内或者在其他存储系统上存储多种格式的表.。SparkSQL 可以读取Hive 支持的任何表。

Spark快速大数据分析_笔记_1-5

假设有一个包含推文的JSON 文件,格式如例5-33 所示

Spark快速大数据分析_笔记_1-5

读取这些数据

Spark快速大数据分析_笔记_1-5

Spark 可以从任何支持Java 数据库连接(JDBC) 的关系型数据库中读取数据, 包括MySQL、Postgre 等系统。要访问这些数据, 需要构建一个org.apache.spark.rdd.JdbcRDD,将SparkContext 和其他参数一起传给它。

 

 

 

相关文章: