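An alternative to blackbishop's answer: I assume the values may not always be in ascending or descending order, so this does some alternative processing. I like the conditional sum, but it does not apply here.
Honestly, Spark is poorly suited to this use case at scale, because I could not get around the single-partition aspect either, as the other answer also states. That said, partition sizes have increased in newer Spark versions, and in this example the "lists" could be long.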
Part 1 - Generate data
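// 1. Generate data. spark-shell imports spark.implicits._ automatically; an application needs it for toDF and $"...".
import spark.implicits._
val df = Seq(
  (Some(2345), Some(22246)), (Some(22246), Some(2134)), (Some(2134), Some(2091)), (Some(2091), None),
  (Some(1234), Some(1111)), (Some(1111), None)
).toDF("col1", "extCol1")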
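In the generated rows, each non-null `extCol1` equals the `col1` of the next row and a `None` ends a chain, so the sample holds two chains (2345 to 2091, and 1234 to 1111). For this sample, the expected `numExtensions` of a row is therefore the number of links followed before reaching a `None`. The minimal plain-Scala sketch below (no Spark) computes that from the links alone, as a sanity check to compare Part 2's output against. It assumes unique `col1` values and no cycles, and `LinkHops`/`hopsToEnd` are hypothetical names; because it follows links rather than row position, it is not a substitute for the positional approach.

```scala
import scala.annotation.tailrec

// Sanity check for the sample data: count extCol1 hops until a row whose extCol1 is None.
// Assumes unique col1 values and no cycles; LinkHops and hopsToEnd are hypothetical names.
object LinkHops {
  def hopsToEnd(links: Map[Int, Option[Int]], start: Int): Int = {
    @tailrec
    def loop(current: Int, acc: Int): Int = links.get(current).flatten match {
      case Some(next) => loop(next, acc + 1) // one more extension to follow
      case None       => acc                 // None (or an unknown col1) ends the chain
    }
    loop(start, 0)
  }

  def main(args: Array[String]): Unit = {
    val rows: Seq[(Int, Option[Int])] = Seq(
      2345 -> Some(22246), 22246 -> Some(2134), 2134 -> Some(2091), 2091 -> None,
      1234 -> Some(1111), 1111 -> None
    )
    val links = rows.toMap
    // Print "col1 hops" for every row, in input order.
    rows.foreach { case (c, _) => println(s"$c ${hopsToEnd(links, c)}") }
  }
}
```

The whole link map lives in memory here, so this only suits small samples like this one.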
Part 2 - Actual processing
//2. Narrow transform: add each row's position in the dataset, as the values may not always be in descending or ascending order.
import org.apache.spark.sql.Row
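import org.apache.spark.sql.types.{LongType, StructField, StructType}
// Append a non-nullable rowid column to the existing schema.
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
// zipWithIndex numbers rows by partition index first, then by position within each partition.
val rdd = df.rdd.zipWithIndex
// Some cost: zipWithIndex runs an extra Spark job when the RDD has more than one partition.
val df2 = spark.createDataFrame(rdd.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) }, newSchema)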
//3. Build record ranges: each row with a null extCol1 closes a group. Window.orderBy without partitionBy moves all rows into a single partition,
// so this only works if the data fits in one partition. At scale this is not really feasible unless the data has some grouping characteristic to partition by.
val dfg = df2.filter(df2("extCol1").isNull)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val winSpec1 = Window.orderBy(asc("rowid"))
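// lag's default of -1 makes the first range start at rowid 0; later ranges start right after the previous terminal row.
val dfg2 = dfg.withColumn("prev_rowid_tmp", lag("rowid", 1, -1).over(winSpec1))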
.withColumn("rowidFrom", $"prev_rowid_tmp" + 1)
.drop("prev_rowid_tmp")
.drop("extCol1")
.withColumnRenamed("rowid","rowidTo")
//4. Attach each row to its range with a non-equi (range) inner join.
val df3 = df2.as("df2").join(dfg2.as("dfg2"),
$"df2.rowid" >= $"dfg2.rowidFrom" && $"df2.rowid" <= $"dfg2.rowidTo", "inner")
//5. numExtensions = number of rows remaining until the group's terminal row.
val res = df3.withColumn("numExtensions", $"rowidTo" - $"rowid")
res.select("df2.col1", "extCol1", "numExtensions").show(false)
Returns:
+-----+-------+-------------+
|col1 |extCol1|numExtensions|
+-----+-------+-------------+
|2345 |22246  |3            |
|22246|2134   |2            |
|2134 |2091   |1            |
|2091 |null   |0            |
|1234 |1111   |1            |
|1111 |null   |0            |
+-----+-------+-------------+
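To make the positional logic of steps 2 to 5 easier to follow, here is a minimal plain-Scala sketch (no Spark) that mirrors it on an in-memory `Seq` and reproduces the values in the table above. Like the `zipWithIndex` step, it assumes the input order is the meaningful order. `PositionalExtensions`, `Rec`, `Out` and `computeExtensions` are hypothetical names for illustration only.

```scala
// Plain-Scala sketch (no Spark) of steps 2-5: positions, terminal rows, ranges, counts.
// PositionalExtensions, Rec, Out and computeExtensions are hypothetical names.
object PositionalExtensions {
  final case class Rec(col1: Int, extCol1: Option[Int])
  final case class Out(col1: Int, extCol1: Option[Int], numExtensions: Long)

  def computeExtensions(rows: Seq[Rec]): Seq[Out] = {
    // Step 2: attach each row's position, as RDD.zipWithIndex does.
    val indexed: Seq[(Rec, Long)] = rows.zipWithIndex.map { case (r, i) => (r, i.toLong) }

    // Step 3: rows with no extCol1 close a group. Pairing each terminal rowid with the
    // previous one (or -1) mirrors lag("rowid", 1, -1) + 1 for rowidFrom.
    val terminals: Seq[Long] = indexed.collect { case (r, i) if r.extCol1.isEmpty => i }
    val ranges: Seq[(Long, Long)] =
      terminals.zip(-1L +: terminals).map { case (to, prevTo) => (prevTo + 1, to) }

    // Steps 4 and 5: keep each row's matching range (the inner range join) and
    // count the rows left before the group's terminal row.
    for {
      (r, i)     <- indexed
      (from, to) <- ranges
      if i >= from && i <= to
    } yield Out(r.col1, r.extCol1, to - i)
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Rec(2345, Some(22246)), Rec(22246, Some(2134)), Rec(2134, Some(2091)), Rec(2091, None),
      Rec(1234, Some(1111)), Rec(1111, None)
    )
    computeExtensions(data).foreach { o =>
      val ext = o.extCol1.fold("null")(_.toString)
      println(s"${o.col1} $ext ${o.numExtensions}")
    }
  }
}
```

As with the inner join in step 4, rows after the last terminal row match no range and are dropped. Everything here sits in local memory, so it sidesteps the single-partition issue only because it is meant for small data.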