【问题标题】:Spark scala - Dataframes comparisonSpark scala - 数据帧比较
【发布时间】:2021-08-04 23:51:22
【问题描述】:

如何根据 PK 比较 2 个 Dataframe。

基本上想创建一个 scala spark 代码来比较 2 个大数据帧(每个 10M 记录,每个 100 列)并将输出显示为:

ID   Diff
1 [ {Col1: [1,2]}, {col3: [5,10]} ...]
2 [ {Col3: [4,2]}, {col7: [2,6]} ...]

ID是PK

差异列 - 显示第一个列名称差异在哪里,然后在该列中显示哪个值与另一个不同。

【问题讨论】:

  • 这不是一个简单的问题,但令人惊讶的是可行的。这只是我开始的粗略部分:val text = ds1.columns.filter(_ != "id").map(c => when(ds1(c) =!= ds2(c), concat_ws("", lit(s"{$c: ["), ds1(c), lit(","), ds2(c), lit("]}"))) as c)。不过,将它们放在一起需要很长时间。
  • 测试输出如下:{value: [foo,bar]}

标签: scala dataframe apache-spark


【解决方案1】:

可以将每个不同的列转换为字符串,然后将所有列连接起来:

// ---- data ---
val leftDF = Seq(
  (1, 1, 5, 0),
  (2, 0, 4, 2)
).toDF("ID", "Col1", "col3", "col7")
val rightDF = Seq(
  (1, 2, 10, 0),
  (2, 0, 2, 6)
).toDF("ID", "Col1", "col3", "col7")

def getDifferenceForColumn(name: String): Column =
  when(
    col("l." + name) =!= col("r." + name),
    concat(lit("{" + name + ": ["), col("l." + name), lit(","), col("r." + name), lit("]}")))
    .otherwise(lit(""))

val diffColumn = leftDF
  .columns
  .filter(_ != "ID")
  .map(name => getDifferenceForColumn(name))
  .reduce((l, r) => concat(l,
    when(length(r) =!= 0 && length(l) =!= 0, lit(",")).otherwise(lit(""))
    , r))


val diffColumnWithBraces = concat(lit("["), diffColumn, lit("]"))

leftDF
  .alias("l")
  .join(rightDF.alias("r"), Seq("id"))
  .select(col("ID"), diffColumnWithBraces.alias("DIFF"))

输出:

+---+------------------------------+
|ID |DIFF                          |
+---+------------------------------+
|1  |[{Col1: [1,2]},{col3: [5,10]}]|
|2  |[{col3: [4,2]},{col7: [2,6]}] |
+---+------------------------------+

如果列不能有值“}{”,在上面的解决方案中可以改变两个变量,也许性能会更好:

 val diffColumns = leftDF
      .columns
      .filter(_ != "ID")
      .map(name => getDifferenceForColumn(name))

val diffColumnWithBraces = concat(lit("["), regexp_replace(concat(diffColumns: _*),"\\}\\{","},{"), lit("]"))

【讨论】:

  • 当有大数据时 - 获得内存开销异常.. 有没有办法限制内存使用(减少内存使用)?
  • 类似的做法,可以减少最后添加的内存;也可以检查用UTF回答。
【解决方案2】:

也可以使用UDF,传入数据和输出和我第一个回答一样:

val colNames = leftDF
  .columns
  .filter(_ != "ID")

val generateSeqDiff = (colNames: Seq[String], leftValues: Seq[Any], rightValues: Seq[Any]) => {
  val nameValues = colNames
    .zip(leftValues)
    .zip(rightValues)
    .filterNot({ case ((_, l), r) => l == r })
    .map({ case ((name, l), r) => s"{$name: [$l,$r]}" })
    .mkString(",")
  s"[$nameValues]"
}
val generateSeqDiffUDF = udf(generateSeqDiff)

leftDF
  .select($"ID", array(colNames.head, colNames.tail: _*).alias("leftValues"))
  .alias("l")
  .join(
    rightDF
      .select($"ID", array(colNames.head, colNames.tail: _*).alias("rightValues"))
      .alias("r"), Seq("id"))
  .select($"ID", generateSeqDiffUDF(lit(colNames), $"leftValues", $"rightValues").alias("DIFF"))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-08-23
    • 2018-01-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多