【问题标题】:SparkSQL - Lag function?SparkSQL - 滞后功能?
【发布时间】:2015-12-10 16:39:23
【问题描述】:

我在这个DataBricks post 中看到,SparkSql 中支持窗口函数,特别是我正在尝试使用 lag() 窗口函数。

我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行显示交易金额,以及当前行的金额与前一行的金额之差金额。

在 DataBricks 帖子之后,我提出了这个查询,但它向我抛出了一个异常,我无法完全理解为什么..

这是在 PySpark 中。tx 是我的数据框,已在注册为临时表时创建。

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

和异常(截断)..

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

我真的很感激任何见解,这个功能是相当新的,就现有示例或其他相关帖子而言,没有太多可继续的。

编辑

我也尝试在没有如下 SQL 语句的情况下执行此操作,但仍然出现错误。我已经将它与 Hive 和 SQLContext 一起使用,并收到相同的错误。

windowSpec = \
Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
    tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()

Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)

【问题讨论】:

    标签: sql apache-spark pyspark apache-spark-sql window-functions


    【解决方案1】:
    1. 帧规范应该以关键字ROWS而不是ROW开头
    2. 帧规范需要下限值

      ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
      

      UNBOUNDED关键字

      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
      
    3. LAG 函数根本不接受帧,因此带有延迟的正确 SQL 查询可能如下所示

      SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
           PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time
      ) as prev_amt from tx
      

    编辑

    关于 SQL DSL 的使用:

    1. 正如您在错误消息中看到的那样

      请注意,目前使用窗口函数需要 HiveContex

      一定要使用HiveContext而不是SQLContext初始化sqlContext

    2. windowSpec.rowsBetween(-1, 0) 什么都不做,但lag 函数再次不支持帧规范。

    【讨论】:

    • 感谢您的回复!我已经用额外的非 sql 语法(新错误)更新了帖子,并尝试根据您的建议更新 SQL 语法(同样的错误)。也许您可以查看我的第二次尝试和 SQL 语法,并具体找出我做错了什么? test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag() OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) from tx")
    猜你喜欢
    • 1970-01-01
    • 2023-04-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-19
    • 1970-01-01
    • 2021-05-23
    • 1970-01-01
    相关资源
    最近更新 更多