MissDu

Spark Introductory Tutorial (Python Edition)

Textbook website

http://dblab.xmu.edu.cn/post/spark-python/ 

Online textbook:

http://dblab.xmu.edu.cn/blog/1709-2/  

Lecture videos

https://study.163.com/course/introduction/1209408816.htm

Software download

Link: https://pan.baidu.com/s/1dzf4RdWBmdnIiOGwjpOuow  Extraction code: r5b2  

Python introductory tutorial 

http://dblab.xmu.edu.cn/blog/python/

 

I. Installing and Using Spark

http://dblab.xmu.edu.cn/blog/1307-2/ 

 

Before installing, make sure Java and Hadoop are working:

java -version
start-dfs.sh
jps

Download spark-2.4.0-bin-without-hadoop.tgz and place it in /home/hadoop/下载 (~/下载, i.e. the Downloads directory).

  

Commands:

1. Download, extract, and set permissions

cd 下载
ls
sudo tar -zxf spark-2.4.0-bin-without-hadoop.tgz -C /usr/local

cd /usr/local

ls

sudo mv spark-2.4.0-bin-without-hadoop/ ./spark

ls -l 

sudo chown -R hadoop ./spark 

2. Configuration file

cd spark/ 

/usr/local/spark$ cp ./conf/spark-env.sh.template ./conf/spark-env.sh 

gedit ./conf/spark-env.sh

Add the following line to spark-env.sh:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

3. Set environment variables and apply them

gedit ~/.bashrc 

export SPARK_HOME=/usr/local/spark
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH
export PYSPARK_PYTHON=python3
export PATH=$PATH:$SPARK_HOME/bin

Entries already in ~/.bashrc from earlier setups:

export FLUME_HOME=/usr/local/flume                   
export FLUME_CONF_DIR=$FLUME_HOME/conf

export JAVA_HOME=/usr/lib/jvm/default-java
export HADOOP_HOME=/usr/local/hadoop
export HBASE_HOME=/usr/local/hbase
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
export PATH=$PATH:/usr/local/hbase/bin
export PATH=$PATH:$FLUME_HOME/bin:$HADOOP_HOME:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

export STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar

export SQOOP_HOME=/usr/local/sqoop
export PATH=$PATH:$SBT_HOME/bin:$SQOOP_HOME/bin
export CLASSPATH=$CLASSPATH:$SQOOP_HOME/lib

source ~/.bashrc

cd $SPARK_HOME/

4. Run a test

 ./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"

Pi is roughly 3.1359356796783984 

5. Interactive shell

pyspark

>>> 5+9*2
23
>>> '201806120001'+'xiaoming'
'201806120001xiaoming'
>>> a='xiaoming'
>>> a
'xiaoming'
>>> b='{} 2018001260 {}'.format(a,a)
>>> b
'xiaoming 2018001260 xiaoming'
>>> b.split()
['xiaoming', '2018001260', 'xiaoming']
>>> exit()

 

SparkContext

>>> sc
<pyspark.context.SparkContext object at 0x7f2bce403dd8>

 

WordCount in one line of code

>>> sc.textFile("file:///home/hadoop/my.txt").flatMap(lambda line: line.split(" ")).map(lambda word : (word,1)).reduceByKey(lambda x,y : x+y).saveAsTextFile("file:///home/hadoop/myout")

sc.textFile(in_url).flatMap(lambda line: line.split(" ")).map(lambda word : (word.lower(),1)).reduceByKey(lambda a,b : a+b) 

Programming example

 

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
logFile = "file:///usr/local/spark/README.md"
logData = sc.textFile(logFile, 2).cache()
numAs = logData.filter(lambda line: 'a' in line).count()
numBs = logData.filter(lambda line: 'b' in line).count()
print('Lines with a: %s, Lines with b: %s' % (numAs, numBs))

 

$ cd /usr/local/spark/mycode/python
$ python3 WordCount.py

 

 

 

6. Basic Python syntax

 http://dblab.xmu.edu.cn/blog/python/

https://www.runoob.com/python3/python3-tutorial.html

 

path='/home/hadoop/wc/f1.txt'
with open(path) as f:
    text=f.read()
words = text.split()
wc={}
for word in words:
    wc[word]=wc.get(word,0)+1
wclist=list(wc.items())
wclist.sort(key=lambda x:x[1],reverse=True)
print(wclist)
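
The same count can also be written with collections.Counter from the standard library. This is only a minimal sketch: the function name word_count and the sample text are made up for illustration, and it takes a string rather than reading /home/hadoop/wc/f1.txt.

```python
from collections import Counter

def word_count(text):
    """Return (word, count) pairs sorted by count, highest first."""
    counts = Counter(text.split())
    # most_common() returns the pairs ordered by descending count
    return counts.most_common()

sample = "spark hadoop spark hive spark hadoop"  # made-up sample text
print(word_count(sample))  # [('spark', 3), ('hadoop', 2), ('hive', 1)]
```

Counter replaces the manual wc.get(word,0)+1 bookkeeping and the explicit sort.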

 

 

 

7. Preparatory labs

  • Installing Linux

http://dblab.xmu.edu.cn/blog/285/

  • Installing Ubuntu with VirtualBox on Windows

http://dblab.xmu.edu.cn/blog/337-2/

  • Common Linux commands

http://dblab.xmu.edu.cn/blog/1624-2/

  • Uploading files from Windows to Ubuntu with an FTP client

http://dblab.xmu.edu.cn/blog/1608-2/

  • Downloading installation files and extracting archives on Linux

http://dblab.xmu.edu.cn/blog/1606-2/

  • Installing and using the vim editor on Linux

http://dblab.xmu.edu.cn/blog/1607-2/

  • Installing and using Hadoop

http://dblab.xmu.edu.cn/blog/install-hadoop/

 

8. Developing Spark applications with PyCharm

 

http://dblab.xmu.edu.cn/blog/2295/

 

 

II. Spark RDD Programming

Loading data from a local file

pyspark

>>> url="file:///home/hadoop/hive_hql25.txt"
>>> lines=sc.textFile(url)
>>> lines
file:///home/hadoop/hive_hql25.txt MapPartitionsRDD[19] at textFile at NativeMethodAccessorImpl.java:0


>>> lines.count()
8
>>> lines.first()
'select count(*) from bigdata_user;'
>>> lines.foreach(print)

  

Loading data from an HDFS file


hadoop@dblab-VirtualBox:~$ $JAVA_HOME
bash: /usr/lib/jvm/default-java: Is a directory
hadoop@dblab-VirtualBox:~$ java -version
openjdk version "1.8.0_275"


start-dfs.sh
jps
hdfs dfs -ls input

>>> url = '/user/hadoop/input/1342-0.txt'
>>> lines=sc.textFile(url)
>>> lines.first()
''
>>> lines.count()
14594
>>>

stop-dfs.sh

RDD operation: filter

# RDD built from the local file

>>> 'sel' in 'select count(*)'
True
>>> a=[1,2,3]
>>> lambda i:a[i]*2
<function <lambda> at 0x7f438e962620>
>>> b=lambda i:a[i]*2
>>> b
<function <lambda> at 0x7f438c09e950>
>>> b(2)
6
>>> lineSelect=lines.filter(lambda line:'select' in line)
>>> lineSelect.count()
4

 

RDD operation: map

>>> data=[19,20,21]
>>> rdd1=sc.parallelize(data)
>>> rdd1
ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:475
>>> rdd1.count()
3
>>> rdd1.foreach(print)
19
20
21
>>> rdd2=rdd1.map(lambda x:x+2000)
>>> rdd2
PythonRDD[4] at RDD at PythonRDD.scala:48
>>> rdd2.foreach(print)
2019
2020
2021
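
For comparison, the filter and map operations above behave like Python's built-in filter and map applied to an ordinary list. The sketch below runs without Spark; the sample lines are made up. One difference worth keeping in mind is that RDD transformations are lazy and only run when an action such as count() or collect() is called, whereas list(...) here forces evaluation immediately.

```python
data = [19, 20, 21]

# map: apply a function to every element (like rdd1.map(lambda x: x + 2000))
mapped = list(map(lambda x: x + 2000, data))

# filter: keep only the elements for which the predicate is true
lines = ["select count(*) from t;", "show tables;", "select 1;"]  # made-up sample
selected = list(filter(lambda line: "select" in line, lines))

print(mapped)         # [2019, 2020, 2021]
print(len(selected))  # 2
```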


RDD operation: reduceByKey

>>> a=['a','b','a']
>>> b=[1,1,1]
>>> c=zip(a,b)
>>> c
<zip object at 0x7fdbdd7034c8>
>>> d=sc.parallelize(c)
>>> d.foreach(print)
('a', 1)
('b', 1)
('a', 1)
>>> e=d.reduceByKey(lambda a,b:a+b)
>>> e.foreach(print)
('a', 2)
('b', 1)
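
What reduceByKey computes can be sketched in plain Python on a single machine. The helper name reduce_by_key is hypothetical and is not part of PySpark; real Spark also merges values per partition before shuffling, which this sketch does not model.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func, as reduceByKey does logically."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)  # combine with the running value
        else:
            result[key] = value                     # first value seen for this key
    return result

pairs = list(zip(['a', 'b', 'a'], [1, 1, 1]))
print(reduce_by_key(pairs, lambda a, b: a + b))  # {'a': 2, 'b': 1}
```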

Word frequency count

>>> in_url = 'file:///home/hadoop/my.txt'
>>> # Alternative: read from HDFS instead
>>> # in_url='hdfs://localhost:9000/user/hadoop/my.txt'
>>> lines=sc.textFile(in_url)
>>> lines
file:///home/hadoop/my.txt MapPartitionsRDD[50] at textFile at NativeMethodAccessorImpl.java:0
>>> lines.first()
'export SPARK_HOME=/usr/local/spark'
>>> lines.count()
>>> lines.collect()
>>> words=lines.flatMap(lambda line:line.split())
>>> words.collect()
['export', 'SPARK_HOME=/usr/local/spark', 'export', 'PYTHONPATH=$SPARK_HOME/python', 'export', 'PYSPARK_PYTHON=python3', 'export', 'PATH=$PATH:$SPARK_HOME/bin']
>>> words=words.flatMap(lambda line:line.split('='))
>>> words.collect()
['export', 'SPARK_HOME', '/usr/local/spark', 'export', 'PYTHONPATH', '$SPARK_HOME/python', 'export', 'PYSPARK_PYTHON', 'python3', 'export', 'PATH', '$PATH:$SPARK_HOME/bin']
>>> words=words.flatMap(lambda line:line.split('/'))
>>> words.collect()
['export', 'SPARK_HOME', '', 'usr', 'local', 'spark', 'export', 'PYTHONPATH', '$SPARK_HOME', 'python', 'export', 'PYSPARK_PYTHON', 'python3', 'export', 'PATH', '$PATH:$SPARK_HOME', 'bin']
>>> word=words.map(lambda word:(word,1))
>>> word.collect()
[('export', 1), ('SPARK_HOME', 1), ('', 1), ('usr', 1), ('local', 1), ('spark', 1), ('export', 1), ('PYTHONPATH', 1), ('$SPARK_HOME', 1), ('python', 1), ('export', 1), ('PYSPARK_PYTHON', 1), ('python3', 1), ('export', 1), ('PATH', 1), ('$PATH:$SPARK_HOME', 1), ('bin', 1)]

>>> wc=word.reduceByKey(lambda a,b:a+b)
>>> wc.collect()
[('', 1), ('python', 1), ('usr', 1), ('python3', 1), ('PATH', 1), ('PYTHONPATH', 1), ('bin', 1), ('export', 4), ('local', 1), ('spark', 1), ('SPARK_HOME', 1), ('$PATH:$SPARK_HOME', 1), ('PYSPARK_PYTHON', 1), ('$SPARK_HOME', 1)]
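
The session above relies on flatMap rather than map to turn lines into words. The difference can be sketched in plain Python without Spark; the two sample lines are taken from the my.txt contents shown above.

```python
from itertools import chain

lines = ["export SPARK_HOME=/usr/local/spark", "export PYSPARK_PYTHON=python3"]

# map: one output element per input line, so the result is a list of lists
mapped = [line.split() for line in lines]

# flatMap: split each line, then flatten everything into one sequence of words
flat = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat)
```

With map the result would still have one entry per line, so the later (word,1) pairing would wrap whole lists instead of individual words.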

>>> out_url='file:///home/hadoop/myout/0316'
>>> wc.saveAsTextFile(out_url)

>>> # Save to HDFS; wcsort is the sorted RDD created with sortBy further down
>>> out_url='myout'
>>> wcsort.saveAsTextFile(out_url)
>>> exit()

hdfs dfs -ls

hdfs dfs -ls myout

hdfs dfs -cat myout/part-00000 | head -5

>>> wc=sc.textFile(in_url).flatMap(lambda line: line.split(" ")).map(lambda word : (word.lower(),1)).reduceByKey(lambda a,b : a+b)

>>> wc.count()

>>> wc.collect()
>>> wc.cache()


>>> wcsort=wc.sortByKey()
>>> wcsort.collect()


>>> wcsort=wc.sortBy(lambda x:x[1],False)

>>> wcsort.take(5)
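
Sorting by count and taking the first few items, as in sortBy(lambda x:x[1],False).take(5), corresponds to the following plain-Python sketch. The wc list here is made-up sample data, not output from the session above.

```python
import heapq

# made-up (word, count) pairs standing in for wc.collect()
wc = [('export', 4), ('spark', 1), ('python', 2), ('usr', 1), ('bin', 3), ('local', 1)]

# Full sort in descending order of count, then keep the first 3
top_sorted = sorted(wc, key=lambda x: x[1], reverse=True)[:3]

# heapq.nlargest avoids sorting the whole list when only the top n are needed
top_heap = heapq.nlargest(3, wc, key=lambda x: x[1])

print(top_sorted)  # [('export', 4), ('bin', 3), ('python', 2)]
```

In PySpark itself, takeOrdered(n, key=...) plays a similar role to heapq.nlargest when only the top few elements are wanted.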

 


Case study: student course scores

count

url='file:///home/hadoop/chapter4-data01.txt'
lines=sc.textFile(url)
lines.take(3)

name=lines.map(lambda line:line.split(',')).map(lambda line:(line[0],(line[1],line[2])))
name.take(3)

name.countByKey() # key-value RDD

name.countByValue()

lines.map(lambda line:line.split(',')).take(5)
lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).take(5)
lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).countByValue()

lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).take(5)
lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).distinct().take(5)
lines.map(lambda line:line.split(',')).map(lambda line:(line[0])).distinct().count()

  

groupBy

groupByName=lines.map(lambda line:line.split(',')).map(lambda line:(line[0],(line[1],line[2]))).groupByKey()
groupByName

groupByName.collect()[10]
for i in groupByName.collect()[10][1]:
    print(i)
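
The grouping performed by groupByKey can be sketched in plain Python with a defaultdict. The helper group_by_key is hypothetical, and the sample records are made up; they only assume the name,course,score layout that the code above implies for chapter4-data01.txt.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect all values that share a key into one list per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# made-up records in the assumed name,course,score layout
records = ["Tom,Database,80", "Jim,Database,90", "Tom,Algorithm,50"]
pairs = [(name, (course, score))
         for name, course, score in (r.split(',') for r in records)]

grouped = group_by_key(pairs)
for item in grouped['Tom']:
    print(item)
```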


reduceByKey
course=lines.map(lambda line:line.split(',')).map(lambda line:(line[1],1))
course.take(3)

course.reduceByKey(lambda a,b:a+b).collect()

 

Tom

tomRDD=lines.filter(lambda line: 'Tom' in line).map(lambda line: line.split(','))
tomRDD.collect()

# Convert the score to int so the sort is numeric rather than lexicographic
tomRDD.sortBy(lambda x:int(x[2]),False).collect()

 

from numpy import mean
tomList=lines.map(lambda line: line.split(',')).map(lambda line:(line[0],line[2])).lookup('Tom')
mean([int(x) for x in tomList])
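
The lookup-then-average step can also be sketched without Spark or numpy, using statistics.mean from the standard library. The helper scores_of and the sample records are made up for illustration and assume the name,course,score layout.

```python
from statistics import mean

# made-up records in the assumed name,course,score layout
records = ["Tom,Database,80", "Jim,Database,90", "Tom,Algorithm,50", "Tom,DataStructure,60"]

def scores_of(name, records):
    """Collect one student's scores as ints, like lookup(name) on (name, score) pairs."""
    result = []
    for r in records:
        student, _course, score = r.split(',')
        if student == name:          # exact match on the name field
            result.append(int(score))
    return result

tom_scores = scores_of('Tom', records)
print(tom_scores, mean(tom_scores))
```

Matching on the name field exactly, as lookup does, is stricter than the 'Tom' in line test, which would also match any line that merely contains those three letters.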

 

combineByKey: course, number of students, average score

course=lines.map(lambda line:line.split(',')).map(lambda line:(line[1],line[2]))
course.first()

courseC=course.combineByKey(lambda v: (int(v),1), lambda c,v:(c[0]+int(v),c[1]+1), lambda c1,c2:(c1[0]+c2[0],c1[1]+c2[1]))
courseC.first()

 

courseC.map(lambda x: (x[0], x[1][1], x[1][0]/x[1][1])).collect() 
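
The three functions passed to combineByKey above (create a combiner, merge a value into it, merge two combiners) can be traced with a small plain-Python simulation. This is only a sketch of the logic: combine_by_key is a hypothetical helper, the two partitions are made-up data, and real Spark distributes the partitions and shuffles the partial results.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Simulate combineByKey over a list of partitions of (key, value) pairs."""
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            if key in acc:
                acc[key] = merge_value(acc[key], value)  # key already seen in this partition
            else:
                acc[key] = create_combiner(value)        # first value for key in this partition
        partials.append(acc)
    result = {}
    for acc in partials:                                 # merge per-partition results
        for key, comb in acc.items():
            result[key] = merge_combiners(result[key], comb) if key in result else comb
    return result

# made-up (course, score) pairs split across two partitions
partitions = [
    [('Database', '80'), ('Algorithm', '50'), ('Database', '90')],
    [('Database', '70'), ('Algorithm', '70')],
]
sums = combine_by_key(
    partitions,
    lambda v: (int(v), 1),                          # (score sum, count) from the first score
    lambda c, v: (c[0] + int(v), c[1] + 1),         # add one more score
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),  # merge two (sum, count) pairs
)
stats = [(course, n, total / n) for course, (total, n) in sums.items()]
print(stats)  # [('Database', 3, 80.0), ('Algorithm', 2, 60.0)]
```

Carrying a (sum, count) pair instead of a running average is what lets the partial results from different partitions be merged correctly.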

 

 

Visualization

# List of (word, frequency) pairs
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkReadme")
sc = SparkContext(conf = conf)
url='input/1342-0.txt'
with open('/home/hadoop/stopwords.txt') as f:
    stops=f.read().split()
wc = sc.textFile(url).flatMap(lambda line: line.lower().replace(',','').split()).filter(lambda word: word not in stops).filter(lambda word:len(word)>2).map(lambda word : (word,1)).reduceByKey(lambda a,b : a+b).sortBy(lambda x:x[1],False).take(100)
# Word cloud:
from pyecharts.charts import WordCloud
mywordcloud = WordCloud()
mywordcloud.add('',wc, shape='circle')
mywordcloud.render()
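
The cleaning steps in the pipeline above (lowercase, strip commas, split, drop stop words, drop words of two characters or fewer) can be checked on a small string before running them on the full text. A minimal sketch without Spark; clean_words, the stop-word set, and the sample sentence are all made up for illustration.

```python
def clean_words(text, stops, min_len=3):
    """Lowercase, remove commas, split on whitespace, drop stop words and short words."""
    words = text.lower().replace(',', '').split()
    return [w for w in words if w not in stops and len(w) >= min_len]

stops = {'the', 'and', 'was'}  # hypothetical stop words; the original reads stopwords.txt
text = "The pride, and the prejudice, was in it"
print(clean_words(text, stops))  # ['pride', 'prejudice']
```

Holding the stop words in a set rather than a list makes each word not in stops test a constant-time lookup, which may matter on a full-length book.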

 

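Bar chart

from pyecharts.charts import Bar
# cs is not defined in these notes; it is assumed here to be the
# (course, count, average) RDD from the combineByKey step
cs = courseC.map(lambda x: (x[0], x[1][1], x[1][0]/x[1][1]))
bar = Bar()
bar.add_xaxis(cs.keys().collect())
bar.add_yaxis('avg',cs.map(lambda x:x[2]).collect())
bar.render()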

 

Bar chart configuration

from pyecharts.charts import Bar
from pyecharts import options as opts
from pyecharts.globals import ThemeType

bar = Bar(init_opts=opts.InitOpts(theme=ThemeType.PURPLE_PASSION))
bar.add_xaxis(cs.keys().collect())
bar.add_yaxis('rs',cs.map(lambda x:x[1]).collect())
bar.add_yaxis('avg',cs.map(lambda x:x[2]).collect())

bar.set_global_opts(title_opts=opts.TitleOpts(title="Courses", subtitle="Enrollment, average score"),
                    xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-15)),
                    yaxis_opts=opts.AxisOpts(max_=150))
bar.render("cs_rs_avg.html")

 

Options

https://pyecharts.org/#/zh-cn/global_options

Themes

https://pyecharts.org/#/zh-cn/themes

Examples

https://gallery.pyecharts.org/#/Bar/README

 

 

III. Spark SQL

 

IV. Comprehensive Practice

 
