yanshw

spark教程(八)-SparkSession

spark 有三大引擎,spark core、sparkSQL、sparkStreaming,

spark core 的关键抽象是 SparkContext、RDD;

SparkSQL 的关键抽象是 SparkSession、DataFrame;

sparkStreaming 的关键抽象是 StreamingContext、DStream

 

SparkSession 是 spark2.0 引入的概念,主要用在 sparkSQL 中,当然也可以用在其他场合,他可以代替 SparkContext;

SparkSession 其实是封装了 SQLContext 和 HiveContext

 

SQLContext

它是 sparkSQL 的入口点,sparkSQL 的应用必须创建一个 SQLContext 或者 HiveContext 的类实例

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, HiveContext

conf = SparkConf().setAppName(\'test\').setMaster(\'yarn\')
sc = SparkContext(conf=conf)

sqlc = SQLContext(sc)
print(dir(sqlc))
# \'cacheTable\', \'clearCache\', \'createDataFrame\', \'createExternalTable\', \'dropTempTable\', \'getConf\', \'getOrCreate\', \'newSession\', \'range\', \'read\', \'readStream\',
# \'registerDataFrameAsTable\', \'registerFunction\', \'registerJavaFunction\', \'setConf\', \'sparkSession\', \'sql\', \'streams\', \'table\', \'tableNames\', \'tables\', \'udf\', \'uncacheTable\'

### sqlcontext 读取数据也自动生成 df
data = sqlc.read.text(\'/usr/yanshw/test.txt\')
print(type(data))

 

HiveContext

它是 sparkSQL 的另一个入口点,它继承自 SQLContext,用于处理 hive 中的数据

 

HiveContext 对 SQLContext 进行了扩展,功能要强大的多

1. 它可以执行 HiveSQL 和 SQL 查询

2. 它可以操作 hive 数据,并且可以访问 HiveUDF

3. 它不一定需要 hive,在没有 hive 环境时也可以使用 HiveContext

 

注意,如果要处理 hive 数据,需要把 hive 的 hive-site.xml 文件放到 spark/conf 下,HiveContext 将从 hive-site.xml 中获取 hive 配置信息;

如果 HiveContext 没有找到 hive-site.xml,他会在当前目录下创建 spark-warehouse 和 metastore_db 两个文件夹

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, HiveContext

conf = SparkConf().setAppName(\'test\').setMaster(\'yarn\')
sc = SparkContext(conf=conf)
## 需要把 hive/conf/hive-site.xml 复制到 spark/conf 下
hivec = HiveContext(sc)
print(dir(hivec))
# \'cacheTable\', \'clearCache\', \'createDataFrame\', \'createExternalTable\', \'dropTempTable\', \'getConf\', \'getOrCreate\', \'newSession\', \'range\', \'read\', \'readStream\',\'refreshTable\',
# \'registerDataFrameAsTable\', \'registerFunction\', \'registerJavaFunction\', \'setConf\', \'sparkSession\', \'sql\', \'streams\', \'table\', \'tableNames\', \'tables\', \'udf\', \'uncacheTable\'

data = hivec.sql(\'\'\'select * from hive1101.person limit 2\'\'\')
print(type(data))

 

SparkSession

它实现了对二者的封装

SparkSession 的创建

class SparkSession(__builtin__.object):
     def __init__(self, sparkContext, jsparkSession=None):
     \'\'\' Creates a new SparkSession.
     |
     |      >>> from datetime import datetime
     |      >>> spark = SparkSession(sc)
     |      >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1,
     |      ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
     |      ...     time=datetime(2014, 8, 1, 14, 1, 5))])
     |      >>> df = allTypes.toDF()
     |      >>> df.createOrReplaceTempView("allTypes")
     |      >>> spark.sql(\'select i+1, d+1, not b, list[1], dict["s"], time, row.a \'
     |      ...            \'from allTypes where b and i > 0\').collect()
     |      [Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2,             dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
     |      >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
     |      [(1, u\'string\', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]\'\'\'

示例代码

from pyspark.sql import SparkSession


### method 1
sess = SparkSession.builder \
    .appName("aaa") \
    .config("spark.driver.extraClassPath", sparkClassPath) \
    .master("local") \
    .enableHiveSupport() \  # sparkSQL 连接 hive 时需要这句
    .getOrCreate()      # builder 方式必须有这句

### method 2
conf = SparkConf().setAppName(\'myapp1\').setMaster(\'local[4]\')   # 设定 appname 和 master
sess = SparkSession.builder.config(conf=conf).getOrCreate() # builder 方式必须有这句

### method 3
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(\'myapp1\').setMaster(\'local[4]\')   # 设定 appname 和 master
sc = SparkContext(conf=conf)
sess = SparkSession(sc)

 

文件数据源

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, HiveContext

conf = SparkConf().setAppName(\'test\').setMaster(\'yarn\')
sc = SparkContext(conf=conf)

#### 替代了 SQLContext 和 HiveContext,其实只是简单的封装,提供了统一的接口
spark = SparkSession(sc)
print(dir(spark))
# 很多属性,我把私有属性删了
# \'Builder\',\'builder\', \'catalog\', \'conf\', \'createDataFrame\', \'newSession\', \'range\', \'read\', \'readStream\',\'sparkContext\', \'sql\', \'stop\', \'streams\', \'table\', \'udf\', \'version\'

### sess 读取数据自动生成 df
data = spark.read.text(\'/usr/yanshw/test.txt\')      #read 可读类型 [ \'csv\', \'format\', \'jdbc\', \'json\', \'load\', \'option\', \'options\', \'orc\', \'parquet\', \'schema\', \'table\', \'text\']
print(type(data))       # <class \'pyspark.sql.dataframe.DataFrame\'>

 

Hive 数据源

## 也需要把 hive/conf/hive-site.xml 复制到 spark/conf 下
spark = SparkSession.builder.appName(\'test\').master(\'yarn\').enableHiveSupport().getOrCreate()

hive_data = spark.sql(\'select * from hive1101.person limit 2\')
print(hive_data)        # DataFrame[name: string, idcard: string]

 

SparkSession vs SparkContext

SparkSession 是 spark2.x 引入的新概念,SparkSession 为用户提供统一的切入点,字面理解是创建会话,或者连接 spark

在 spark1.x 中,SparkContext 是 spark 的主要切入点,由于 RDD 作为主要的 API,我们通过 SparkContext 来创建和操作 RDD,

SparkContext 的问题在于:

1. 不同的应用中,需要使用不同的 context,在 Streaming 中需要使用 StreamingContext,在 sql 中需要使用 sqlContext,在 hive 中需要使用 hiveContext,比较麻烦

2. 随着 DataSet 和 DataFrame API 逐渐成为标准 API,需要为他们创建接入点,即 SparkSession

 

SparkSession 实际上封装了 SparkContext,另外也封装了 SparkConf、sqlContext,随着版本增加,可能更多,

所以我们尽量使用 SparkSession ,如果发现有些 API 不在 SparkSession 中,也可以通过 SparkSession 拿到 SparkContext 和其他 Context 等

 

在 shell 操作中,原生创建了 SparkSession,故无需再创建,创建了也不会起作用

在 shell 中,SparkContext 叫 sc,SparkSession 叫 spark

 

通过 spark 拿到 sc

>>> dir(spark)
[\'Builder\', \'__class__\', \'__delattr__\', \'__dict__\', \'__doc__\', \'__enter__\', \'__exit__\', \'__format__\', \'__getattribute__\', \'__hash__\', \'__init__\', \'__module__\', \'__new__\', \'__reduce__\', \'__r
educe_ex__\', \'__repr__\', \'__setattr__\', \'__sizeof__\', \'__str__\', \'__subclasshook__\', \'__weakref__\', \'_convert_from_pandas\', \'_createFromLocal\', \'_createFromRDD\', \'_create_from_pandas_with_arrow\', 
\'_create_shell_session\', \'_get_numpy_record_dtype\', \'_inferSchema\', \'_inferSchemaFromList\', \'_instantiatedSession\', \'_jsc\', \'_jsparkSession\', \'_jvm\', \'_jwrapped\', \'_repr_html_\', \'_sc\', \'_wrapped\', 
\'builder\', \'catalog\', \'conf\', \'createDataFrame\', \'newSession\', \'range\', \'read\', \'readStream\', \'sparkContext\', \'sql\', \'stop\', \'streams\', \'table\', \'udf\', \'version\']

spark.sparkContext  # 即 sc

 

dataframe 是 spark2.x 中新增的数据格式,由 SparkSession 直接读取,不管文件是什么类型,txt也好,csv也罢,输出格式都是 dataframe

而 SparkContext 不管读什么文件,输出格式都是 RDD

 

发表于 2019-12-04 14:29  努力的孔子  阅读(18662)  评论(0编辑  收藏  举报
 

分类:

技术点:

相关文章:

  • 2022-01-11
  • 2021-12-02
  • 2022-02-08
  • 2021-08-28
  • 2021-12-30
  • 2021-12-03
  • 2022-02-07
  • 2021-12-01
猜你喜欢
  • 2021-12-03
  • 2021-12-03
  • 2021-11-01
  • 2021-12-09
  • 2022-02-09
  • 2021-11-27
  • 2022-02-08
相关资源
相似解决方案