【问题标题】:Find all rows in a dataframe that meet a certain criteria in another dataframe查找数据框中满足另一个数据框中特定条件的所有行
【发布时间】:2020-06-28 01:48:01
【问题描述】:

我有两个数据框如下:

df1(参考数据)

Tempe, AZ, USA
San Jose, CA, USA
Mountain View, CA, USA
New York, NY, USA

df2(用户输入的数据)

Tempe, AZ
Tempe, Arizona
San Jose, USA
San Jose, CA
Mountain View, CA

我想获得如下数据框 (df3):

-------------------------------------------
|Tempe, AZ, USA        | Tempe, Arizona   |
|Tempe, AZ, USA        | Tempe, AZ        |
|San Jose, CA, USA     | San Jose, CA     |
|San Jose, CA, USA     | San Jose, USA    |
|Mountain View, CA, USA| Mountain View, CA|
-------------------------------------------

我已经是用户定义函数了:

isSameAs(str1: String, str2:String): Boolean{
    ......
} 

接受两个字符串(用户输入的数据和参考数据)并告诉我它们是否匹配。

我只需要找到在 Scala Spark SQL 中实现 ma​​p 的正确方法,以便获得像 df3 这样的数据框。

【问题讨论】:

  • 你能分享两个数据框的架构吗?
  • 两个数据框都只是单列。

标签: sql scala apache-spark apache-spark-sql


【解决方案1】:

选项 1:您可以使用 UDF 作为连接表达式:

import org.apache.spark.sql.functions._
val isSameAsUdf = udf(isSameAs(_,_))
val result = df1.join(df2, isSameAsUdf(df1.col("address"), df2.col("address")))

这种方法的缺点是 Spark 在 df1df2 两个数据帧上执行笛卡尔积,然后过滤与连接条件不匹配的列(更多详细信息 here)。运行result.explain 打印

== Physical Plan ==
CartesianProduct UDF(address#4, address#10)
:- LocalTableScan [address#4]
+- LocalTableScan [address#10]

选项 2:为了避免笛卡尔积,将参考数据broadcast 作为标准 Scala 序列然后在另一个 UDF 中进行地址映射可能会更快:

val normalizedAddress: Seq[String] = //content of df2 as scala sequence
val broadcastSeq = spark.sparkContext.broadcast(normalizedAddress)

def toNormalizedAddress(str: String ): String = 
    broadcastSeq.value.find(isSameAs(_, str)).getOrElse("")
val toNormalizedAddressUdf = udf(toNormalizedAddress(_))

val result2 = df2.withColumn("NormalizedAddress", toNormalizedAddressUdf('address))

结果与选项 1 相同,但 result2.explain 打印

== Physical Plan ==
LocalTableScan [address#10, NormalizedAddress#40]

如果参考数据量小到可以广播,则第二个选项有效。根据集群的硬件,大约 10.000 行的参考数据仍然被认为是很小的。

【讨论】:

  • 最初我有一个笛卡尔积,它运行良好。但是随着数据的增加,它并没有在我们拥有的机器上扩展。对于选项 2:如果给定的用户输入没有 normalizedAddress 是否有办法不成行,即我们想忽略无法规范化的用户输入地址。
  • toNormalizedAddress的定义中,当调用getOrElse在引用数据中没有找到值时设置默认值。在我的代码中,此值设置为空字符串。但是可以将其设置为任何其他值(可能是NOT_FOUND),然后过滤掉这些行。
  • 如果我在过滤之后进行过滤,我是否仍然要在过滤之前处理非常大的数据集?
  • 数据帧df2result 具有相同的行数。 withColumn 的执行只为每一行添加了一个额外的字段。行数保持不变。
【解决方案2】:

假设下面的架构(地址:字符串),试试这个-

加载数据

  val data1 =
      """Tempe, AZ, USA
        |San Jose, CA, USA
        |Mountain View, CA, USA""".stripMargin
    val df1 = data1.split(System.lineSeparator()).toSeq.toDF("address")
    df1.show(false)
    /**
      * +----------------------+
      * |address               |
      * +----------------------+
      * |Tempe, AZ, USA        |
      * |San Jose, CA, USA     |
      * |Mountain View, CA, USA|
      * +----------------------+
      */

    val data2 =
      """Tempe, AZ
        |Tempe, Arizona
        |San Jose, USA
        |San Jose, CA
        |Mountain View, CA""".stripMargin

    val df2 = data2.split(System.lineSeparator()).toSeq.toDF("address")
    df2.show(false)

    /**
      * +-----------------+
      * |address          |
      * +-----------------+
      * |Tempe, AZ        |
      * |Tempe, Arizona   |
      * |San Jose, USA    |
      * |San Jose, CA     |
      * |Mountain View, CA|
      * +-----------------+
      */

提取加入密钥,并据此加入


    df1.withColumn("joiningKey", substring_index($"address", ",", 1))
      .join(
        df2.withColumn("joiningKey", substring_index($"address", ",", 1)),
        "joiningKey"
      )
      .select(df1("address"), df2("address"))
      .show(false)

    /**
      * +----------------------+-----------------+
      * |address               |address          |
      * +----------------------+-----------------+
      * |Tempe, AZ, USA        |Tempe, AZ        |
      * |Tempe, AZ, USA        |Tempe, Arizona   |
      * |San Jose, CA, USA     |San Jose, USA    |
      * |San Jose, CA, USA     |San Jose, CA     |
      * |Mountain View, CA, USA|Mountain View, CA|
      * +----------------------+-----------------+
      */

【讨论】:

  • 嗨。我没有听从你的回答。我应该在哪里使用我的 isSameAs 函数?
  • 我又添加了一列joiningKey,该列源自数据框的原始地址列。 Spark 将在内部将来自 Df1 的 joiningKey 列与来自 Df2 的同一列连接起来。我不认为你需要一个方法 is SameAs 这里因为 spark 会在内部为你做这件事
  • @SomeshwarKale 实际上您只是在比较地址字符串的第一个 , 之前的部分,对吗?
  • @werner,是的。但这就是预期的输出,不是吗?
  • @SomeshwarKale IsSameAs 是一个复杂的逻辑,它考虑的不仅仅是“,”的位置。所以我需要使用它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-07-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-03-23
  • 2018-08-29
  • 2016-09-01
相关资源
最近更新 更多