【问题标题】:What does an asterisk preceding a PushedFilters entail in a Spark SQL Explain Plan在 Spark SQL 解释计划中,PushedFilters 前面的星号意味着什么
【发布时间】:2020-05-11 19:07:59
【问题描述】:

关于 Spark Physical Explain Plans 中显示的 Spark PushedFilters,它声明(参考。 https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/sparkPredicatePushdown.html):

星号表示下推过滤器将仅在数据源级别处理。

这意味着什么,更重要的是,当您看到没有星号的 PushedFilters 数组条目时, 过滤器是否仍被下推到数据源级别并在其外部处理,但是 为什么首先将其称为推送过滤器?

非常混乱和谷歌搜索我找不到问题的真正答案。

谢谢!

一月

【问题讨论】:

    标签: sql apache-spark filter


    【解决方案1】:

    谓词的下推总是发生在数据源级别。它以一种方式发生,即数据源将选择性地扫描所预测的那些数据。 Spark 只是一个处理引擎,它将查询交给数据源以供最终执行。另一方面,数据源会按照自己的意愿执行查询。 Spark-sql 连接器知道数据源的行为(基于模式),因此它们可以使用下推谓词预测物理计划,但它们不能保证它会运行,因此使用星号。

    我对本地 parquet 文件进行了查询。物理计划已下推谓词且没有星号。它是 Spark 自行读取的本地 parquet 文件,因此物理计划是 100% 准确的。

        val df = spark.read.parquet("/Users/Documents/temp/temp1")
        df.filter($"income" >= 30).explain(true)
    
    
    == Physical Plan ==
    *(1) Project [client#0, type#1, address#2, type_2#3, income#4]
    +- *(1) Filter (isnotnull(income#4) && (income#4 >= 30))
       +- *(1) FileScan parquet [client#0,type#1,address#2,type_2#3,income#4] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/User/Documents/temp/temp1], PartitionFilters: [], PushedFilters: [IsNotNull(income), GreaterThanOrEqual(income,30)], ReadSchema: struct<client:string,type:string,address:string,type_2:string,income:int>
    
    

    这里使用 Spark-SQL 从 Oracle 读取一个表。 Oracle DB 使用谓词下推和索引访问,但 Spark 对此一无所知。

    == Physical Plan ==
    Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand file:/data/.., false, Parquet, Map(codec -> org.apache.hadoop.io.compress.snappyCodec, path -> /data/...), Overwrite, [COLUMN_01, COLUMN_02, COLUMN_03, COLUMN_04, COLUMN_05, COLUMN_06, COLUMN_07, COLUMN_08, COLUMN_09, COLUMN_10, COLUMN_11, COLUMN_12, COLUMN_13, COLUMN_14, COLUMN_15, COLUMN_16, COLUMN_17, COLUMN_18, ... 255 more fields]
    +- Project [COLUMN_01#1246, COLUMN_02#1247, COLUMN_03#1248, COLUMN_04#1249, COLUMN_05#1250, COLUMN_06#1251, COLUMN_07#1252, COLUMN_08#1253, COLUMN_09#1254, COLUMN_10#1255, COLUMN_11#1256, COLUMN_12#1257, COLUMN_13#1258, COLUMN_14#1259, COLUMN_15#1260, COLUMN_16#1261, COLUMN_17#1262, COLUMN_18#1263, ... 255 more fields]
       +- Scan JDBCRelation((select cu.*, ROWIDTONCHAR(t.rowid) as ROW_ID from table t  where (column1 in (786567473,786567520,786567670,786567570,...........)) and column2 in (10,11, ...) and t.result is null)t) [numPartitions=20] [COLUMN_87#1332,COLUMN_182#1427,COLUMN_128#1373,COLUMN_104#1349,COLUMN_189#1434,COLUMN_108#1353,COLUMN_116#1361,COLUMN_154#1399,COLUMN_125#1370,COLUMN_120#1365,COLUMN_267#1512,COLUMN_54#1299,COLUMN_100#1345,COLUMN_230#1475,COLUMN_68#1313,COLUMN_44#1289,COLUMN_53#1298,COLUMN_97#1342,COLUMN_03#1248,COLUMN_16#1261,COLUMN_43#1288,COLUMN_50#1295,COLUMN_174#1419,COLUMN_20#1265,... 254 more fields] PushedFilters: [], ReadSchema: struct<COLUMN_87:string,COLUMN_182:string,COLUMN_128:string,COLUMN_104:string,COLUMN_189:string,C...
    

    【讨论】:

    • 感谢萨利姆的回复!没有星号的下推过滤器,这是什么意思?我在这个线程gitter.im/datastax/… 中读到,不带星号的下推过滤器执行速度较慢,我想知道这是否是带星号和不带星号的区别。
    • 想象 2 个数据源 - 一个本地 parquet 文件与数据库上的删除表。 Spark 将读取文件本身,而 Spark 将依赖数据库读取数据。读取本地 parquet 文件的计划没有星号,而从远程数据库读取的计划没有星号。
    • 嗨 Salim,我认为第二个物理计划有一个空的下推过滤器数组,PushedFilters: [],因为它是一个没有 where 子句的插入。我注意到在我们的物理解释计划中 isnotnull() 过滤器没有 *,它在 SQL 查询 where 子句中没有任何对等点,但由 Spark 本身插入,而我们在 SQL 查询中的实际 where 子句得到一个 *。一个例子:PushedFilters: [IsNotNull(TYPE), *EqualTo(TYPE,BAR)] 我仍然不知道这两个过滤器是如何被执行/下推的(对不起!::)
    • @JB007 我的观点是,当数据源读取 Spark 的数据时,Spark 不知道执行 - 例如 Oracle DB。当 Spark 读取本地 parquet 文件时,它可以准确地判断数据将如何被读取。对于像 Cassandra 这样的特殊数据连接器,Spark 会更好地了解执行,但又不能保证执行,所以放了一个 *.
    猜你喜欢
    • 2020-01-08
    • 2015-04-12
    • 1970-01-01
    • 2010-12-12
    • 1970-01-01
    • 1970-01-01
    • 2019-04-08
    • 2017-03-02
    • 2011-03-03
    相关资源
    最近更新 更多