【问题标题】:perform join on multiple DataFrame in spark在 Spark 中对多个 DataFrame 执行连接
【发布时间】:2017-02-19 22:34:41
【问题描述】:

我有 3 个从 3 个不同进程生成的数据帧。 每个数据框都有同名的列。 我的数据框看起来像这样

id   val1    val2       val3    val4
 1    null   null       null    null
 2    A2      A21       A31      A41

id   val1      val2       val3      val4
 1    B1        B21        B31       B41
 2    null      null       null      null

id   val1     val2       val3    val4
 1    C1       C2        C3       C4
 2    C11      C12       C13      C14

在这 3 个数据帧中,我想创建两个数据帧(最终的和合并的)。 最后,偏好顺序 - 数据帧 1 > 数据帧 2 > 数据帧 3

如果数据帧 1 中有结果(val1 != null),我会将该行存储在最终数据帧中。

我的最终结果应该是:

id  finalVal1    finalVal2   finalVal3   finalVal4 
1     B1           B21         B31         B41
2     A2           A21         A31         A41

Consolidated Dataframe 将存储所有 3 个的结果。

我怎样才能有效地做到这一点?

【问题讨论】:

  • 您能否使用上述数据框提供预期的输出?
  • 需要添加最终输出
  • 您没有null 或整行是否正确?行为空时,数据集中的id是吗?
  • 是的..我可以有空行或整行..当整行为空时,ID 将在数据集中
  • 对不起,我不清楚(你说是,但你的说法与我的矛盾)。像1 B1 null B31 B41 这样的行存在吗?在这种情况下,你是拒绝整行还是接受非空值?

标签: scala join apache-spark


【解决方案1】:

如果我理解正确的话,对于每一行,你想找出第一个非空值,首先查看第一个表,然后是第二个表,然后是第三个表。

你只需要根据id将这三个表连接起来,然后使用coalesce函数得到第一个非空元素

import org.apache.spark.sql.functions._

