【问题标题】:Spark SQL - How to select on dates stored as UTC millis from the epoch?Spark SQL - 如何从纪元中选择存储为 UTC 毫秒的日期?
【发布时间】:2014-12-25 16:44:38
【问题描述】:

我一直在搜索,但没有找到一个解决方案,即如何使用 Spark SQL 从纪元开始查询存储为 UTC 毫秒的日期。我从 NoSQL 数据源(来自 MongoDB 的 JSON)中提取的模式的目标日期为:

|-- dateCreated: struct (nullable = true)

||-- $date: long (nullable = true)

完整架构如下:

scala> accEvt.printSchema
root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- appId: integer (nullable = true)
 |-- cId: long (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- expires: struct (nullable = true)
 |    |    |-- $date: long (nullable = true)
 |    |-- metadata: struct (nullable = true)
 |    |    |-- another key: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- field: string (nullable = true)
 |    |    |-- flavors: string (nullable = true)
 |    |    |-- foo: string (nullable = true)
 |    |    |-- location1: string (nullable = true)
 |    |    |-- location2: string (nullable = true)
 |    |    |-- test: string (nullable = true)
 |    |    |-- testKey: string (nullable = true)
 |    |    |-- testKey2: string (nullable = true)
 |-- dateCreated: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- originationDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- processedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- receivedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)

我的目标是按照以下方式编写查询:

SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1]

到目前为止,我的过程是:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@29200d25

scala> val accEvt = sqlContext.jsonFile("/home/bkarels/mongoexport/accomplishment_event.json")

...
14/10/29 15:03:38 INFO SparkContext: Job finished: reduce at JsonRDD.scala:46, took 4.668981083 s
accEvt: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:103

scala> accEvt.registerAsTable("accomplishmentEvent")

(此时下面的基线查询执行成功)

scala> sqlContext.sql("select count(*) from accomplishmentEvent").collect.foreach(println)
...
[74475]

现在,我无法理解的巫术是如何形成我的选择语句来推理日期。例如,以下执行无错误,但返回零而不是所有记录的计数(74475)。

scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate >= '1970-01-01'").collect.foreach(println)
...
[0]

我也尝试过一些丑陋的,比如:

scala> val now = new java.util.Date()
now: java.util.Date = Wed Oct 29 15:05:15 CDT 2014

scala> val today = now.getTime
today: Long = 1414613115743

scala> val thirtydaysago = today - (30 * 24 * 60 * 60 * 1000)
thirtydaysago: Long = 1416316083039


scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate <= %s and processedDate >= %s".format(today,thirtydaysago)).collect.foreach(println)

按照建议,我选择了一个命名字段以确保其有效。所以:

scala> sqlContext.sql("select receivedDate from accomplishmentEvent limit 10").collect.foreach(println)

返回:

[[1376318850033]]
[[1376319429590]]
[[1376320804289]]
[[1376320832835]]
[[1376320832960]]
[[1376320835554]]
[[1376320914480]]
[[1376321041899]]
[[1376321109341]]
[[1376321121469]]

然后扩展以尝试让我尝试过的某种日期工作:

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.date > '1970-01-01' limit 5").collect.foreach(println)

导致错误:

java.lang.RuntimeException: No such field date in StructType(ArrayBuffer(StructField($date,LongType,true)))
...

按照同样的建议,在我们的字段名称前加上 $ 会导致另一种错误:

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5").collect.foreach(println)
java.lang.RuntimeException: [1.69] failure: ``UNION'' expected but ErrorToken(illegal character) found

select actualConsumerId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5

显然我不知道如何选择以这种方式存储的日期 - 谁能帮我填补这个空白?

我对 Scala 和 Spark 都比较陌生,如果这是一个基本问题,请原谅我,但我在论坛和 Spark 文档中的搜索结果为空。

谢谢。

【问题讨论】:

  • 您能说明一下您在哪个方面遇到问题吗?它是否在您的查询中构造了正确的过滤器表达式(即:WHERE 子句)?另外,想必您已经知道如何将数据放入RDDRDD 的类型是什么?
  • 我在帖子中添加了更多详细信息,更准确地概述了我所做的事情以及逃避我的事情。谢谢。
  • 要尝试两件事:(1)将 accEvt.printSchema() 的完整输出添加到您的问题中(或者顶部的 sn-p 实际上是整个问题?)和(2)尝试选择一个特定字段(或多个字段)而不是 * 来检查您的字段命名是否有效。看起来您的 JSON 不平坦,所以我想知道您的 sn-p 中显示的字段是否需要作为 dataCreated.$data 处理——我观察到 Spark SQL 文档中的 JSON 示例 is 平坦。
  • 我已经验证,在 Spark 1.1.0 中,点符号可用于访问嵌套字段。但是,您的下一个问题将是您的字段名称中包含“$”,并且dateCreated.$date 不是有效的 Spark SQL 标识符。它似乎与this bug 相关,它确实应该写成更笼统的东西。通常的 SQL 方括号引用也不起作用。 Spark SQL 解析器非常原始,可能很快就会被重写。也许您可以处理 RDD 以摆脱美元符号,或预处理文件。
  • 刚刚注意到您最近的编辑。注意你的receivedDate 是如何用两个方括号打印出来的?这就是嵌套在起作用!

标签: sql date apache-spark apache-spark-sql


【解决方案1】:

您的 JSON 不是扁平的,因此顶层以下的字段需要使用限定名称来处理,例如 dateCreated.$date。您的特定日期字段都是 long 类型,因此您需要对它们进行数值比较,看起来您在做这些事情上是正确的。

另一个问题是您的字段名称包含“$”字符,Spark SQL 不允许您查询它们。一种解决方案是,不是直接将 JSON 读取为 SchemaRDD(如您所做的那样),而是首先将其读取为 RDD[String],使用 map 方法执行您选择的 Scala 字符串操作,然后使用SQLContextjsonRDD 方法创建SchemaRDD

val lines = sc.textFile(...)
// you may want something less naive than global replacement of all "$" chars
val linesFixed = lines.map(s => s.replaceAllLiterally("$", ""))
val accEvt = sqlContext.jsonRDD(linesFixed)

我已经使用 Spark 1.1.0 对此进行了测试。

作为参考,this bug report 和其他可能已经注意到 Spark SQL 中缺乏引用功能,并且似乎最近修复了 checked in,但需要一些时间才能发布

【讨论】:

  • 因此,出于某种原因,原始 JSON 中的 mongo 导出将我的日期对象转换为具有嵌套的 $date 字段名称。所以我测试了你的想法并转换了'$date'->'date',然后我可以使用例如"select processedDate.date"查询长值。我仍然必须将日期转换为毫秒并将它们注入到查询中以执行范围等等,但我想这是您以这种方式存储日期所付出的代价。我现在必须研究 mongo export 正在做什么,并将其纳入我们前进的战略中。感谢您的所有帮助!
  • 要记住的是,您可以在传递数据期间进行其他翻译。将纪元毫秒转换为其他一些 Scala 日期格式(此站点上的问题/答案中有很多信息)非常容易。问题是你需要弄清楚 Spark SQL 可以实际处理什么样的日期比较(我没有研究过)。 OTOH,如果事实证明 Spark SQL 无法 [还] 处理您想要的内容,那么使用直接 Spark 过滤 RDD 也非常简单。
猜你喜欢
  • 2012-01-02
  • 1970-01-01
  • 2020-12-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-09
  • 1970-01-01
相关资源
最近更新 更多