Avoiding Hive JARs
 
如果不能在应用程序中包含Hive依赖,那么可以忽略Spark的Hive组件,而创建SQLContext,如示例3- 10所示。这提供了大部分相同的功能,但是使用了功能较差的SQL解析器,并且缺少某些基于hive的用户定义函数(udfs)和用户定义的聚合函数(udfs)。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
与核心SparkContext和StreamingContext一样,Hive/SQLContext用来加载数据。JSON是一种非常流行的格式,部分原因是它可以很容易地以多种语言加载,而且至少是半人可读的。由于这些原因,我们在书中包含的一些示例数据是JSON格式的。JSON特别有趣,因为它缺少模式信息,Spark需要做一些工作来从我们的数据推断模式。JSON解析的代价也很高;在一些简单的情况下,解析输入JSON数据可能比实际操作要大。我们将在JSON中介绍完整的加载和保存JSON API,但在开始之前,让我们加载一个可以用来研究模式的示例(参见示例3-11)。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
请随意加载您自己的JSON数据,但是如果您没有任何方便的测试工具,请查看示例GitHub资源目录。既然已经加载了JSON数据,就可以开始研究Spark为数据推断的模式了。
 
Basics of Schemas
 
模式信息及其支持的优化是Spark SQLand core Spark之间的核心区别之一。检查模式对于数据类型特别有用,因为您没有使用RDDs或datasets时使用的模板类型。模式通常由Spark SQL自动处理,可以在加载数据时进行推断,也可以根据父 DataFrames 和转换计算进行推断
 
DataFrames以人类可读模式或编程格式公开的。printSchema()将向我们展示一个DataFrame的模式,它在shell中最常用来确定您正在使用的是什么。这对于像JSON这样的数据格式特别有用,在这些数据格式中,模式可能不会通过只查看少量记录或读取头部就立即可见。对于编程使用,您可以通过简单地调用schema来获得模式,该模式在ML管道转换器中经常使用。因为您可能熟悉case类和JSON,所以让我们在示例3-12和3-13中研究一下等效的Spark SQLschema是如何表示的
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
现在,定义了case类之后,您就可以创建一个本地实例,将其转换为 Dataset ,并打印如示例3-14所示的模式,从而生成示例3- 15。对JSON数据也可以这样做,但是需要进行一些配置,就像在JSON中讨论的那样
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
除了人类可读的模式外,还可以通过编程方式使用模式信息。程序模式作为StructField返回,如示例3-16所示。
 
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
示例3-17以机器可读的格式显示了与示例3-15相同的模式。( 手工格式化嵌套结构(.schema())的模式信息示例
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
从这里,您可以深入了解这个模式信息的含义,并了解如何构造更复杂的模式。第一部分是StructType,它包含一个字段列表。需要注意的是,您可以嵌套structtype,就像case类可以包含额外的case类一样。StructType中的字段是用StructField定义的,它指定名称、类型(参见表3-1和表3-2获取类型列表)和一个布尔值指示字段是否为空/缺失。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
提示: 如示例3-17所示,可以嵌套StructFields和所有复杂的Spark SQL类型。
既然您已经了解了如何理解数据模式(如果需要的话),那么就可以开始研究DataFrame接口了。
提示::SparkSQL模式被急切地评估,而不像底层的数据。如果您在shell中发现自己不确定转换将做什么,那么执行它并打印模式。3日- 15日看到的例子。
 
DataFrame API
 
Spark SQL的DataFrame API允许我们在不需要注册临时表或生成SQL表达式的情况下使用数据类型。DataFrame API同时具有转换和操作。 DataFrames 上的转换在本质上是关系型的,Datadet API(后面将介绍)提供了一个功能风格更强的API。
 
Transformations
 
DataFrames上的转换在概念上类似于RDD转换,但更具有关系风格。 不能任意指定函数,不然优化器无法自查,要使用受限表达式语法,以便优化器可以获得更多信息。与RDDs一样,我们可以将transformations广泛地分解为简单的单个DataFrames、多个DataFrames、键/值和分组/窗口transformations
注:Spark SQL transformations只是部分延迟;模式被急切地评估
 
Simple DataFrame transformations and SQL expressions 简单的数据格式转换和SQL表达式)
 
