【问题标题】:scala - Spark : How to union all dataframe in loopscala - Spark:如何在循环中联合所有数据帧
【发布时间】:2017-09-15 08:00:24
【问题描述】:

有没有办法在循环中获取联合数据帧的数据帧?

这是一个示例代码:

var fruits = List(
  "apple"
  ,"orange"
  ,"melon"
) 

for (x <- fruits){         
  var df = Seq(("aaa","bbb",x)).toDF("aCol","bCol","name")
}

我想得到一些这样的:

aCol | bCol | fruitsName
aaa,bbb,apple
aaa,bbb,orange
aaa,bbb,melon

再次感谢

【问题讨论】:

  • 这个代码是什么?你到底想在这里做什么?
  • 这不是联合,这是笛卡尔积。

标签: scala apache-spark


【解决方案1】:

嗯...我认为你的问题有点误导。

根据我对你正在尝试做的任何事情的有限理解,你应该做以下事情,

val fruits = List(
  "apple",
  "orange",
  "melon"
)

val df = fruits
  .map(x => ("aaa", "bbb", x))
  .toDF("aCol", "bCol", "name")

这应该足够了。

【讨论】:

  • 谢谢Sarvesh..但我只需要在Loop..中获取联合数据框。因为有各种操作,例如join,withColumn in Loop。我将在 Loop 中从 hiveSql 中获取数据帧。
  • "union data-frame in loop" 好吧……这句话让我无法回答这个问题。为什么需要这个“循环中的联合数据帧”?您能否在您的问题中详细说明 - “各种操作,例如加入,withColumn in Loop”。
【解决方案2】:

您可以先创建一个序列,然后使用toDF创建Dataframe

scala> var dseq : Seq[(String,String,String)] = Seq[(String,String,String)]()
dseq: Seq[(String, String, String)] = List()

scala> for ( x <- fruits){
     |  dseq = dseq :+ ("aaa","bbb",x)
     | }

scala> dseq
res2: Seq[(String, String, String)] = List((aaa,bbb,apple), (aaa,bbb,orange), (aaa,bbb,melon))

scala> val df = dseq.toDF("aCol","bCol","name")
df: org.apache.spark.sql.DataFrame = [aCol: string, bCol: string, name: string]

scala> df.show
+----+----+------+
|aCol|bCol|  name|
+----+----+------+
| aaa| bbb| apple|
| aaa| bbb|orange|
| aaa| bbb| melon|
+----+----+------+

【讨论】:

  • 为什么你觉得有必要在这里介绍var
  • 实际上我尝试的是创建一个Seq 并将其转换为数据框,因为我正在遍历水果列表并将其附加到同一个变量中,所以我将其视为var.
  • OP 使用了var,但他实际上并不需要它。而且,您可以只将mappedfruits 放入您的dseq。这里要注意的重要一点是您的dseqList。然后你在for“循环”中附加到这个列表。这样做的问题是,List 上的 appendO(n) 使你的整个 dseq 生成 O(n^2),这只会扼杀大数据的性能。
  • 将避免append 与Scala List 一起作为一般原则。
  • 谢谢@SarveshKumarSingh。
【解决方案3】:

在for循环中:

val fruits = List("apple", "orange", "melon")

( for(f <- fruits) yield ("aaa", "bbb", f) ).toDF("aCol", "bCol", "name")

【讨论】:

    【解决方案4】:

    Steffen Schmitz 的回答是我认为最简洁的回答。 如果您正在寻找更多自定义(字段类型等),以下是更详细的答案:

    import org.apache.spark.sql.types.{StructType, StructField, StringType}
    import org.apache.spark.sql.Row
    
    //initialize DF
    val schema = StructType(
      StructField("aCol", StringType, true) ::
      StructField("bCol", StringType, true) ::
      StructField("name", StringType, true) :: Nil)
    var initialDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
    
    //list to iterate through
    var fruits = List(
        "apple"
        ,"orange"
        ,"melon"
    )
    
    for (x <- fruits) {
      //union returns a new dataset
      initialDF = initialDF.union(Seq(("aaa", "bbb", x)).toDF)
    }
    
    //initialDF.show()
    

    参考:

    【讨论】:

      【解决方案5】:

      您可以创建一个DataFrames 序列,然后使用reduce

      val results = fruits.
        map(fruit => Seq(("aaa", "bbb", fruit)).toDF("aCol","bCol","name")).
        reduce(_.union(_))
      
      results.show()
      

      【讨论】:

      • 简单漂亮!
      • 很高兴看到不可变的方法
      【解决方案6】:

      如果您有不同/多个数据帧,您可以使用下面的代码,这是有效的。

      val newDFs = Seq(DF1,DF2,DF3)
      newDFs.reduce(_ union _)
      

      【讨论】:

      • 如何继续使用循环向 Seq 添加新的数据帧?我想在最后做一个联合,但我的 Seq 中的数据框将使用循环添加。可行吗?
      • 为什么这样高效?如果你在 Scala Seq 上应用 reduce 函数,你根本就没有使用集群并行性和分布式计算,对吧?
      猜你喜欢
      • 1970-01-01
      • 2017-08-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-21
      • 1970-01-01
      相关资源
      最近更新 更多