I. How to Create an RDD

1、Parallelizing an existing collection in your driver program: an existing collection in the driver program is parallelized into an RDD.

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

1)Search for the parallelize method in SparkContext.scala:

2)Run it in spark-shell

3)Run distData.collect

Look at the Spark UI: the number of tasks is 2. Why?

3)distData.reduce((a, b) => a + b)

This sums the elements. Since no partition count was specified, the default algorithm determines the partitioning.
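The reduce call behaves exactly like reduce on a plain Scala collection, which you can try without a Spark cluster (a sketch in plain Scala, not Spark itself):

```scala
// Same data as the parallelize example above, as a local collection.
val data = Array(1, 2, 3, 4, 5)

// reduce combines elements pairwise with the given function.
val sum = data.reduce((a, b) => a + b)
println(sum) // 15
```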

4)Number of partitions

So Job 2 has 5 tasks.

Spark will run one task for each partition of the cluster. 

In Spark, one task runs per partition (partition = task).

3)Typically you want 2-4 partitions for each CPU in your cluster. 

You typically want 2-4 partitions per CPU in your cluster; that way you avoid the overhead of too many tasks.

4)Spark tries to set the number of partitions automatically based on your cluster

Spark sets the number of partitions based on your cluster. For example, on an HDFS deployment it compares the file size against the block size to decide the partition count.
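As a rough illustration (a simplified sketch, not Spark's actual code; the real split logic lives in Hadoop's input-format layer): with the default 128 MB HDFS block size, the partition count is roughly the number of blocks, but never below the requested minimum:

```scala
// Simplified sketch: derive a partition count from file size and block size.
// blockSizeMB = 128 matches the HDFS default mentioned below.
def numPartitions(fileSizeMB: Long, blockSizeMB: Long = 128, minPartitions: Int = 2): Int =
  math.max(minPartitions, math.ceil(fileSizeMB.toDouble / blockSizeMB).toInt)

println(numPartitions(300)) // 3 blocks -> 3 partitions
println(numPartitions(100)) // 1 block, but at least minPartitions = 2
```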

5)Tuning tips (covered later)

Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.

2、referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

The dataset is created by referencing an external storage system (a shared filesystem) such as HDFS, HBase, or any other data source offering a Hadoop InputFormat.

Takes the URI of an input file and returns an RDD[String].

1)Text File:

 * Read a text file from HDFS, a local file system (available on all nodes), or any

 * Hadoop-supported file system URI, and return it as an RDD of Strings.

Example with a local filesystem:

Standalone:

1 master + 100 workers ==> word count with a local input source

The input file must exist on every node.

Test:

Converting a file on an external filesystem (local) into an RDD:

val distFile = sc.textFile("/opt/data/ruozeinput.txt")

Compute the total length of all lines:

distFile.map(s => s.length).reduce((a, b) => a + b)
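The same length sum can be checked without Spark on a plain Scala collection of lines (a sketch; the sample file is created on the fly here rather than read from /opt/data):

```scala
import java.nio.file.Files
import scala.jdk.CollectionConverters._

// Write a small sample file, then sum line lengths the same way
// distFile.map(s => s.length).reduce((a, b) => a + b) does.
val path = Files.createTempFile("ruozeinput", ".txt")
Files.write(path, List("hello world", "hello spark").asJava)

val lines = Files.readAllLines(path).asScala
val totalLength = lines.map(_.length).reduce((a, b) => a + b)
println(totalLength) // 11 + 11 = 22
```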

Converting a file on an external filesystem (HDFS) into an RDD:

val distFile = sc.textFile("hdfs://hadoop002:9000/ruozeinput.txt")

3、Some notes on reading files with Spark:

1)If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.

If you use the local filesystem, the file must be accessible at the same path on every node.

2)All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well.

All of Spark's file-based input methods, including textFile, also support directories, compressed files, and wildcards.

val distFile = sc.textFile("hdfs://hadoop002:9000/data/")

3)The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

4、Apart from text files, Spark’s Scala API also supports several other data formats:

1)SparkContext.wholeTextFiles

Reads the files under an HDFS directory and returns one (fileName, fileContent) pair per file.

textFile, by contrast, merges the lines of all files in the directory into a single RDD.
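The difference can be mimicked on the local filesystem (a sketch; the directory and file names are made up for illustration):

```scala
import java.nio.file.Files
import scala.jdk.CollectionConverters._

val dir = Files.createTempDirectory("data")
Files.write(dir.resolve("a.txt"), List("line1", "line2").asJava)
Files.write(dir.resolve("b.txt"), List("line3").asJava)

val files = Files.list(dir).iterator().asScala.toList.sortBy(_.getFileName.toString)

// wholeTextFiles-style: one (fileName, wholeContent) pair per file
val whole = files.map(p => (p.getFileName.toString, Files.readString(p)))

// textFile-style: the lines of all files flattened into one collection
val lines = files.flatMap(p => Files.readAllLines(p).asScala)

println(whole.map(_._1)) // List(a.txt, b.txt)
println(lines)           // List(line1, line2, line3)
```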

2)SequenceFile 

Get an RDD for a Hadoop SequenceFile with given key and value types.

Homework: read a SequenceFile with Spark.

3)For other Hadoop InputFormats

SparkContext.hadoopRDD method

4)Saving RDDs

RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.

II. RDD Operations

RDDs support two types of operations: 

1、transformations

RDDA =======> RDDB (they are not the same RDD)

For example, the map method performs a transformation:

RDDA(1,2,3,4)    map(+1)    RDDB(2,3,4,5)         reduce(a + b)

All Spark transformations are lazily evaluated.

They just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.

Lazy: the computation runs only when the result is actually needed (lazy evaluation).

RDDA.map().filter().map().filter()
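Scala's lazy views give a feel for this evaluation model (a sketch; views are not RDDs, but the map/filter chain is likewise only recorded until a result is forced):

```scala
var evaluated = 0

// Like RDD transformations, operations on a view are recorded, not run.
val pipeline = (1 to 4).view
  .map { x => evaluated += 1; x + 1 }
  .filter(_ % 2 == 0)

println(evaluated) // 0 -- nothing has run yet

// Forcing a result (like a Spark action) triggers the computation.
val result = pipeline.toList
println(result)    // List(2, 4)
println(evaluated) // 4
```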

2、actions

Actions return a value to the driver.

reduce

For example:

RDDA.map().reduce(_ + _) returns a single result, and is more efficient than MapReduce.

3、cache (persistence)

III. RDD Transformations

1、map: applies a function to every element of the RDD

parallelize(): converts an existing collection into an RDD

val a = sc.parallelize(1 to 9)

action:

val b = a.map(x => x*2)

b.collect

2、val a = sc.parallelize(List("dog","tiger","cat","panda"))

val b = a.map(x => (x,1))

In fact, programming against the RDD API with map works just like programming against Scala collections.

====> You must master Scala's collection operations.
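To see the point, the exact same map calls work on a plain Scala collection, no SparkContext required:

```scala
// Same operations as the RDD examples above, on local collections.
val a = List("dog", "tiger", "cat", "panda")
val b = a.map(x => (x, 1))
println(b) // List((dog,1), (tiger,1), (cat,1), (panda,1))

val doubled = (1 to 9).map(x => x * 2)
println(doubled.toList) // List(2, 4, 6, 8, 10, 12, 14, 16, 18)
```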

3、filter

Filters the elements with a predicate.

Example 1:

val a = sc.parallelize(1 to 10)

a.filter( _ % 2 == 0).collect

a.filter(_ < 4).collect

Example 2:

Step 1: map, multiplying by 2

Step 2: filter, keeping values > 5

val a = sc.parallelize(1 to 6)

val mapRdd = a.map( _ * 2)

mapRdd.collect

val filterRdd = mapRdd.filter( _ > 5)

Chained style:

sc.parallelize(1 to 6).map( _ * 2).filter( _ > 5).collect


Homework:

The difference between map and flatMap
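As a head start on the homework, the contrast shows up on plain Scala collections too (a sketch with made-up sample lines):

```scala
val lines = List("hello world", "hello spark")

// map: one output element per input element (here, an Array of words per line)
val mapped = lines.map(_.split(" "))
println(mapped.map(_.toList)) // List(List(hello, world), List(hello, spark))

// flatMap: maps each element to a collection, then flattens the results
val flatMapped = lines.flatMap(_.split(" "))
println(flatMapped) // List(hello, world, hello, spark)
```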

4、mapValues (a handy trick, very common in real work)

The key stays untouched; only the value is transformed.

Example 1:
val a = sc.parallelize(List("dog","tiger","lion","cat","panda"))

val b = a.map( x =>(x.length,x))

b.mapValues("x" + _ + "x").collect

5、count

Return the number of elements in the dataset.

a.count

Example 1:

6、reduce(func)

 Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

a.reduce(_ + _) = a.reduce((x,y) => x + y)
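Why must func be commutative and associative? With a non-associative function such as subtraction, the answer depends on how the runtime groups the elements, so a parallel run could return a different result. Illustrated on plain Scala collections (the two-partition split is simulated by hand):

```scala
val data = List(1, 2, 3, 4)

// Addition is commutative and associative: any grouping gives the same sum.
println(data.reduce(_ + _)) // 10

// Subtraction is not associative: grouping changes the result.
val sequential = data.reduce(_ - _)    // ((1-2)-3)-4 = -8
val asTwoParts = {                     // simulate two partitions reduced separately
  val left  = List(1, 2).reduce(_ - _) // -1
  val right = List(3, 4).reduce(_ - _) // -1
  left - right                         // 0
}
println((sequential, asTwoParts)) // (-8, 0): the grouping changed the answer
```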

7、first

Return the first element of the dataset (similar to take(1)).

a.first

8、.top

Example 1:

Take the two largest elements:

val a = sc.parallelize(Array(6,9,4,7,5,8)).top(2)

Example 2:

val a = sc.parallelize(List("dog","tiger","lion","cat","panda"))

a.top(2)

Example 3:

Ascending order (extending top with a custom ordering):

implicit val myOrder = implicitly[Ordering[Int]].reverse

sc.parallelize(Array(6,9,4,7,5,8)).top(2)
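The implicit-Ordering trick can be traced in plain Scala as well. Below, `top` is a hypothetical helper sketching top(k)'s behavior (sort descending by the ordering, take k); it is not Spark's implementation. The reversed ordering is built with `Ordering.Int.reverse` to avoid a self-referential implicit lookup in compiled code:

```scala
// Sketch of top(k): sort descending by the implicit ordering, take k.
def top(xs: Array[Int], k: Int)(implicit ord: Ordering[Int]): List[Int] =
  xs.sorted(ord.reverse).take(k).toList

val nums = Array(6, 9, 4, 7, 5, 8)
println(top(nums, 2)) // List(9, 8): the two largest, default ordering

// A reversed ordering in implicit scope flips what "top" means:
implicit val myOrder: Ordering[Int] = Ordering.Int.reverse
println(top(nums, 2)) // List(4, 5): now the two smallest
```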

9、.max  .min

Homework:

takeSample and takeOrdered

IV. Spark RDD Review

1、The Spark RDD workflow

2、Using JOIN in Spark Core

val a  = sc.parallelize(Array(("A","a1"),("C","c1"),("D","d1"),("F","f1"),("F","f2")))

val b  = sc.parallelize(Array(("A","a2"),("C","c2"),("D","d2"),("F","f2"),("F","f3")))

a.join(b).collect = inner join: returns only the entries whose keys match on both sides.
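Inner-join semantics can be sketched on plain Scala sequences (not Spark's shuffle-based implementation, just the same per-key result):

```scala
val a = Seq(("A","a1"), ("C","c1"), ("D","d1"), ("F","f1"), ("F","f2"))
val b = Seq(("A","a2"), ("C","c2"), ("D","d2"), ("F","f2"), ("F","f3"))

// Inner join: for each pair of entries sharing a key, emit (key, (leftVal, rightVal)).
val joined = for {
  (ka, va) <- a
  (kb, vb) <- b
  if ka == kb
} yield (ka, (va, vb))

// A, C, D each match once; F matches 2 x 2 = 4 ways: 7 pairs in total.
println(joined.size) // 7
```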

Example 1:

a.leftOuterJoin(b).collect

Observation: the left side is the base; everything on the left is kept, and the right side is attached where it matches.

Example 2:

a.rightOuterJoin(b).collect

Homework:

Test fullOuterJoin yourself.

3、Word count with Spark

val log = sc.textFile("/opt/data/ruozeinput.txt")

val splits = log.flatMap(x => x.split("\t"))

val wordone = splits.map(x => (x,1))

wordone.reduceByKey(_ + _)

Summary:
map: applies the function to each line as a whole.

flatMap: first flattens each line into individual elements, then maps over them.
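The same word count can be traced on plain Scala collections (a sketch; reduceByKey is simulated with groupBy + sum, and the file contents are made up):

```scala
// Stand-in for the lines of ruozeinput.txt.
val log = List("hello\tworld", "hello\tspark")

val splits  = log.flatMap(x => x.split("\t"))
val wordone = splits.map(x => (x, 1))

// reduceByKey(_ + _) equivalent on a local collection:
val counts = wordone.groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }
println((counts("hello"), counts("world"), counts("spark"))) // (2,1,1)
```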

4、Other operators

.subtract()

Elements that are in a but not in b.

val a = sc.parallelize(1 to 5)

val b = sc.parallelize(2 to 3)

a.subtract(b)

5、intersection

.intersection(): the intersection of the two RDDs

6、cartesian

.cartesian(): pairs every element of a with every element of b
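All three operators have direct analogues on Scala collections (a sketch; cartesian is written as a for-comprehension):

```scala
val a = (1 to 5).toList
val b = (2 to 3).toList

println(a.diff(b))      // subtract:     List(1, 4, 5)
println(a.intersect(b)) // intersection: List(2, 3)

// cartesian: every pairing of an element of a with an element of b
val cart = for (x <- a; y <- b) yield (x, y)
println(cart.size) // 5 * 2 = 10
```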

These tests were done in spark-shell, which is well suited for quick experiments.

Development work: IDEA + Maven + Scala