简单的DataFrame转换大多数允许我们在一次处理一行(针对于窄依赖而不是shuffle)。我们仍然可以执行许多在RDDs上定义的相同操作,只能使用Spark SQL表达式而不是任意函数。为了说明这一点,我们将首先检查可以作用在DataFrames 上不同类型的过滤操作。
和filter 类似,DataFrame函数接受Spark SQL表达式而不是lambdas。这些表达式允许优化器理解条件表示的内容,并且使用filter,它通常可以用来跳过读取不必要的记录。
首先,让我们看一个SQL表达式,它使用现有的模式过滤 unhappy pandas 的数据。第一步是查找包含此信息的列。在我们的示例中,它是happy,而对于我们的DataFrame(称为df),我们通过apply函数(例如,df(“happy”))访问该列。filter表达式要求表达式返回一个布尔值,如果您想选择happy pandas,整个表达式可以检索列值。但是,由于我们想要找到unhappy pandas,我们可以使用!==操作符来检查happy不为真,如例3-18所示。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
注: 要查找列,我们可以在特定的DataFrame上提供列名,或者使用隐式的$操作符进行列查找。这在DataFrame匿名时特别有用。!二进制否定函数可以与$一起使用,以简化从示例3-18到  df.filter(!$("happy")).
这说明了如何从DataFrame访问特定的列。对于访问 DataFrames 内的其他结构,如嵌套结构、( keyed maps, and array elements) 键控映射和数组元素,使用类似的apply语法。因此,如果属性数组中的第一个元素表示粘性,而您只想要更粘性的数据,那么您可以通过写入 df("attributes")(0) >= 0.5. 来访问该元素。
我们的表达式不需要局限于单个列。也可以比较“filter”表达式中的多个列。如3-19中所示的复杂过滤器更难下推到存储层,因此您可能不会看到使用简单过滤器作用在RDDs上的加速
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
注:Spark SQL的列操作符是在column类上定义的,因此不会编译包含表达式 0 >= df.col("friends") 的过滤器,因为Scala将使用在 >= 0上。相反,你可以写 df.col("friend") <= 0 ,或者用0转换lit中的列文字( 列文字是具有固定值的不改变行数据(常数))。
Spark SQL DataFrame API有一组非常大的操作符。您可以对浮点数使用所有标准的数学运算符,以及标准的逻辑和位操作(前缀带有位来区分逻辑)。列使用===和!==表示相等,以避免与Scala内部冲突。对于字符串列,startsWith/endsWith、substr、like和isNull都是可用的。完整的操作集在org.apache.spark.sq .Column 中列出和覆盖,在表3-3和表3-4。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
注:并不是所有Spark SQL表达式都可以用于每个API调用。例如,Spark SQL join不支持复杂操作,filter要求表达式结果为布尔值或类似值。
除了在列上直接指定的操作符外,org.apache.spark.sql.functions 中还有一组更大的列上的函数。我们在表3-5、3-6和3-7中介绍了其中一些函数。为了便于说明,本示例显示了特定行的每个列的值,但是请记住,这些函数是在列上调用的,而不是在值上调用的。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
除了简单地过滤数据之外,您还可以使用新列或旧列中更新的值生成一个DataFrame。Spark使用与筛选器相同的表达式语法,除了必须包含条件(比如测试是否相等)之外,结果还可以在新的DataFrame中用作值。例如3-20使用Spark SQL explode 函数将PandaPlaces的输入DataFrame转换为PandaInfo的数据名称,并计算每个panda的 “squishness”’与‘ attributes ’比率。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
提示: 在构造操作序列时,生成的列名可能会很快变得难以处理,因此使用as或alias操作符来指定结果列名非常有用。
 
