Spark 3.1+
替换
$"time".cast("timestamp")
与
import org.apache.spark.sql.functions.timestamp_seconds
timestamp_seconds($"time")
Spark 2.0+
在 Spark 2.0 及更高版本中,可以使用 window 函数作为 groupBy 的输入。它允许您指定 windowDuration、slideDuration 和startTime(偏移量)。它仅适用于 TimestampType 列,但找到解决方法并不难。在您的情况下,需要一些额外的步骤来纠正边界,但一般解决方案可以如下所示:
import org.apache.spark.sql.functions.{window, avg}
df
.withColumn("ts", $"time".cast("timestamp"))
.groupBy(window($"ts", windowDuration="2 seconds", slideDuration="1 second"))
.avg("temperature")
火花
如果有一种自然的数据分区方式,您可以使用如下窗口函数:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.mean
val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0)
val df = sc.parallelize(Seq(
(1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59)
)).toDF("id", "time", "temperature")
df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show
// +---+----+-----------+--------------+
// | id|time|temperature|temperatureAvg|
// +---+----+-----------+--------------+
// | 1| 0| 65| 65.0|
// | 1| 1| 67| 66.0|
// | 1| 2| 62| 64.5|
// | 1| 3| 59| 60.5|
// +---+----+-----------+--------------+
您可以使用lead / lag 函数创建具有任意权重的窗口:
lit(0.6) * $"temperature" +
lit(0.3) * lag($"temperature", 1) +
lit(0.2) * lag($"temperature", 2)
没有partitionBy 子句仍然可以,但效率极低。如果是这种情况,您将无法使用DataFrames。相反,您可以在 RDD 上使用sliding(参见例如Operate on neighbor elements in RDD in Spark)。还有spark-timeseries 包,您可能会觉得有用。