【问题标题】:PySpark using window to create field using previous created field valuePySpark 使用窗口使用先前创建的字段值创建字段
【发布时间】:2020-05-14 13:30:33
【问题描述】:

我正在尝试使用 Window 在我的 df 中创建一个名为 indexCP 的新列我想从 indexCP * (current_df['return']+1) 中获取以前的值,如果没有以前的 indexCP do 100 * (current_df[ '返回']+1)。

column_list = ["id","secname"]
windowval = (Window.partitionBy(column_list).orderBy(col('calendarday').cast("timestamp").cast("long")).rangeBetween(Window.unboundedPreceding, 0))
spark_df = spark_df.withColumn('indexCP', when(spark_df["PreviousYearUnique"] == spark_df["yearUnique"], 100 * (current_df['return']+1)).otherwise(last('indexCP').over(windowval) * (current_df['return']+1)))

当我运行上面的代码时,我得到一个错误“AnalysisException:”cannot resolve 'indexCP' given input columns:”我相信这是说你不能取一个尚未创建的值,但我不确定如何解决它。

Starting Data Frame
## +---+-----------+----------+------------------+       
## | id|calendarday|   secName|            return|
## +---+-----------+----------+------------------+
## |  1|2015-01-01 |         1|            0.0076|
## |  1|2015-01-02 |         1|            0.0026|
## |  1|2015-01-01 |         2|            0.0016|
## |  1|2015-01-02 |         2|            0.0006|
## |  2|2015-01-01 |         3|            0.0012|
## |  2|2015-01-02 |         3|            0.0014|
## +---+----------+-----------+------------------+

New Data Frame IndexCP added
## +---+-----------+--------+---------+------------+       
## | id|calendarday| secName|   return|     IndexCP|
## +---+-----------+--------+---------+------------+
## |  1|2015-01-01 |       1|   0.0076|      100.76|(1st 100*(return+1))
## |  1|2015-01-02 |       1|   0.0026|  101.021976|(2nd 100.76*(return+1))
## |  2|2015-01-01 |       2|   0.0016|      100.16|(1st 100*(return+1))
## |  2|2015-01-02 |       2|   0.0006|  100.220096|(2nd 100.16*(return+1))
## |  3|2015-01-01 |       3|   0.0012|     100.12 |(1st 100*(return+1))
## |  3|2015-01-02 |       3|   0.0014|  100.260168|(2nd 100.12*(return+1))
## +---+----------+---------+---------+------------+

【问题讨论】:

  • 下面的更新答案对您有用吗?

标签: python apache-spark pyspark aws-glue


【解决方案1】:

编辑:这应该是最终答案,我已将其扩展为 secName 列的另一行。

您正在寻找的是使用 IndexCP * (current_return + 1) 公式的滚动乘积函数。 首先,您需要将所有现有回报汇总到 ArrayType 中,然后进行汇总。这可以通过一些 Spark SQL aggregate 函数来完成,例如:

column_list = ["id","secname"]
windowval = (
    Window.partitionBy(column_list)
      .orderBy(f.col('calendarday').cast("timestamp"))
      .rangeBetween(Window.unboundedPreceding, 0)
)


df1.show()
+---+-----------+-------+------+
| id|calendarday|secName|return|
+---+-----------+-------+------+
|  1| 2015-01-01|      1|0.0076|
|  1| 2015-01-02|      1|0.0026|
|  1| 2015-01-03|      1|0.0014|
|  2| 2015-01-01|      2|0.0016|
|  2| 2015-01-02|      2|6.0E-4|
|  2| 2015-01-03|      2|   0.0|
|  3| 2015-01-01|      3|0.0012|
|  3| 2015-01-02|      3|0.0014|
+---+-----------+-------+------+

# f.collect_list(...) gets all your returns - this must be windowed
# cast(1 as double) is your base of 1 to begin with
# (acc, x) -> acc * (1 + x) is your formula translated to Spark SQL
# where acc is the accumulated value and x is the incoming value
df1.withColumn(
    "rolling_returns", 
    f.collect_list("return").over(windowval)
).withColumn("IndexCP", 
    100 * f.expr("""
    aggregate(
       rolling_returns,
       cast(1 as double),
       (acc, x) -> acc * (1+x))
    """)
).orderBy("id", "calendarday").show(truncate=False)

+---+-----------+-------+------+------------------------+------------------+
|id |calendarday|secName|return|rolling_returns         |IndexCP           |
+---+-----------+-------+------+------------------------+------------------+
|1  |2015-01-01 |1      |0.0076|[0.0076]                |100.76            |
|1  |2015-01-02 |1      |0.0026|[0.0076, 0.0026]        |101.021976        |
|1  |2015-01-03 |1      |0.0014|[0.0076, 0.0026, 0.0014]|101.16340676640002|
|2  |2015-01-01 |2      |0.0016|[0.0016]                |100.16000000000001|
|2  |2015-01-02 |2      |6.0E-4|[0.0016, 6.0E-4]        |100.220096        |
|2  |2015-01-03 |2      |0.0   |[0.0016, 6.0E-4, 0.0]   |100.220096        |
|3  |2015-01-01 |3      |0.0012|[0.0012]                |100.12            |
|3  |2015-01-02 |3      |0.0014|[0.0012, 0.0014]        |100.26016800000002|
+---+-----------+-------+------+------------------------+------------------+

说明:起始值必须为 1,并且 100 的乘数必须在表达式的外部,否则您确实开始漂移超过预期收益 100 倍。

我已验证现在的值符合您的公式,例如 secName == 1 and id == 1

100 * ((1.0026 * (0.0076 + 1)) * (0.0014 + 1)) = 101.1634067664

根据公式(acc, x) -> acc * (1+x),这确实是正确的。希望这会有所帮助!

【讨论】:

  • 这似乎与我正在寻找的等式不同,因为如果我用这个等式在电子表格上进行数学运算,则数字不相等。如果没有一个从 100 开始的等式,我需要采用前一个 IndexCP 列,然后执行 * (return + 1)。它试图计算包括最后一个值在内的每日复利的股票回报值。
  • @CharlesP 抱歉,我误解了这个问题。我已经根据你的公式用正确的计算更新了答案。
  • 我想我可能解释错了,所以不用担心你的帮助很大,因为我是 pyspark 的初学者,在谷歌上找不到任何东西。这是否总是做 100 * (SUM(rolling_returns)+1) 因为它需要使用最后一个 IndexCP 值作为 100。 (last_IndexCP * (currentReturn + 1)) 虽然在我列出的示例中,我们得到了相同的值。当它们超过 2 个相同的 secNames 时,数字会分开。我已经更新了 New Data Frame IndexCP added show 以尝试更好地解释它。我会用这个方程做一个测试,看看它们是否分开
  • @CharlesP 你能进一步澄清一下吗?我已经更新了与您完全相同的 idsecName 列,并且公式现在返回所有正确的 IndexCP 值。事实上,公式(acc, x) -> acc * (1 + x) 转换为((last_IndexCP) * (return +1)),其中last_IndexCP 开始于cast(100 as double)
  • 我能看到的唯一偏差是该值可能等于100.16000000000001 而不是100.16,但这可以通过四舍五入到n 小数点轻松解决——这是交易的常见方面使用浮点小数。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-01-19
  • 2014-01-22
  • 2014-02-18
  • 1970-01-01
  • 2019-02-27
相关资源
最近更新 更多