【问题标题】:Dataset api of Spark giving different result as compare to Dataframe与 Dataframe 相比,Spark 的 Dataset api 给出不同的结果
【发布时间】:2017-11-04 06:02:36
【问题描述】:

我使用的是 Spark 2.1 并且有一个 orc 格式的 hive 表,以下是架构。

col_name    data_type
tuid        string
puid        string
ts          string
dt          string
source      string
peer        string
# Partition Information 
# col_name  data_type
dt          string
source      string
peer        string

# Detailed Table Information    
Database:           test
Owner:              test
Create Time:        Tue Nov 22 15:25:53 GMT 2016
Last Access Time:   Thu Jan 01 00:00:00 GMT 1970
Location:           hdfs://apps/hive/warehouse/nis.db/dmp_puid_tuid
Table Type:         MANAGED
Table Parameters:   
  transient_lastDdlTime 1479828353
  SORTBUCKETCOLSPREFIX  TRUE

# Storage Information   
SerDe Library:  org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat:    org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat:   org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed: No
Storage Desc Parameters:    
  serialization.format  1

当我使用分区列在此表上应用过滤器时,它工作正常并且只读取特定分区。

val puid = spark.read.table("nis.dmp_puid_tuid")
    .as(Encoders.bean(classOf[DmpPuidTuid]))
    .filter( """peer = "AggregateKnowledge" and dt = "20170403"""")

这是我对此查询的实际计划

== Physical Plan ==
HiveTableScan [tuid#1025, puid#1026, ts#1027, dt#1022, source#1023, peer#1024], MetastoreRelation nis, dmp_puid_tuid, [isnotnull(peer#1024), isnotnull(dt#1022), 
(peer#1024 = AggregateKnowledge), (dt#1022 = 20170403)]

但是当我使用下面的代码时,它会将整个数据读入 spark

val puid = spark.read.table("nis.dmp_puid_tuid")
    .as(Encoders.bean(classOf[DmpPuidTuid]))
    .filter( tp => tp.getPeer().equals("AggregateKnowledge") && Integer.valueOf(tp.getDt()) >= 20170403)

上述数据框的物理计划

== Physical Plan ==
*Filter <function1>.apply
+- HiveTableScan [tuid#1058, puid#1059, ts#1060, dt#1055, source#1056, peer#1057], MetastoreRelation nis, dmp_puid_tuid

注意:- DmpPuidTuid是java bean类

【问题讨论】:

    标签: java scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    当您将 Scala 函数传递给 filter 时,您会阻止 Spark 优化器查看数据集的哪些列实际使用(因为优化器不会尝试查看函数的编译代码内部)。如果您传递一个列表达式,例如col("peer") === "AggregateKnowledge" &amp;&amp; col("dt").cast(IntegerType) &gt;= 20170403,那么优化器将能够查看实际需要哪些列并相应地调整计划。

    【讨论】:

    • 谢谢@joe 以后有没有其他方法可以实现数据集的typesefe功能或任何支持。
    • 如果你的意思是编译时类型检查,我只知道frameless 项目。不过,我不是这方面的专家。
    猜你喜欢
    • 2021-05-03
    • 2017-06-15
    • 1970-01-01
    • 1970-01-01
    • 2021-05-25
    • 2020-08-30
    • 2020-02-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多