【问题标题】:Derive multiple columns from a single column in a Spark DataFrame从 Spark DataFrame 中的单列派生多列
【发布时间】:2015-11-18 16:44:24
【问题描述】:

我有一个带有大量可解析元数据的 DF,作为 Dataframe 中的单个字符串列,我们称之为 DFA,使用 ColmnA。

我想通过一个函数 ClassXYZ = Func1(ColmnA) 将这一列 ColmnA 分成多个列。此函数返回一个类 ClassXYZ,其中包含多个变量,现在每个变量都必须映射到新的 Column,例如 ColmnA1、ColmnA2 等。

我将如何通过仅调用此 Func1 一次来使用这些附加列从 1 个 Dataframe 到另一个 Dataframe 进行这种转换,而不必重复它来创建所有列。

如果我每次都调用这个巨大的函数来添加一个新列,这很容易解决,但我希望避免这种情况。

请提供工作代码或伪代码。

谢谢

桑杰

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql user-defined-functions


    【解决方案1】:

    一般来说,你想要的不是直接可能的。 UDF 一次只能返回一列。有两种不同的方法可以克服这个限制:

    1. 返回复杂类型的列。最通用的解决方案是StructType,但您也可以考虑ArrayTypeMapType

      import org.apache.spark.sql.functions.udf
      
      val df = Seq(
        (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
      ).toDF("x", "y", "z")
      
      case class Foobar(foo: Double, bar: Double)
      
      val foobarUdf = udf((x: Long, y: Double, z: String) => 
        Foobar(x * y, z.head.toInt * y))
      
      val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
      df1.show
      // +---+----+---+------------+
      // |  x|   y|  z|      foobar|
      // +---+----+---+------------+
      // |  1| 3.0|  a| [3.0,291.0]|
      // |  2|-1.0|  b|[-2.0,-98.0]|
      // |  3| 0.0|  c|   [0.0,0.0]|
      // +---+----+---+------------+
      
      df1.printSchema
      // root
      //  |-- x: long (nullable = false)
      //  |-- y: double (nullable = false)
      //  |-- z: string (nullable = true)
      //  |-- foobar: struct (nullable = true)
      //  |    |-- foo: double (nullable = false)
      //  |    |-- bar: double (nullable = false)
      

      这可以很容易地在以后变平,但通常不需要。

    2. 切换到RDD,重塑和重建DF:

      import org.apache.spark.sql.types._
      import org.apache.spark.sql.Row
      
      def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
        Seq(x * y, z.head.toInt * y)
      
      val schema = StructType(df.schema.fields ++
        Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
      
      val rows = df.rdd.map(r => Row.fromSeq(
        r.toSeq ++
        foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
      
      val df2 = sqlContext.createDataFrame(rows, schema)
      
      df2.show
      // +---+----+---+----+-----+
      // |  x|   y|  z| foo|  bar|
      // +---+----+---+----+-----+
      // |  1| 3.0|  a| 3.0|291.0|
      // |  2|-1.0|  b|-2.0|-98.0|
      // |  3| 0.0|  c| 0.0|  0.0|
      // +---+----+---+----+-----+
      

    【讨论】:

    • 当你说“通常没有 [flattening a column]”时,这是为什么呢?或者,Spark 是否允许您对顶级列执行的大多数操作也可以使用分层数据完成(例如 df1.foobar.foo)?
    • @max 因为简单的structs 几乎可以在任何通常使用扁平结构的上下文中使用(使用简单的点语法fooobar.foo)。但它不适用于集合类型。也可以查看stackoverflow.com/a/33850490/1560062
    • 您可以在分配给数据框列时尝试不同的方法,使用示例中的“withColumn”是上面的:val df1 = df.withColumn("foo", foobarUdf($"x" , $"y", $"z").getField("foo")).withColumn("bar", foobarUdf($"x", $"y", $"z").getField("bar") ) 现在,模式有 2 个新列:“foo”和“bar”。
    【解决方案2】:

    假设在你的函数之后会有一个元素序列,举个例子如下:

    val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
    df.show
    +------------------+---+
    |          infoComb|age|
    +------------------+---+
    |Mike,1986,Toronto| 30|
    | Andre,1980,Ottawa| 36|
    |  jill,1989,London| 27|
    +------------------+---+
    

    现在你可以用这个 infoComb 做的是你可以开始拆分字符串并获得更多列:

    df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
    +-----+----------+-------+---+
    | name|yearOfBorn|   city|age|
    +-----+----------+-------+---+
    |Mike|      1986|Toronto| 30|
    |Andre|      1980| Ottawa| 36|
    | jill|      1989| London| 27|
    +-----+----------+-------+---+
    

    希望这会有所帮助。

    【讨论】:

    • 你不能直接说 df.select('infoComb.*', 'age') 列名上的 .* 选择结构中的每个字段作为新列。
    【解决方案3】:

    如果生成的列与原始列的长度相同,则可以使用 withColumn 函数并应用 udf 创建全新的列。在此之后,您可以删除原始列,例如:

     val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
    .withColumn("newCol2", myFun2(myDf("originalColumn"))
    .drop(myDf("originalColumn"))
    

    其中 myFun 是这样定义的 udf:

       def myFun= udf(
        (originalColumnContent : String) =>  {
          // do something with your original column content and return a new one
        }
      )
    

    【讨论】:

    • 嗨 Niemand,感谢您的回复...但它不能解决问题...在您的代码中,您多次调用函数“myDF”,而我希望该函数被调用一次,生成一个有多个字段的类,每个字段变量作为一个新的列返回
    • 好吧,我害怕我提出了唯一一种可能的方式来知道,我认为不存在任何其他方式,但希望我错了;)。也不是我没有多次调用 myFun - 您可以调用其他函数,如 myFun2、myFun3 等来创建您需要的列。
    【解决方案4】:

    我选择创建一个函数来展平一列,然后与 udf 同时调用它。

    首先定义这个:

    implicit class DfOperations(df: DataFrame) {
    
      def flattenColumn(col: String) = {
        def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
          if (cols.isEmpty) df
          else addColumns(
            df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
            cols.tail
          )
        }
    
        val field = df.select(col).schema.fields(0)
        val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)
    
        addColumns(df, newCols).drop(col)
      }
    
      def withColumnMany(colName: String, col: Column) = {
        df.withColumn(colName, col).flattenColumn(colName)
      }
    
    }
    

    那么用法就很简单了:

    case class MyClass(a: Int, b: Int)
    
    val df = sc.parallelize(Seq(
      (0),
      (1)
    )).toDF("x")
    
    val f = udf((x: Int) => MyClass(x*2,x*3))
    
    df.withColumnMany("test", f($"x")).show()
    
    //  +---+------+------+
    //  |  x|test_a|test_b|
    //  +---+------+------+
    //  |  0|     0|     0|
    //  |  1|     2|     3|
    //  +---+------+------+
    

    【讨论】:

    • 您不必用ColumnMany 做所有事情。只需使用 select("select.*") 将其展平即可。
    【解决方案5】:

    这可以通过使用pivot函数轻松实现

    df4.groupBy("year").pivot("course").sum("earnings").collect() 
    

    【讨论】:

    • 我在任何答案或操作中都没有看到“年份”、“课程”或“收入”。您在这个非常简洁的答案(不是)中谈论的是什么数据框架?
    猜你喜欢
    • 1970-01-01
    • 2019-04-06
    • 2020-10-21
    • 2019-11-10
    • 1970-01-01
    • 2021-02-03
    • 2015-09-28
    • 1970-01-01
    相关资源
    最近更新 更多