【问题标题】:append multiple columns to existing dataframe in spark将多列附加到火花中的现有数据框
【发布时间】:2019-08-30 08:22:09
【问题描述】:

我需要将多个列附加到现有的 spark 数据框中,其中列名在 List 中给出 假设新列的值是恒定的,例如给定的输入列和数据框是

val columnsNames=List("col1","col2")
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4))

在附加两列之后,假设 col1 的常量值为“val1”,col2 的值为“val2”,输出数据帧应该是

+-----+---+-------+------+
|   _1| _2|col1   |col2|
+-----+---+-------+------+
|  one|  1|val1   |val2|
|  two|  2|val1   |val2|
|three|  3|val1   |val2|
| four|  4|val1   |val2|
+-----+---+-------+------+

我写了一个函数来追加列

def appendColumns (cols: List[String], ds: DataFrame): DataFrame = {

            cols match {

                case Nil => ds
                case h :: Nil => appendColumns(Nil, ds.withColumn(h, lit(h)))
                case h :: tail => appendColumns(tail, ds.withColumn(h, lit(h)))

            }
        }

有没有更好的方法和更实用的方法来做到这一点。

谢谢

【问题讨论】:

  • 澄清一下,在appendColumns 中,列名与列值相同,而在预期的输出数据框中,值例如col1val1,可以相同(列名和值)还是要分开?
  • 列名和列值可以相同。
  • 奇数关闭原因。
  • 您好,您找到问题的答案了吗?还是有什么不清楚的地方?
  • 感谢 Oli,是的,建议的方法非常好。

标签: scala apache-spark apache-spark-sql bigdata


【解决方案1】:

是的,有一种更好更简单的方法。基本上,您对withColumn 的调用次数与您拥有的列数一样多。有很多列,催化剂,优化火花查询的引擎可能会感觉有点不知所措(我过去有过类似用例的经验)。我什至看到它在尝试数千列时会导致驱动程序出现 OOM。为了避免给催化剂带来压力(并编写更少的代码;-)),您可以像下面这样简单地使用select 在一个 spark 命令中完成此操作:

val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
// let's assume that we have a map that associates column names to their values
val columnMap = Map("col1" -> "val1", "col2" -> "val2")
// Let's create the new columns from the map
val newCols = columnMap.keys.map(k => lit(columnMap(k)) as k)
// selecting the old columns + the new ones
data.select(data.columns.map(col) ++ newCols : _*).show
+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+

【讨论】:

    【解决方案2】:

    与递归相反,我认为使用 foldLeft 的更通用方法对于有限数量的列更通用。使用 Databricks 笔记本:

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    
    import spark.implicits._
    
    val columnNames = Seq("c3","c4")
    val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")
    
    def addCols(df: DataFrame, columns: Seq[String]): DataFrame = {
        columns.foldLeft(df)((acc, col) => {
          acc.withColumn(col, lit(col)) })
    }
    
    val df2 = addCols(df, columnNames)
    df2.show(false)
    

    返回:

    +-----+---+---+---+
    |c1   |c2 |c3 |c4 |
    +-----+---+---+---+
    |one  |1  |c3 |c4 |
    |two  |2  |c3 |c4 |
    |three|3  |c3 |c4 |
    |four |4  |c3 |c4 |
    +-----+---+---+---+
    

    请注意以下内容:https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015 尽管上下文略有不同,而另一个答案通过 select 方法暗示了这一点。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-10-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-11
      • 1970-01-01
      相关资源
      最近更新 更多