【问题标题】:Does Spark supports melt and dcast [duplicate]Spark是否支持melt和dcast [重复]
【发布时间】:2016-07-28 07:39:29
【问题描述】:

我们使用melt 和dcast 将数据从wide->long 和long->wide 格式转换。 详情请参考http://seananderson.ca/2013/10/19/reshape.html

scala 或 SparkR 都可以。

我经历过这个blogscala functionsR API。 我没有看到做类似工作的函数。

Spark 中有没有等价的功能?如果没有,在 Spark 中有没有其他方法可以做到这一点?

【问题讨论】:

  • 看起来不像。如果您可以将数据放入内存,请使用 as.data.frame() 将 Spark DataFrame 转换为原生 data.frame,对其进行整形,然后将其写回 Spark。
  • 因为没有。您需要自己编写。

标签: r scala apache-spark spark-dataframe melt


【解决方案1】:

Reshaping Data with Pivot in Spark 支持使用pivot 进行整形。我知道melt 大致与枢轴相反,也称为unpivot。我对Spark 比较陌生。据我所知,我试图实施熔化操作。

    def melt(df: DataFrame, columns: List[String]): DataFrame ={

    val restOfTheColumns =  df.columns.filterNot(columns.contains(_))
    val baseDF = df.select(columns.head, columns.tail: _*)
    val newStructure =StructType(baseDF.schema.fields ++ List(StructField("variable", StringType, true), StructField("value", StringType, true)))
    var newdf  = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], newStructure)

    for(variableCol <- restOfTheColumns){
      val colValues = df.select(variableCol).map(r=> r(0).toString)
      val colRdd=baseDF.rdd.zip(colValues).map(tuple => Row.fromSeq(tuple._1.toSeq.:+(variableCol).:+(tuple._2.toString)))
      var colDF =sqlContext.createDataFrame(colRdd, newStructure)
      newdf =newdf.unionAll(colDF)
    }
    newdf
  }

它完成了工作。但我不太确定效率。

+-----+---+---+----------+------+
| name|sex|age|    street|weight|
+-----+---+---+----------+------+
|Alice|  f| 34| somewhere|    70|
|  Bob|  m| 63|   nowhere|   -70|
|Alice|  f|612|nextstreet|    23|
|  Bob|  m|612|      moon|     8|
+-----+---+---+----------+------+

可以作为

melt(df, List("name", "sex"))

结果如下:

+-----+---+--------+----------+
| name|sex|variable|     value|
+-----+---+--------+----------+
|Alice|  f|     age|        34|
|  Bob|  m|     age|        63|
|Alice|  f|     age|       612|
|  Bob|  m|     age|       612|
|Alice|  f|  street| somewhere|
|  Bob|  m|  street|   nowhere|
|Alice|  f|  street|nextstreet|
|  Bob|  m|  street|      moon|
|Alice|  f|  weight|        70|
|  Bob|  m|  weight|       -70|
|Alice|  f|  weight|        23|
|  Bob|  m|  weight|         8|
+-----+---+--------+----------+

如果有改进的余地,我希望它有用并感谢您的 cmets。

【讨论】:

    【解决方案2】:

    这是一个 spark.ml.Transformer,它只使用数据集操作(没有 RDD 的东西)

    case class Melt(meltColumns: String*) extends Transformer{
    
      override def transform(in: Dataset[_]): DataFrame = {
        val nonMeltColumns =  in.columns.filterNot{ meltColumns.contains }
        val newDS = in
          .select(nonMeltColumns.head,meltColumns:_*)
          .withColumn("variable", functions.lit(nonMeltColumns.head))
          .withColumnRenamed(nonMeltColumns.head,"value")
    
        nonMeltColumns.tail
          .foldLeft(newDS){ case (acc,col) =>
            in
              .select(col,meltColumns:_*)
              .withColumn("variable", functions.lit(col))
              .withColumnRenamed(col,"value")
              .union(acc)
          }
          .select(meltColumns.head,meltColumns.tail ++ List("variable","value") : _*)
      }
    
      override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
    
      @DeveloperApi
      override def transformSchema(schema: StructType): StructType = ???
    
      override val uid: String = Identifiable.randomUID("Melt")
    }
    

    这是一个使用它的测试

    "spark" should "melt a dataset" in {
        import spark.implicits._
        val schema = StructType(
          List(StructField("Melt1",StringType),StructField("Melt2",StringType)) ++
          Range(3,10).map{ i => StructField("name_"+i,DoubleType)}.toList)
    
        val ds = Range(1,11)
          .map{ i => Row("a" :: "b" :: Range(3,10).map{ j => Math.random() }.toList :_ *)}
          .|>{ rows => spark.sparkContext.parallelize(rows) }
          .|>{ rdd => spark.createDataFrame(rdd,schema) }
    
        val newDF = ds.transform{ df =>
          Melt("Melt1","Melt2").transform(df) }
    
        assert(newDF.count() === 70)
      }
    

    .|> 是 scalaZ 管道运算符

    【讨论】:

      【解决方案3】:

      Spark DataFrame 具有 explode 方法,该方法提供 R melt 功能。 适用于 Spark 1.6.1 的示例:

      // input df has columns (anyDim, n1, n2)
      case class MNV(measureName: String, measureValue: Integer);
      val dfExploded = df.explode(col("n1"), col("n2")) {
        case Row(n1: Int, n2: Int) =>
        Array(MNV("n1", n1), MNV("n2", n2))
      }
      // dfExploded has columns (anyDim, n1, n2, measureName, measureValue)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-12-12
        • 2019-04-24
        • 2015-05-10
        • 1970-01-01
        • 2021-09-15
        • 1970-01-01
        相关资源
        最近更新 更多