【问题标题】:spark sql Find the number of extensions for a recordspark sql 查找记录的扩展数
【发布时间】:2022-01-24 22:19:55
【问题描述】:

我有一个如下的数据集

col1 extension_col1
2345 2246
2246 2134
2134 2091
2091 Null
1234 1111
1111 Null

我需要在col1 中找到每条记录可用的扩展数,从而记录已经按照以null 终止的集合连续排序。

最终结果如下

col1 extension_col1 No_Of_Extensions
2345 2246 3
2246 2134 2
2134 2091 1
2091 Null 0
1234 1111 1
1111 Null 0

值 2345 扩展为 2345>2246>2134>2091>null,因此它有 3 个扩展关系,不包括 null。

如何使用 spark sql/scala 获取第三列(No_Of_Extensions)?

【问题讨论】:

  • 不清楚,什么是“extension”?你如何订购你的数据框? (列col1?)
  • 可以做但是很繁琐。不是真正的火花使用。
  • @blackbishop 是的,按 col1 订购
  • 您可以接受和/或支持答案吗?新年快乐。

标签: sql apache-spark apache-spark-sql bigdata relation


【解决方案1】:

您可以使用一些窗口函数来实现。首先,使用extension_col1 上的累积条件和,创建一个组列grp。然后,在由grp 分区并由col1 排序的窗口上使用row_number 函数,但这次在升序时你会得到想要的结果:

import org.apache.spark.sql.expressions.Window

val df = Seq(
  (Some(99985), Some(94904)), (Some(94904), Some(89884)),
  (Some(89884), Some(88592)), (Some(88592), Some(86367)),
  (Some(86367), Some(84121)), (Some(84121), None)
).toDF("col1", "extension_col1")

val w1 = Window.orderBy(desc("col1"))
val w2 = Window.partitionBy("grp").orderBy("col1")

val result = df.withColumn(
    "grp",
    sum(when(col("extension_col1").isNull, 1).otherwise(0)).over(w1)
).withColumn(
    "No_Of_Extensions",
    when(col("extension_col1").isNull, 0).otherwise(row_number().over(w2))
).drop("grp").orderBy(desc("col1"))

result.show
                        
//+-----+--------------+----------------+
//| col1|extension_col1|No_Of_Extensions|
//+-----+--------------+----------------+
//|99985|         94904|               5|
//|94904|         89884|               4|
//|89884|         88592|               3|
//|88592|         86367|               2|
//|86367|         84121|               1|
//|84121|          null|               0|
//+-----+--------------+----------------+

请注意,第一个sum 使用的是非分区窗口,因此所有数据将被移动到一个分区中,因此可能会影响性能。


Spark-SQL 等效查询:

SELECT col1, 
       extension_col1, 
       case when extension_col1 is null then 0 else row_number() over(partition by grp order by col1) end as No_Of_Extensions
FROM  (
      SELECT *, 
             sum(case when extension_col1 is null then 1 else 0 end) over(order by col1 desc) as grp
      FROM df
)
ORDER BY col1 desc

【讨论】:

  • 这没有按预期工作。对于少数记录,它的出现如下 /+----+--------------+----------------+ //| col1|extension_col1|No_Of_Extensions| //+----+--------------+----------------+ //|99985| 94904| 2| //|94904| 89884| 1| //|89884| 88592| 1| //|88592| 86367| 1| //|86367| 84121| 5| //| 84121 |空| 0| //+----+--------------+----------------+
  • @maverick9143 不知道你是怎么得到这个结果的,我只是用和你一样的数据测试了它,它给出了正确的结果。请使用完整的工作示例查看我的编辑。
  • 你能试试下面的数据集吗+-----+-----------+ | col1|extension_col1| +-----+--------------+ |99985| 94904| |94904| 89884| |89884| 88592| |88592| 86367| |86367| 84121| |84121|空| | 2345| 2246| | 2246| 2134| | 2134| 2091| | 2091|空| |94800| 89900| |89900|空| +-----+--------------+
  • 对于上面的 daatset 输出应该如下 | col1|extension_col1|No_Of_Extensions| |99985| 94904| 5| |94904| 89884| 4| |89884| 88592| 3| |88592| 86367| 2| |86367| 84121| 1| |84121|空| 0| |94800| 89900| 1| |89900|空| 0| | 2345| 2246| 3| | 2246| 2134| 2| | 2134| 2091| 1| | 2091|空| 0|
  • 但它的到来如下 | col1|extension_col1|No_Of_Extensions| +-----+--------------+----------------+ |99985| 94904| 3| |94904| 89884| 2| |94800| 89900| 1| |89900|空| 0| |89884| 88592| 3| |88592| 86367| 2| |86367| 84121| 1| |84121|空| 0| | 2345| 2246| 3| | 2246| 2134| 2| | 2134| 2091| 1| | 2091|空| 0|
【解决方案2】:

blackbishop 的替代方案,因为我假设数据可能不会 总是被订购,因此做一些替代处理。我喜欢条件求和,但在这里不适用。

老实说,Spark 的大规模用例很糟糕,因为我也无法解决单个问题 分区方面或者作为其他答案状态。但是在较新的 Spark 上增加了分区大小 在此示例中,版本和“列表”可能很长。

第 1 部分 - 生成数据

// 1. Generate data.
val df = Seq(( Some(2345), Some(22246) ), ( Some(22246), Some(2134) ), ( Some(2134), Some(2091) ), (Some(2091), None) ,
              ( Some(1234), Some(1111) ), ( Some(1111), None )
             ).toDF("col1" ,"extCol1")

第 2 部分 - 实际处理

//2. Narrow transform, add position in dataset as values nay not awlays be desc or asc.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val rdd = df.rdd.zipWithIndex
val df2 = spark.createDataFrame(rdd.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)  // Some cost


//3. Make groupings in record ranges. Cannot avoid the single partition aspects, so this only works if we can do it with data that can fit into a single partition. At scale one would 
//   not be able to do this really unless some grouping characteristic. 
val dfg = df2.filter(df2("extCol1").isNull)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val winSpec1 = Window.orderBy(asc("rowid"))

val dfg2 = dfg.withColumn("prev_rowid_tmp", lag("rowid", 1, -1).over(winSpec1))
              .withColumn("rowidFrom", $"prev_rowid_tmp" + 1)
              .drop("prev_rowid_tmp")
              .drop("extCol1")
              .withColumnRenamed("rowid","rowidTo")

//4. Apply grouping of ranges of rows to data.
val df3 = df2.as("df2").join(dfg2.as("dfg2"), 
          $"df2.rowid" >= $"dfg2.rowidFrom" && $"df2.rowid" <= $"dfg2.rowidTo", "inner")             

//5. Do the calcs.
val res = df3.withColumn("numExtensions", $"rowidTo" - $"rowid") 
res.select("df2.col1", "extCol1", "numExtensions").show(false)

返回:

+-----+-------+-------------+
|col1 |extCol1|numExtensions|
+-----+-------+-------------+
|2345 |22246  |3            |
|22246|2134   |2            |
|2134 |2091   |1            |
|2091 |null   |0            |
|1234 |1111   |1            |
|1111 |null   |0            |
+-----+-------+-------------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-27
    • 2019-04-19
    • 2020-05-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多