【问题标题】:SparkSQL Pushdown Filtering not Working in Spark Cassandra ConnectorSparkSQL 下推过滤在 Spark Cassandra 连接器中不起作用
【发布时间】:2016-06-23 04:22:39
【问题描述】:

我有一个表架构

appname text,
randomnum int,
addedtime timestamp,
shortuuid text,
assetname text,
brandname text,

PRIMARY KEY ((appname, randomnum), addedtime, shortuuid)

addedtime 是聚类键

现在当我在集群键添加时间上使用下推过滤器时,我没有看到它被应用

val rdd = tabledf.filter("addedtime > '" + _to + "'").explain
== Physical Plan ==
Filter (cast(addedtime#2 as string) > 2016-12-20 11:00:00)

根据文档,它应该被应用 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushdown-filter-examples

它还在 spark cassandra 连接器 1.4 中工作,但不适用于最新的 cassandra 连接器 1.6.0-M1。请告知问题

【问题讨论】:

  • 这看起来像是连接器中的错误,您应该在项目中提交 Jira。
  • 当然还有一些观察。我尝试使用 spark 1.5 和 1.6,但它不工作。我尝试了旧的连接器,即 1.4 和 spark 1.6,但它不工作。所以使用 spark 1.4 的连接器,同样的连接器不适用于 spark 1.6
  • nvm 想通了,请看下面的答案
  • 我在下面看不到任何答案?
  • 我还没写完抱歉 :)

标签: apache-spark cassandra


【解决方案1】:

问题分析

问题似乎在于 Catalyst 处理比较的方式。

做的时候

val rdd = tabledf.filter("addedtime > '" + _to + "'").explain

它将 addedTime 列转换为字符串,然后进行比较。 Catalyst 不会将此谓词呈现给 Spark Cassandra 连接器,因此无法推送它。

INFO  2016-03-08 17:10:49,011 org.apache.spark.sql.cassandra.CassandraSourceRelation: Input Predicates: []
Filter (cast(addedtime#2 as string) > 2015-08-03)

这也是错误的,因为它正在做一个字符串比较(这在词法上可以在这里工作,但并不是你真正想要做的)所以这看起来像 Catalyst 中的一个错误,因为我们可能应该将谓词呈现给源,即使有一个“演员”。有一种解决方法,其中涉及为 Catalyst 优化器提供它想要看到的内容。

解决方法

如果我们给出类型提示

df.filter("addedtime > cast('2015-08-03' as timestamp)").explain

然后Spark会在没有字符串Cast的情况下生成正确的比较

DEBUG 2016-03-08 17:11:09,792 org.apache.spark.sql.cassandra.CassandraSourceRelation: Basic Rules Applied:
C* Filters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
Spark Filters []

== Physical Plan ==
Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@332464fe[appname#0,randomnum#1,addedtime#2,shortuuid#3] PushedFilters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]

【讨论】:

  • 能否告诉我您使用的 spark 和 cassandra 连接器版本
  • 完美的 Russs,它适用于最新的 spark 和 cassandra 连接器版本。你是冠军。在这些问题上,您之前也提供了很多帮助。我很感激
【解决方案2】:

你也可以使用 java.sql.Timestamp

val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val date = LocalDateTime.parse("2015-08-03", dateFormatter)
val timestamp= Timestamp.from(date.atZone(ZoneId.systemDefault()).toInstant)

df.filter($"addedtime" > timestamp).explain

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-09-11
    • 2019-05-14
    • 1970-01-01
    • 2020-12-14
    • 1970-01-01
    • 2016-07-14
    • 1970-01-01
    相关资源
    最近更新 更多