【问题标题】:Create new column in Spark DataFrame with diff of previous values from another column在 Spark DataFrame 中创建新列,其中包含与另一列先前值的差异
【发布时间】:2016-03-17 21:21:33
【问题描述】:

我有一个数据框,其中有一列带有纪元秒数。
除此之外,我想添加一列,其中包含当前时间值和上一个时间值之间的差异 - 换句话说,自基于时间戳列的数据帧中最后一行以来的时间差异。

如何根据之前的值添加这样的列?

我正在使用 Scala API。

【问题讨论】:

  • 您是否对数据进行分组/分区?
  • 到目前为止 - 只是列出了日志中的所有行,并希望根据时间绘制一些测量值
  • 如果您不对数据框进行分组,您将无法使用(我的意思是您可以,但您真的不想这样做)。改用滑动:stackoverflow.com/a/32679114/1560062。如果您决定分组,您可以使用如下所示的窗口函数:stackoverflow.com/q/34535833/1560062

标签: scala apache-spark dataframe apache-spark-sql


【解决方案1】:

你可以使用spark的滞后功能来实现这个

val df = sc.parallelize(Seq(
  (1540000005),
  (1540000004),
  (1540000003),
  (1540000002))).toDF("epoch")

// a lag function needs to have a window
val w = org.apache.spark.sql.expressions.Window.orderBy("epoch")  

import org.apache.spark.sql.functions.lag
// create a column epoch_lag_1 which is the epoch column with an offset of 1 and default value 0
val dfWithLag = df.withColumn("epoch_lag_1", lag("epoch", 1, 0).over(w))

// calculate the diff between epoch and epoch_lag_1
val dfWithDiff = dfWithLag.withColumn("diff", dfWithLag("epoch") - dfWithLag("epoch_lag_1"))

这应该会导致

dfWithDiff.show 


+----------+-----------+----------+                                                                                     
|     epoch|epoch_lag_1|      diff|                                                                                     
+----------+-----------+----------+                                                                                     
|1540000002|          0|1540000002|                                                                                     
|1540000003| 1540000002|         1|                                                                                     
|1540000004| 1540000003|         1|                                                                                     
|1540000005| 1540000004|         1|                                                                                     
+----------+-----------+----------+ 

【讨论】:

    【解决方案2】:

    这会做你想做的事,尽管正如指出的那样它可能会有点慢。

    df.printSchema
    root
     |-- ts: long (nullable = false)
    
    df.join(
      df.toDF("ts2"),
      $"ts2" < $"ts",
      "left_outer"
    ).groupBy($"ts").agg(max($"ts2") as "prev").select($"ts", $"ts" - $"prev" as "diff").show
    

    我们甚至可以使用我拉皮条的DataFrame-ified zipWithIndex 让它变得更好。假设我们使用它来添加 id 列,您可以这样做:

    df.join(
      df.toDF("prev_id", "prev_ts"), 
      $"id" === $"prev_id" + 1, 
      "left_outer"
    ).select($"ts", $"ts" - $"prev_ts" as "diff").show
    

    【讨论】:

    • 有点严重的轻描淡写 :) 这需要一个完整的笛卡尔积。
    • 我没有看到任何关于性能的问题。 :) 我的意思是,我可以做一个 zipWithIndex -- stackoverflow.com/questions/30304810/… -- 并使用 $"id" === $"id" - 1 对索引列进行连接 -- 这样会更好吗?
    • 实际上,它好多更好:) O(N)O(N^2) 更好。准确地说。但是,如果您放弃使用 RDD,则没有充分的理由回到 DF :(
    • 除非我尽量远离 RDD。我很难让 DataFrames 做我想做的事,我很高兴让优化器做它的事。并非所有人都想了解 Spark 的每一寸
    • 您可以尝试使用(并绑定)uinque 标识符,但这很棘手。
    【解决方案3】:

    我不知道 Scala。但是如何使用lag 生成滞后列,然后从另一列中减去一列?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-10-27
      • 1970-01-01
      • 2023-02-01
      • 2020-09-10
      • 2017-04-09
      • 2020-02-20
      • 2021-09-21
      • 2022-01-24
      相关资源
      最近更新 更多