【问题标题】:Spark: Transpose DataFrame Without AggregatingSpark:在不聚合的情况下转置 DataFrame
【发布时间】:2021-12-10 09:27:24
【问题描述】:

我在网上查看了一些问题,但它们似乎没有达到我想要达到的效果。

我正在使用带有 Scala 的 Apache Spark 2.0.2。

我有一个数据框:

+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
|         1|  100|   0|   0|   0|   0|   0|
|         2|    0|  50|   0|   0|  20|   0|
|         3|    0|   0|   0|   0|   0|   0|
|         4|    0|   0|   0|   0|   0|   0|
+----------+-----+----+----+----+----+----+

我想转置为

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
|val2|    0|  50|   0|   0|
|val3|    0|   0|   0|   0|
|val4|    0|   0|   0|   0|
|val5|    0|  20|   0|   0|
|val6|    0|   0|   0|   0|
+----+-----+----+----+----+

我尝试过使用pivot(),但我无法找到正确的答案。我最终遍历了我的 val{x} 列,并按照下面的方法旋转了每个列,但事实证明这非常慢。

val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
|         1|  100|
|         2|    0|
|         3|    0|
|         4|    0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val1|  100|   0|   0|   0|
+----+-----+----+----+----+

然后在val{x} 的每次迭代中使用union() 到我的第一个数据帧。

+----+-----+----+----+----+
|vals|    1|   2|   3|   4|
+----+-----+----+----+----+
|val2|    0|  50|   0|   0|
+----+-----+----+----+----+

在我不想聚合数据的情况下,是否有更有效的转置方式?

谢谢:)

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    不幸的是,没有以下情况:

    • 考虑到数据量,Spark DataFrame 是合理的。
    • 数据转置是可行的。

    您必须记住,在 Spark 中实现的 DataFrame 是行的分布式集合,每一行都在单个节点上存储和处理。

    您可以将DataFrame 上的换位表示为pivot

    val kv = explode(array(df.columns.tail.map { 
      c => struct(lit(c).alias("k"), col(c).alias("v")) 
    }: _*))
    
    df
      .withColumn("kv", kv)
      .select($"segment_id", $"kv.k", $"kv.v")
      .groupBy($"k")
      .pivot("segment_id")
      .agg(first($"v"))
      .orderBy($"k")
      .withColumnRenamed("k", "vals")
    

    但它只是一个没有实际应用的玩具代码。在实践中,它并不比收集数据更好:

    val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
      case Array(h, t @ _*) => {
        (h.map(_.toString), t.map(_.collect { case x: Int => x }))
      }
    }
    
    val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
    val schema = StructType(
      StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
    )
    
    spark.createDataFrame(sc.parallelize(rows), schema)
    

    对于DataFrame 定义为:

    val df = Seq(
      (1, 100, 0, 0, 0, 0, 0),
      (2, 0, 50, 0, 0, 20, 0),
      (3, 0, 0, 0, 0, 0, 0),
      (4, 0, 0, 0, 0, 0, 0)
    ).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
    

    你会给你想要的结果吗:

    +----+---+---+---+---+
    |vals|  1|  2|  3|  4|
    +----+---+---+---+---+
    |val1|100|  0|  0|  0|
    |val2|  0| 50|  0|  0|
    |val3|  0|  0|  0|  0|
    |val4|  0|  0|  0|  0|
    |val5|  0| 20|  0|  0|
    |val6|  0|  0|  0|  0|
    +----+---+---+---+---+
    

    话虽如此,如果您需要对分布式数据结构进行有效的转置,您将不得不寻找其他地方。有许多结构,包括核心CoordinateMatrixBlockMatrix,它们可以跨两个维度分布数据并且可以转置。

    【讨论】:

    • 我是 Scala 和 Spark 的初学者,似乎结构和爆炸有错误。对吗?
    • @ShuaiLiu 你需要import org.apache.spark.sql.functions._
    【解决方案2】:

    在python中,这可以通过一种简单的方式完成 我通常通过转换 spark DataFrame 在 Pandas 中使用转置函数

    spark_df.toPandas().T

    【讨论】:

    • 正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center
    • toPandas().T 实际上可能适用于小型 DataFrame,但对于较大的 DataFrame,您很可能会遇到内存不足错误
    【解决方案3】:

    这应该是一个完美的解决方案。

    val seq = Seq((1,100,0,0,0,0,0),(2,0,50,0,0,20,0),(3,0,0,0,0,0,0),(4,0,0,0,0,0,0))
    val df1 = seq.toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
    df1.show()
    
    val schema = df1.schema
    
    val df2 = df1.flatMap(row => {
      val metric = row.getInt(0)
      (1 until row.size).map(i => {
        (metric, schema(i).name, row.getInt(i))
      })
    })
    
    val df3 = df2.toDF("metric", "vals", "value")
    df3.show()
    import org.apache.spark.sql.functions._
    
    val df4 = df3.groupBy("vals").pivot("metric").agg(first("value"))
    df4.show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-06-14
      • 1970-01-01
      • 1970-01-01
      • 2019-12-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多