【发布时间】:2018-07-11 09:00:53
【问题描述】:
(在 Spark 2.2 和 2.3 上测试)
我正在使用 Spark 将股票交易报价汇总到每日 OHLC(开盘-高-低-收)记录中。
输入的数据是这样的
val data = Seq(("2018-07-11 09:01:00", 34.0), ("2018-07-11 09:04:00", 32.0), ("2018-07-11 09:02:00", 35.0), ("2018-07-11 09:03:00", 30.0), ("2018-07-11 09:00:00", 33.0), ("2018-07-12 09:01:00", 56.0), ("2018-07-12 09:04:00", 54.0), ("2018-07-12 09:02:00", 51.0), ("2018-07-12 09:03:00", 50.0), ("2018-07-12 09:00:00", 51.0)).toDF("time", "price")
data.createOrReplaceTempView("ticks")
data.show
scala>
显示为
+-------------------+-----+
| time|price|
+-------------------+-----+
|2018-07-11 09:01:00| 34.0|
|2018-07-11 09:04:00| 32.0|
|2018-07-11 09:02:00| 35.0|
|2018-07-11 09:03:00| 30.0|
|2018-07-11 09:00:00| 33.0|
|2018-07-12 09:01:00| 56.0|
|2018-07-12 09:04:00| 54.0|
|2018-07-12 09:02:00| 51.0|
|2018-07-12 09:03:00| 50.0|
|2018-07-12 09:00:00| 51.0|
+-------------------+-----+
想要的输出是
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
SELECT
TO_DATE(time) AS date,
FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS open,
MAX(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS high,
MIN(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS low,
LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS close
FROM ticks
由于SQL的限制,这些方案比较繁琐。
今天,我发现 Spark SQL 可以在 GROUP BY 上下文中使用 FIRST_VALUE 和 LAST_VALUE which is not allowed in standard SQL。
Spark SQL 的这种无限性衍生出一个整洁的解决方案,如下所示:
SELECT
TO_DATE(time) AS date,
FIRST_VALUE(price) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST_VALUE(price) AS close
FROM ticks
GROUP BY TO_DATE(time)
你可以试试
spark.sql("SELECT TO_DATE(time) AS date, FIRST(price) AS open, MAX(price) AS high, MIN(price) AS low, LAST(price) AS close FROM ticks GROUP BY TO_DATE(time)").show
scala>
显示为
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|34.0|35.0|30.0| 33.0|
|2018-07-12|56.0|56.0|50.0| 51.0|
+----------+----+----+----+-----+
但是,上面的结果是不正确的。 (请与上述预期结果进行比较。)
FIRST_VALUE 和 LAST_VALUE 需要确定性排序才能获得确定性结果。
我可以通过在分组前添加orderBy 来纠正它。
import org.apache.spark.sql.functions._
data.orderBy("time").groupBy(expr("TO_DATE(time)").as("date")).agg(first("price").as("open"), max("price").as("high"), min("price").as("low"), last("price").as("close")).show
scala>
显示为
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
根据需要正确!!!
我的问题是,上面的代码“orderBy then groupBy”是否有效?这个订购有保证吗?我们可以在严肃的作品中使用这种非标准功能吗?
这个问题的重点是,在标准 SQL 中,我们只能先执行 GROUP BY 然后 ORDER BY 对聚合进行排序,而不是 ORDER BY 然后 GROUP BY。
GROUP BY 将忽略 ORDER BY 的顺序。
我也想知道 Spark SQL 是否可以在所需的顺序下执行这样的GROUP BY,标准 SQL 是否也可以为此发明这样的语法?
附言
我可以想到一些依赖于确定性排序的聚合函数。
WITH ORDER BY time SELECT COLLECT_LIST(price) GROUP BY stockID
WITH ORDER BY time SELECT SUM(SQUARE(price - LAG(price, 1, 0))) GROUP BY stockID
如果没有WITH ORDER BY time,我们如何在标准 SQL 中对 COLLECTed_LIST 进行排序?
这些例子表明“GROUP BY under desired ordering”仍然有用。
【问题讨论】:
-
我想知道如果我们可以这样使用它,提供
first_value, last_value有什么意义,如果不是这样,预期的用例是什么? :( -
如果某个查询结果是有序的,并且它的分组是保序的,那么我们可以一次获得first_value,last_value,min,max,sum,count ...。不需要多个 partition-by 语句。我相信 spark 的 orderby+groupby 是保序的,因为我在上面的例子中得到了正确的结果。
-
至于用例,我在 P.S. 中展示了 2:
COLLECT_LIST和price - LAG(price)。我的问题的一部分。真正的预期用例是真正为股票价格计算 OHLC。
标签: apache-spark-sql