【发布时间】:2017-07-24 06:05:45
【问题描述】:
我有一个DataFrame 有两列,index 和 values。我想根据values 列获取delayValues。
这是我的代码:
val arr = Array(1,4,3,2,5,7,3,5,4,18)
val input=new ArrayBuffer[(Int,Int)]()
for(i<-0 until 10){
input.append((i,arr(i)))
}
val window=Window.rowsBetween(-2,0)
val df = sc.parallelize(input, 4).toDF("index","values")
df.withColumn("valueDealy",first(col("values")).over(window)).show()
这是结果:
这是我的预期结果,但我发现所有数据都收集到一个分区中,然后我使用partitionBy函数,这是我更改的代码:
val arr = Array(1,4,3,2,5,7,3,5,4,18)
val input=new ArrayBuffer[(Int,Int)]()
for(i<-0 until 10){
input.append((i,arr(i)))
}
val window=Window.orderBy(col("index")).partitionBy(col("index")).rowsBetween(-2,0)
val df = sc.parallelize(input, 4).toDF("index","values")
df.withColumn("valueDealy",first(col("values")).over(window)).show()
结果是:
+-----+------+----------+
|index|values|valueDealy|
+-----+------+----------+
| 0| 1| 1|
| 3| 2| 2|
| 7| 5| 5|
| 9| 18| 18|
| 4| 5| 5|
| 6| 3| 3|
| 5| 7| 7|
| 2| 3| 3|
| 1| 4| 4|
| 8| 4| 4|
+-----+------+----------+
我使用partitionBy时得到错误的结果,我该怎么办?谢谢!
我的预期输出是:
+-----+------+----------+
|index|values|valueDealy|
+-----+------+----------+
| 0| 1| 1|
| 1| 4| 1|
| 2| 3| 1|
| 3| 2| 4|
| 4| 5| 3|
| 5| 7| 2|
| 6| 3| 5|
| 7| 5| 7|
| 8| 4| 3|
| 9| 18| 5|
+-----+------+----------+
并且数据在多个分区中!
【问题讨论】:
-
您的预期输出是什么?
-
我已经改变了我的问题@RameshMaharjan
标签: apache-spark apache-spark-sql window-functions