【问题标题】:Spark dataframe inner join without duplicate match没有重复匹配的 Spark 数据帧内部连接
【发布时间】:2020-04-18 20:12:17
【问题描述】:

我想根据特定条件加入两个数据帧是 spark scala。但是,如果 df1 中的行与 df2 中的任何行匹配,则不应尝试将 df1 的同一行与 df2 中的任何其他行匹配。以下是我试图获得的示例数据和结果。

   DF1
--------------------------------
Emp_id | Emp_Name | Address_id
1      |  ABC     |   1
2      |  DEF     |   2
3      |  PQR     |   3
4      |  XYZ     |   1

   DF2
-----------------------
Address_id | City 
1          | City_1
1          | City_2
2          | City_3
REST       | Some_City

  Output DF
----------------------------------------
Emp_id | Emp_Name | Address_id | City
1      |  ABC     |   1        | City_1
2      |  DEF     |   2        | City_3
3      |  PQR     |   3        | Some_City
4      |  XYZ     |   1        | City_1 

注意:- REST 就像通配符。任何值都可以等于 REST。

所以在上面的示例中,emp_name "ABC" 可以与 City_1、City_2 或 Some_City 匹配。输出 DF 仅包含 City_1,因为它首先找到它。

【问题讨论】:

  • 让问题。 DF2 中 Address_id = 1 的 City_1 和 Address_id = 1 的 City_2 是否正确?

标签: apache-spark-sql


【解决方案1】:

您的加入似乎有一个自定义逻辑。基本上我一直想出下面的UDF

请注意,您可能希望根据您的要求更改 UDF 的逻辑。

import spark.implicits._
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.first

//dataframe 1    
val df_1 = Seq(("1", "ABC", "1"), ("2", "DEF", "2"), ("3", "PQR", "3"), ("4", "XYZ", "1")).toDF("Emp_Id", "Emp_Name", "Address_Id")

//dataframe 2
val df_2 = Seq(("1", "City_1"), ("1", "City_2"), ("2", "City_3"), ("REST","Some_City")).toDF("Address_Id", "City_Name")

// UDF logic
val join_udf = udf((a: String, b: String) => {
      (a,b) match {
        case ("1", "1") => true
        case ("1", _) => false
        case ("2", "2") => true
        case ("2", _) => false
        case(_, "REST") => true
        case(_, _) => false

    }})

val dataframe_join = df_1.join(df_2, join_udf(df_1("Address_Id"), df_2("Address_Id")), "inner").drop(df_2("Address_Id"))
                             .orderBy($"City_Name")
                             .groupBy($"Emp_Id", $"Emp_Name", $"Address_Id")
                             .agg(first($"City_Name"))
                             .orderBy($"Emp_Id")

dataframe_join.show(false)

基本上在应用 UDF 后,你得到的是所有可能的匹配组合。

发布当您应用 groupBy 并使用 aggfirst 功能时,您只会得到过滤后的值作为您正在寻找的值。

+------+--------+----------+-----------------------+
|Emp_Id|Emp_Name|Address_Id|first(City_Name, false)|
+------+--------+----------+-----------------------+
|1     |ABC     |1         |City_1                 |
|2     |DEF     |2         |City_3                 |
|3     |PQR     |3         |Some_City              |
|4     |XYZ     |1         |City_1                 |
+------+--------+----------+-----------------------+

请注意,我使用的是 Spark 2.3,希望对您有所帮助!

【讨论】:

  • 我对迟到的回复首先道歉。这个答案对我帮助很大。
  • @PrateekPathak 没问题,我很乐意提供帮助。也请随意对答案进行投票;)
  • 赞成答案(不确定赞成的词是否正确)。感谢您让我知道这一点,因为我是新用户。
【解决方案2】:
{    
    import org.apache.spark.sql.{SparkSession}
    import org.apache.spark.sql.functions._

    object JoinTwoDataFrame extends App {

      val spark = SparkSession.builder()
        .master("local")
        .appName("DataFrame-example")
        .getOrCreate()

      import spark.implicits._

      val df1 = Seq(
        (1, "ABC", "1"),
        (2, "DEF", "2"),
        (3, "PQR", "3"),
        (4, "XYZ", "1")
      ).toDF("Emp_id", "Emp_Name", "Address_id")

      val df2 = Seq(
        ("1", "City_1"),
        ("1", "City_2"),
        ("2", "City_3"),
        ("REST", "Some_City")
      ).toDF("Address_id", "City")

      val restCity: Option[String] = Some(df2.filter('Address_id.equalTo("REST")).select('City).first()(0).toString)

      val res = df1.join(df2, df1.col("Address_id") === df2.col("Address_id") , "left_outer")
        .select(
          df1.col("Emp_id"),
          df1.col("Emp_Name"),
          df1.col("Address_id"),
          df2.col("City")
        )
          .withColumn("city2", when('City.isNotNull, 'City).otherwise(restCity.getOrElse("")))
          .drop("City")
          .withColumnRenamed("city2", "City")
          .orderBy("Address_id", "City")
          .groupBy("Emp_id", "Emp_Name", "Address_id")
          .agg(collect_list("City").alias("cityList"))
          .withColumn("City", 'cityList.getItem(0))
          .drop("cityList")
          .orderBy("Emp_id")

            res.show(false)

    //  +------+--------+----------+---------+
    //  |Emp_id|Emp_Name|Address_id|City     |
    //  +------+--------+----------+---------+
    //  |1     |ABC     |1         |City_1   |
    //  |2     |DEF     |2         |City_3   |
    //  |3     |PQR     |3         |Some_City|
    //  |4     |XYZ     |1         |City_1   |
    //  +------+--------+----------+---------+

    }
}

【讨论】:

  • 我无法获取过滤“REST”记录的逻辑。请解释一下。
猜你喜欢
  • 2019-01-02
  • 2020-03-21
  • 1970-01-01
  • 1970-01-01
  • 2022-11-01
  • 1970-01-01
  • 2014-02-14
  • 1970-01-01
  • 2020-08-11
相关资源
最近更新 更多