val df1 = sc.parallelize(Seq(
    (1,null,null,null,null),
    (2,"A2","A21","A31", "A41"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
    (1,"B1","B21","B31", "B41"),
    (2,null,null,null,null))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
    (1,"C1","C2","C3","C4"),
    (2,"C11","C12","C13", "C14"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val consolidated = df1.join(df2, "id").join(df3, "id").select(
  df1("id"),
  coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
  coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
  coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
  coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)

这会给你预期的输出

+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1| B21| B31| B41|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+

【讨论】:

  • 如果所有表中的值都为空,它会起作用。如果所有 3 中的值都为 null,我想得到 null?
  • 是的,coalesce 返回第一个非空值,如果一切都为 null,则返回 null
  • consolidated = df1.join(df2, "id").join(df3, "id") 我已经看到了这个解决方案,它似乎非常适合多重连接。是否可以为 N 数量的表制作它。因为在我的情况下,我有一个需要加入的数据集列表,但我不知道列表中有多少数据集(来自用户)。有没有可能让它更通用?
【解决方案2】:

编辑:带有部分空行的新解决方案。它避免了连接,但使用了窗口函数和不同的...

case class a(id:Int,val1:String,val2:String,val3:String,val4:String)

val df1 = sc.parallelize(List(
a(1,null,null,null,null),
a(2,"A2","A21","A31","A41"),
a(3,null,null,null,null))).toDF()

val df2 = sc.parallelize(List(
a(1,"B1",null,"B31","B41"),
a(2,null,null,null,null),
a(3,null,null,null,null))).toDF()

val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"),
a(2,"C11","C12","C13","C14"),
a(3,"C11","C12","C13","C14"))).toDF()

val anyNotNull = df1.columns.tail.map(c => col(c).isNotNull).reduce(_ || _)

val consolidated = {
  df1
    .filter(anyNotNull)
    .withColumn("foo",lit(1))
    .unionAll(df2.filter(anyNotNull).withColumn("foo",lit(2)))
    .unionAll(df3.filter(anyNotNull).withColumn("foo",lit(3)))
}

scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|                                                                                                                                                                                                                                                    
+---+----+----+----+----+
|  1|  B1|null| B31| B41|
|  1|  B1|  C2| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+

val w = Window.partitionBy('id).orderBy('foo)

val coalesced = col("id") +: df1.columns.tail.map(c => first(col(c),true).over(w).as(c))
val finalDF = consolidated.select(coalesced:_*).na.drop.distinct

scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1|  C2| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+

旧解决方案:

如果您只有完整的 null 行或根本没有 null,您可以这样做(编辑:与其他解决方案相比,优势在于您避免了不同的)

数据:

case class a(id:Int,val1:String,val2:String,val3:String,val4:String)

val df1 = sc.parallelize(List(
a(1,null,null,null,null),
a(2,"A2","A21","A31","A41"),
a(3,null,null,null,null))).toDF()
val df2 = sc.parallelize(List(
a(1,"B1","B21","B31","B41"),
a(2,null,null,null,null),
a(3,null,null,null,null))).toDF()
val df3 = sc.parallelize(List(
a(1,"C1","C2","C3","C4"),
a(2,"C11","C12","C13","C14"),
a(3,"C11","C12","C13","C14"))).toDF()

综合:

val consolidated = {
  df1.na.drop.withColumn("foo",lit(1))
  .unionAll(df2.na.drop.withColumn("foo",lit(2)))
  .unionAll(df3.na.drop.withColumn("foo",lit(3)))
}

scala> consolidated.show()
+---+----+----+----+----+---+
| id|val1|val2|val3|val4|foo|
+---+----+----+----+----+---+
|  2|  A2| A21| A31| A41|  1|
|  1|  B1| B21| B31| B41|  2|
|  1|  C1|  C2|  C3|  C4|  3|
|  2| C11| C12| C13| C14|  3|
|  3| C11| C12| C13| C14|  3|
+---+----+----+----+----+---+

决赛

val w = Window.partitionBy('id).orderBy('foo)
val finalDF = consolidated
  .withColumn("foo2",rank().over(w))
  .filter('foo2===1)
  .drop("foo").drop("foo2")

scala> finalDF.show()
+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1| B21| B31| B41|
|  3| C11| C12| C13| C14|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+

【讨论】:

  • 您的解决方案太复杂了,OP 在他的示例中没有第三行,令人困惑
  • 我没有看到提高测试示例覆盖率的问题。 OP 要求一个高性能的解决方案,而不是一个简单的解决方案。
  • 令人困惑。您认为使用窗口函数、distinct、filter、drop、map/reduce、case class 等比使用 coalesce 连接更有效吗?
  • filter 和 na.drop 不需要随机播放,因此比连接快得多。 map-reduce 在非常小的集合(列)上执行,而且速度也非常快。管道连接创建了可怕的执行计划。我不明白你为什么主观地争论 stackoverflow(“这很混乱”)。
【解决方案3】:

以下是连接六个表/数据框的示例(不使用 SQL)

retail_db 是一个众所周知的示例数据库,任何人都可以从 Google 获得它

问题://获取所有购买健身物品的TX客户

 val df_customers = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "customers").option("user", "root").option("password", "root").load()
  val df_products = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "products").option("user", "root").option("password", "root").load() 
  val df_orders = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "orders"). option("user", "root").option("password", "root").load()
  val df_order_items = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_items").option("user", "root").option("password", "root").load()
  val df_categories = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "categories").option("user", "root").option("password", "root").load()
  val df_departments = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "departments").option("user", "root").option("password", "root").load()
  val df_order_items_all = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost/retail_db?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "order_all").option("user", "root").option("password", "root").load()



  val jeCustOrd=df_customers.col("customer_id")===df_orders.col("order_customer_id")
  val jeOrdItem=df_orders.col("order_id")===df_order_items.col("order_item_order_id")
  val jeProdOrdItem=df_products.col("product_id")===df_order_items.col("order_item_product_id")
  val jeProdCat=df_products.col("product_category_id")===df_categories.col("category_id")
  val jeCatDept=df_categories.col("category_department_id")===df_departments.col("department_id")





  df_customers.where("customer_state = 'TX'").join(df_orders,jeCustOrd).join(df_order_items,jeOrdItem).join(df_products,jeProdOrdItem).join(df_categories,jeProdCat).join(df_departments,jeCatDept).filter("department_name='Fitness'")
  .select("customer_id","customer_fname","customer_lname", "customer_street","customer_city","customer_state","customer_zipcode","order_id","category_name","department_name").show(5)

【讨论】:

    【解决方案4】:

    如果它们来自三个不同的表,我会使用下推过滤器在服务器上过滤它们并使用数据框之间的连接功能将它们连接在一起。

    如果它们不是来自数据库表;您可以使用过滤器并将高阶函数映射到相同的并行。

    【讨论】:

      猜你喜欢
      • 2023-03-16
      • 2016-02-28
      • 1970-01-01
      • 2020-11-27
      • 2018-10-18
      • 2015-10-05
      相关资源
      最近更新 更多