虽然所有这些操作都非常强大,但有时您希望表达的逻辑使用if/else语义更容易编码。例3-21就是一个简单的例子,它将不同类型的panda编码为一个数字值( ML管道中的StringIndexer是为字符串索引编码而设计的。)。when和otherwise函数可以链接在一起以产生相同的效果。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
Specialized DataFrame transformations for missing and noisy data(针对数据丢失和脏数据的特殊转换
 
Spark sql还提供了处理丢失、空和无效数据的特殊工具。通过将isNan或isNull与过滤器一起使用,可以为希望保留的行创建条件。例如,如果您有许多不同的列,这些列可能具有不同的精度级别(其中一些可能为空),那么可以使用coalesce(c1, c2, ..)返回第一个非空列。类似地,对于数值型数据,nanvl返回第一个非nan值(例如,nanvl(0/0, sqrt(-2), 3)结果为3)。为了简化处理缺失数据的工作,DataFrame上的na函数为我们提供了在DataFrameNaFunctions( http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions)中处理缺失数据的一些常见例程;
 
Beyond row-by-row transformations
 
有时像使用 filter 那样逐行应用决策是不够的。Spark sql还允许我们通过调用dropDuplicates来选择特殊的行,但与RDDs上的类似操作(distinct)一样,这可能需要进行 shuffle ,因此通常比过滤器要慢得多。与RDDs不同,dropDuplicates可以根据列的一个子集选择性地删除行,比如ID字段,如示例3-22所示
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
这将很好地引导我们进入关于聚合和groupBy的下一节,因为它们中最昂贵的组件通常是shuffle。
 
Aggregates and groupBy
 
Spark sql有许多功能强大的 aggregates 由于它的优化器,可以很容易地将多个聚合组合到一个操作/查询中与panda 的 DataFrames类似,groupBy返回特殊对象,我们可以在这些对象上请求执行某些聚合在Spark 2.0之前的版本中,这是一个通用的GroupedData,但在2.0及以后的版本中,DataFrames groupBy与一个 Datasets 是相同的。
Datasets 上的聚合有额外的功能,当使用任意函数分组时返回GroupedDataset(在Spark的前2.0版本中)或KeyValueGroupedDataset,当使用 relational / Datasets DSl表达式分组时返回RelationalGroupedDataset。 “Grouped Operations on Datasets”中讨论了其他类型化功能,这里讨论了常见的 DataFrame and Dataset groupBy 功能。
min、max、avg和sum都是直接在GroupedData上实现的方便函数,还可以通过向agg提供表达式来指定更多的函数。示例3-23显示了如何按 zip code 计算 panda 的最大大小。一旦指定了要 aggregates de的计算,就可以以 DataFrame.返回结果。
如果您习惯使用RDDs,您可能会担心groupBy,但是由于Spark SQL优化器,它现在是对 DataFrames 的安全操作,它会自动处理我们的减少,避免巨大的 shuffles 和大量记录。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
虽然示例3-23在每个键的基础上计算最大值,但是这些 aggregates 也可以应用于整个DataFrame或一个DataFrame中的所有数字列。当试图为您正在使用的数据收集一些汇总统计信息时,这通常很有用。 事实上,有一个内置的describe转换可以完成这个任务,尽管它也可以被限制在特定的列上,在示例3-24中使用并返回示例3-25。
(描述和收集一些小样本数据的结果(注:汇总所有数值字段))
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
注:groupBy的行为在Spark版本之间发生了变化。在Spark 1.3之前,默认情况下丢弃分组列的值,而post 1.3则保留这些值。可以将配置参数设置 spark.sql.retainGroupColumns 为false来强制执行前面的功能。
对于计算多个不同的聚合或更复杂的聚合,应该使用GroupedData上的agg API,而不是直接调用count、mean或类似的方便函数。对于agg API,您可以提供聚合表达式列表、表示聚合的字符串或从列名到聚合函数名的映射。一旦我们用请求的聚合调用了agg,我们就会得到一个带有聚合结果的 DataFrame 
与常规函数一样,它们在 org.apache.spark.sql.functions Scaladoc中列出。表3-8列出了一些常见和有用的聚合。对于这些表中的示例结果,我们将考虑一个模式为name字段( (as a string )和age( as an integer )的DataFrame,它们都可以为空,具有值({“ikea”,null},{“tube”,6},{“real”,30})。示例3-26展示了如何计算我们正在运行的panda示例中的pandaSize列的最小值和平均值。
提示; 使用Spark SQL计算多个聚合比使用RDD API执行相同的任务要简单得多。
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
Chapter 3. DataFrames, Datasets, and Spark SQL 学习《 High Performance Spark 》持续更新中...
提示; 除了在groupBy上使用聚合之外,还可以运行相同的聚合在多维数据集上使用cube,和rollups上运行rollup。

相关文章: