【问题标题】:How to join two DataFrames and update missing values?如何加入两个 DataFrame 并更新缺失值?
【发布时间】:2017-09-14 08:22:46
【问题描述】:

我使用 Spark 2.0 并希望更新/合并 DataFrame 中的行值。

我有两个 DataFrame(旧的和新的),我想合并它们,当旧 DataFrame 的行数多于新 DataFrame 时,将旧数据值设置为 0。

案例 1 合并

旧数据框:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bc|
## +---+----+----+

新数据框:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bb|
## |  3|  cc|  cc|
## +---+----+----+

结果:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bb|
## |  3|  cc|  cc|
## +---+----+----+

案例 2 更新

旧数据框:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bb|
## |  3|  cc|  cc|
## +---+----+----+

新数据框:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bc|
## +---+----+----+

结果:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bc|
## |  3|  00|  00|
## +---+----+----+

键在两种情况下是唯一的,在实际情况下,DataFrame 可以有很多列。

如何编写 Spark/Scala 代码在一个函数中实现这两种情况?

【问题讨论】:

  • 您希望在每种情况下看到什么?
  • 如何编写scala&spark代码在一个函数中实现这两种情况?

标签: apache-spark dataframe apache-spark-sql


【解决方案1】:

诀窍是使用fullOuterJoinwhen 条件。

import org.apache.spark.sql.functions._

val dfa = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb"),
  (3, "cc", "cc")).toDF("key", "val1", "val2")

val dfb = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb")).toDF("key", "val1", "val2")

val q = dfa
  .join(dfb, Seq("key"), "outer")
  .select($"key",
     when(dfb("val1").isNull, lit(0)).otherwise(dfb("val1")).as("val1"), 
     when(dfb("val2").isNull, lit(0)).otherwise(dfb("val2")).as("val2"))
  .orderBy("key")

scala> q.show
+---+----+----+
|key|val1|val2|
+---+----+----+
|  1|  aa|  ab|
|  2|  bb|  bb|
|  3|   0|   0|
+---+----+----+

【讨论】:

  • 感谢您的回答!在实际情况下,数据框有很多列,有没有简单的方法来实现这一点?
  • 不管怎样,我觉得你的回答可以解决我的问题,过段时间我会测试一下,非常感谢!
  • 您可以创建一个表达式并将其分解。
  • 我用的是spark2.0,找不到“when”和“lit”功能。 @Alberto Bonsanto
  • 考虑使用join(...).na.fill() 填充空值。
【解决方案2】:

就像@summerbulb 在the comment 中建议的那样,您应该使用na 运算符来填充缺失值。

请注意,我使用 as 运算符为列设置别名。

val oldDF = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb"),
  (3, "cc", "cc")).toDF("key", "val1", "val2")
val newDF = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bc")).toDF("key", "val1", "val2")
val q = oldDF.join(newDF.as("new"), Seq("key"), "outer")
  .select("key", "new.*")
  .na.fill("0")  // <-- na.fill("0") because of String type
  .orderBy("key")

scala> q.show
+---+----+----+
|key|val1|val2|
+---+----+----+
|  1|  aa|  ab|
|  2|  bb|  bc|
|  3|   0|   0|
+---+----+----+

根据列的类型,您可能希望将0 用作StringDouble 类型。

the scaladoc of Dataset 中阅读asna

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-28
    • 2016-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多