【问题标题】:How to update column of spark dataframe based on the values of previous record如何根据先前记录的值更新火花数据框的列
【发布时间】:2018-08-12 05:11:53
【问题描述】:

我在 df 中有三列

Col1,col2,col3

X,x1,x2

Z,z1,z2

Y,

X,x3,x4

P,p1,p2

Q,q1,q2

Y

我想做以下事情 当 col1=x 时,存储 col2 和 col3 的值 并在 col1=y 时将这些列值分配给下一行 预期输出

X,x1,x2

Z,z1,z2

Y,x1,x2

X,x3,x4

P,p1,p2

Q,q1,q2

Y,x3,x4

任何帮助将不胜感激 注意:-spark 1.6

【问题讨论】:

  • 这在分布式框架中太难解决了。你怎么知道哪一个是谁的前一个X?
  • 即使我有这个疑问......但认为可能会有类似于滞后功能的东西

标签: scala apache-spark apache-spark-sql spark-dataframe


【解决方案1】:

这是使用Window函数的一种方法,步骤如下:

  1. 添加行标识列(如果已经有则不需要)并将非键列(可能很多)合并为一个
  2. 使用条件空值创建 tmp1 并使用 last/rowsBetween 窗口函数创建 tmp2 以使用最后一个非空值回填
  3. colstmp2 有条件地创建newcols
  4. 使用foldLeftnewcols 扩展回各个列

请注意,此解决方案使用 Window 函数而不进行分区,因此可能不适用于大型数据集。

val df = Seq(
  ("X", "x1", "x2"),
  ("Z", "z1", "z2"),
  ("Y", "", ""),
  ("X", "x3", "x4"),
  ("P", "p1", "p2"),
  ("Q", "q1", "q2"),
  ("Y", "", "")
).toDF("col1", "col2", "col3")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val colList = df.columns.filter(_ != "col1")

val df2 = df.select($"col1", monotonically_increasing_id.as("id"),
  struct(colList.map(col): _*).as("cols")
)

val df3 = df2.
  withColumn( "tmp1", when($"col1" === "X", $"cols") ).
  withColumn( "tmp2", last("tmp1", ignoreNulls = true).over(
    Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)
  ) )

df3.show
// +----+---+-------+-------+-------+
// |col1| id|   cols|   tmp1|   tmp2|
// +----+---+-------+-------+-------+
// |   X|  0|[x1,x2]|[x1,x2]|[x1,x2]|
// |   Z|  1|[z1,z2]|   null|[x1,x2]|
// |   Y|  2|    [,]|   null|[x1,x2]|
// |   X|  3|[x3,x4]|[x3,x4]|[x3,x4]|
// |   P|  4|[p1,p2]|   null|[x3,x4]|
// |   Q|  5|[q1,q2]|   null|[x3,x4]|
// |   Y|  6|    [,]|   null|[x3,x4]|
// +----+---+-------+-------+-------+

val df4 = df3.withColumn( "newcols",
  when($"col1" === "Y", $"tmp2").otherwise($"cols")
).select($"col1", $"newcols")

df4.show
// +----+-------+
// |col1|newcols|
// +----+-------+
// |   X|[x1,x2]|
// |   Z|[z1,z2]|
// |   Y|[x1,x2]|
// |   X|[x3,x4]|
// |   P|[p1,p2]|
// |   Q|[q1,q2]|
// |   Y|[x3,x4]|
// +----+-------+

val dfResult = colList.foldLeft( df4 )(
  (accDF, c) => accDF.withColumn(c, df4(s"newcols.$c"))
).drop($"newcols")

dfResult.show
// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |   X|  x1|  x2|
// |   Z|  z1|  z2|
// |   Y|  x1|  x2|
// |   X|  x3|  x4|
// |   P|  p1|  p2|
// |   Q|  q1|  q2|
// |   Y|  x3|  x4|
// +----+----+----+

[更新]

对于 Spark 1.x,last(colName, ignoreNulls) 在 DataFrame API 中不可用。一种解决方法是恢复使用在 last() 方法中支持 ignore-null 的 Spark SQL:

df2.
  withColumn( "tmp1", when($"col1" === "X", $"cols") ).
  createOrReplaceTempView("df2table")
  // might need to use registerTempTable("df2table") instead

val df3 = spark.sqlContext.sql("""
  select col1, id, cols, tmp1, last(tmp1, true) over (
    order by id rows between unbounded preceding and current row
    ) as tmp2
  from df2table
""")

【讨论】:

  • 我得到一些异常“错误:重载方法值最后一个替代方案:”并且我正在从 last("tmp1",ignoreNulls = true) 中删除第二个参数,然后我得到这个异常“38 : 错误: 值 unboundedPreceding 不是对象 org.apache.spark.sql.expressions.Window"的成员"
  • 我使用的是 spark 1.6
  • 您可以将Window.unboundedPreceding 替换为Long.MinValue。不幸的是,last(colName, ignoreNulls) 在 Spark 2.0 之前不可用。请查看我的更新答案以了解解决方法。
  • 我仍然得到 org.apache.spark.sql.AnalysisException: 找不到窗口函数最后;如果我做错了什么,请告诉我
  • 我没有 Spark 1.6 环境,因此无法重现上述问题。要恢复使用 Spark SQL,请确保您 import sqlContext.implicits._。这是一个相关的SO link 供您参考。
【解决方案2】:

是的,有一个lag函数需要订购

import org.apache.spark.sql.expressions.Window.orderBy
import org.apache.spark.sql.functions.{coalesce, lag}

case class Temp(a: String, b: Option[String], c: Option[String])

val input = ss.createDataFrame(
  Seq(
    Temp("A", Some("a1"), Some("a2")),
    Temp("D", Some("d1"), Some("d2")),
    Temp("B", Some("b1"), Some("b2")),
    Temp("E", None, None),
    Temp("C", None, None)
  ))

+---+----+----+
|  a|   b|   c|
+---+----+----+
|  A|  a1|  a2|
|  D|  d1|  d2|
|  B|  b1|  b2|
|  E|null|null|
|  C|null|null|
+---+----+----+

val order = orderBy($"a")
input
  .withColumn("b", coalesce($"b", lag($"b", 1).over(order)))
  .withColumn("c", coalesce($"c", lag($"c", 1).over(order)))
  .show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  A| a1| a2|
|  B| b1| b2|
|  C| b1| b2|
|  D| d1| d2|
|  E| d1| d2|
+---+---+---+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-07-28
    • 1970-01-01
    • 1970-01-01
    • 2020-12-07
    • 1970-01-01
    • 1970-01-01
    • 2017-12-11
    • 1970-01-01
    相关资源
    最近更新 更多