SparkSQL概述

Spark为数据化出力引入了一个称为SparkSql的编程模块。它提供了DataFrame的编程抽象,并且可以充当分布式SQL查询引擎。

SparkSQL的前身是Shark。它是伯克利实验室Spark生态环境的组件之一,它基于hive做了一些改进,比如引入焕春管理,改进和优化执行器等,并使之能够运行在spark引擎上,虽然使得sql的查询速度得到大幅度提升,但是它对于hive有太多的依赖,制约了Spark的One Stack 入了 them all 的既定方针,以及spark各个组件的相互集成,所以提出了sparksql的项目。

sparkSQL抛弃原有的shark代码,但汲取了shark的一些优点,如内存列存储、hive兼容性等重新开发了sparksql代码。由于摆脱了hive的依赖性,sparkSql在数据兼容、性能优化、组件扩展方面得到了极大的提升。

Spark特点

1)引入了新的RDD类型SchemaRDD,可以向传统的数据库定义表一样定义SchemaRDD;

2)在应用程序中可以混合不同来源的数据,也就是说可以读取不同的数据源;

3)内嵌Catalyst查询优化框架,再把SQL解析成逻辑执行计划之后变成对应的RDD计算

 

入门

概述

sparkSQL将RDD封装成一个DataFrame对象,这个对象类似于关系型数据库中的表。

创建DataFrame对象

DataFrame就相当于数据库中的一张表。由于它是只读的,不能在运算过程中在往里添加数据。

创建方式:

RDD.toDF("列名")

举个例子:

关于SparkSQL那些事(一)

创建好的DataFrame可以通过show方法进行查看,默认情况下只显示20条数据。

通过printScema方法可以查看列的类型等属性。

关于SparkSQL那些事(一)

           创建多列DataFrame对象

                 关于SparkSQL那些事(一)

由外部文件构建RDD对象

 

          1)txt文件

             txt文件不能直接转换而成,必须先利用RDD转换为tuple,然后在由toDF()转换为DataFRame

             现在读取/home/software/test.txt文件

       >val rdd = sc.textFile("/home/software/test.txt").map( x => (x,1) ).reduceByKey( (x,y) => x+y )

       >val tabx=rdd.toDF("word","count")

       >tabx.show

      关于SparkSQL那些事(一)

     2)读取json文件

                    这里我们读取/home/software/people.json文件。

                   关于SparkSQL那些事(一)

                    在读取json文件时,我们需要导包,并且创建SQLContext对象读取json文件。

    3)读取parquet文件

                  特别提一句,Parquet数据格式是一种列式存储格式,可以被多种查询引擎支持,并且与平台无关。parquet文件是以二进制方式存储的,是不可以直接读取和修改。Parquet文件是自解析的,文件中包括文件数据和文件的元数据。

                 我们读取文件/home/software/users.parquet文件。

                 >import  org.apache.spark.sql.SQLContext

                 >val ssc=new SQLContext(sc)

                 >val tb5=ssc.read.parquet("/home/software/users.parquet")

                 > tb5.show

                 关于SparkSQL那些事(一)

              4)jdbc读取

                       具体的实现步骤:

                         1)将mysql的驱动jar上传到spark目录中的jar目录下;

                         2)重新启动spark服务

                         3)进入到spark客户端

                         4)执行代码:

                           >import  org.apache.spark.sql.SQLContext

                           >val ssc=new SQLContext(sc)

                           >val prop = new java.util.Properties

                           >prop.put("user","root")//设置用户名

                           >prop.put("password","root")//设置密码

                           >val tabx=ssc.read.jdbc("jdbc:mysql://hadoop01:3306/test","tabx",prop)//设置需要读取的数据库以及表

                           >tabx.show

                           执行结果:

                           关于SparkSQL那些事(一)

                        如果在运行过程中出现权限不足的 问题,则进入mysql:执行

                         grant all privileges on *.* to 'root'@'IP地址或主机名' identified by 'root' with grant option;

                         flush privileges;

  

 

相关文章:

  • 2021-09-18
  • 2019-07-14
  • 2021-04-04
  • 2021-12-04
  • 2021-06-29
  • 2022-01-02
  • 2021-07-24
  • 2021-12-03
猜你喜欢
  • 2021-09-28
  • 2021-10-02
  • 2021-09-17
  • 2021-11-27
  • 2021-09-11
  • 2020-05-27
  • 2018-09-20
相关资源
相似解决方案