【问题标题】:spark (Scala) dataframe filtering (FIR)火花(Scala)数据帧过滤(FIR)
【发布时间】:2016-05-01 11:24:06
【问题描述】:

假设我有一个数据框(存储在 scala val 中为df),其中包含来自 csv 的数据:

time,temperature
0,65
1,67
2,62
3,59

我可以从文件中将其作为 scala 语言的 spark 数据帧读取。

我想添加一个过滤列(过滤器是指信号处理移动平均过滤),(比如说我想做(T[n]+T[n-1])/2.0):

time,temperature,temperatureAvg
0,65,(65+0)/2.0
1,67,(67+65)/2.0
2,62,(62+67)/2.0
3,59,(59+62)/2.0

(实际上,说第一行,我想要32.5而不是(65+0)/2.0。我写它是为了澄清预期的2-time-step过滤操作输出)

那么如何实现呢?我不熟悉沿列迭代组合行的 spark 数据框操作...

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    Spark 3.1+

    替换

    $"time".cast("timestamp")
    

    import org.apache.spark.sql.functions.timestamp_seconds
    
    timestamp_seconds($"time")
    

    Spark 2.0+

    在 Spark 2.0 及更高版本中,可以使用 window 函数作为 groupBy 的输入。它允许您指定 windowDurationslideDurationstartTime(偏移量)。它仅适用于 TimestampType 列,但找到解决方法并不难。在您的情况下,需要一些额外的步骤来纠正边界,但一般解决方案可以如下所示:

    import org.apache.spark.sql.functions.{window, avg}
    
    df
        .withColumn("ts", $"time".cast("timestamp"))
        .groupBy(window($"ts", windowDuration="2 seconds", slideDuration="1 second"))
        .avg("temperature")
    

    火花

    如果有一种自然的数据分区方式,您可以使用如下窗口函数:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.mean
    
    val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0)
    
    val df = sc.parallelize(Seq(
        (1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59)
    )).toDF("id", "time", "temperature")
    
    df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show
    
    // +---+----+-----------+--------------+                             
    // | id|time|temperature|temperatureAvg|
    // +---+----+-----------+--------------+
    // |  1|   0|         65|          65.0|
    // |  1|   1|         67|          66.0|
    // |  1|   2|         62|          64.5|
    // |  1|   3|         59|          60.5|
    // +---+----+-----------+--------------+
    

    您可以使用lead / lag 函数创建具有任意权重的窗口:

     lit(0.6) * $"temperature" + 
     lit(0.3) * lag($"temperature", 1) +
     lit(0.2) * lag($"temperature", 2)
    

    没有partitionBy 子句仍然可以,但效率极低。如果是这种情况,您将无法使用DataFrames。相反,您可以在 RDD 上使用sliding(参见例如Operate on neighbor elements in RDD in Spark)。还有spark-timeseries 包,您可能会觉得有用。

    【讨论】:

    • 感谢零!它工作得很好。一个相关的问题,如果我想进行过滤而不是 0.5T[n] + 0.5T[n-1] 但想要像 0.6T[n] + 0.3T[n-1] + 0.1T[n-2] 其中 T[n] 是第 n 行的温度怎么办?
    • laglead代替mean
    猜你喜欢
    • 2020-08-09
    • 2018-10-27
    • 2020-02-13
    • 2019-01-14
    • 2023-03-29
    • 2016-06-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多