【问题标题】:Doing left outer join on multiple data frames in spark scala在 spark scala 中对多个数据帧进行左外连接
【发布时间】:2018-02-02 22:57:48
【问题描述】:

我是 Spark 的新手。我尝试使用 scala 实现以下用例。

-DataFrame 1

| col A | col B |
-----------------
|  1    | a     |
|  2    | a     |
|  3    | a     |

-DataFrame 2

| col A | col B |
-----------------
|  1    | b     |
|  3    | b     |

-DataFrame 3

| col A | col B |
-----------------
|  2    | c     |
|  3    | c     |

最终输出帧应该是

| col A | col B |
-----------------
|  1    | a,b   |
|  2    | a,c   |
|  3    | a,b,c |

帧数不限于 3,它可以是小于 100 的任何数字。所以我正在使用每个我正在打印的每个数据帧。

有人可以帮我如何创建最终的数据框,在其中我可以用 N 个数据框以上述格式输出。

感谢您的帮助。

【问题讨论】:

    标签: scala apache-spark spark-dataframe


    【解决方案1】:

    我今天看到了这个问题。我建议你使用python来解决它。它比 scala 更容易编写。他们是:

    from pyspark.sql import SQLContext
    
    from pyspark.sql.functions import concat_ws
    
    d1=sc.parallelize([(1, "a"), (2, "a"), (3,"a")]).toDF().toDF("Col_A","Col_B")
    
    d2=sc.parallelize([(1, "b"), (2, "b")]).toDF().toDF("Col_A", "Col_B")
    
    d3=sc.parallelize([(2, "c"), (3, "c")]).toDF().toDF("Col_A", "Col_B")
    
    d4=d1.join(d2,'Col_A','left').join(d3,'Col_A','left').select(d1.Col_A.alias("col A"),concat_ws(',',d1.Col_B,d2.Col_B,d3.Col_B).alias("col B"))
    
    df4.show()
    
    +-----+-----+
    
    |col A|col B|
    
    +-----+-----+
    
    |    1
    |  a,b|
    
    |    2
    |a,b,c|
    
    |    3
    |  a,c|
    
    +-----+-----+
    

    你看到了结果!

    【讨论】:

    • 感谢您的意见。
    【解决方案2】:

    您可以使用foldLeft 使用外连接迭代合并数据

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._
    
    val df1 = Seq((1, "a"), (2, "a"), (3, "a")).toDF("Col A", "Col B")
    val df2 = Seq((1, "b"), (2, "b")).toDF("Col A", "Col B")
    val df3 = Seq((2, "c"), (3, "c")).toDF("Col A", "Col B")
    
    val dfs = Seq(df2, df3)
    val bs = (0 to dfs.size).map(i => s"Col B $i")
    
    dfs.foldLeft(df1)(
      (acc, df) => acc.join(df, Seq("Col A"), "fullouter")
    ).toDF("Col A" +: bs: _*).select($"Col A", array(bs map col: _*)).map {
      case Row(a: Int, bs: Seq[_]) => 
        // Drop nulls and concat
        (a, bs.filter(_ != null).map(_.toString).mkString(","))
    }.toDF("Col A", "Col B").show
    
    // +-----+-----+ 
    // |Col A|Col B|
    // +-----+-----+
    // |    1|  a,b|
    // |    3|  a,c|
    // |    2|a,b,c|
    // +-----+-----+
    

    但如果你真的认为

    可以是任何小于 100 的数字

    那就太不现实了。 join 是 Spark 中最昂贵的操作,即使对优化器进行了所有改进,它也无法正常工作。

    【讨论】:

    • 感谢您的帮助。
    猜你喜欢
    • 1970-01-01
    • 2021-08-17
    • 2019-01-03
    • 2017-03-18
    • 2021-12-12
    • 1970-01-01
    • 2019-01-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多