【问题标题】:Scala: How can I split up a dataframe by row number?Scala:如何按行号拆分数据框?
【发布时间】:2019-08-05 05:44:51
【问题描述】:

我想将 270 万行的数据帧拆分为 100000 行的小数据帧,所以最终得到 27 个数据帧,我也想将它们存储为 csv 文件。

我已经看过这个 partitionBy 和 groupBy 了,但是我不需要担心任何条件,除了它们必须按日期排序。我正在尝试编写自己的代码来完成这项工作,但如果您知道我可以使用的一些 Scala (Spark) 函数,那就太好了!

谢谢大家的建议!

【问题讨论】:

  • 有几个答案here 可以帮助您入门。
  • 谢谢,在我提出问题之前,我是这样开始的,但是这些解决方案对我并没有真正的帮助:)
  • @Eva,如果您的目标是破坏数据以保存较小的 csv 文件,您可以执行 df.repartition(27).write.csv("/path")。您将在“/path”文件夹下拥有 part000、part002、..part026 文件
  • 我也试试这个,看起来很简单的方法,如果它负责排序,那也应该足够了!只有当数据框增长时才会出现问题,然后每次都必须更改重新分区......但是为了快速获得结果,它看起来不错!谢谢:)
  • 重新分区可能代价高昂(它会移动所有数据)并且不会保留顺序。

标签: scala apache-spark split apache-spark-sql databricks


【解决方案1】:

您可以使用 RDD API 中的 zipWithIndex(不幸的是,在 SparkSQL 中没有等效项)将每一行映射到一个索引,范围在 0rdd.count - 1 之间。

因此,如果您有一个我假设要进行相应排序的数据框,则您需要在两个 API 之间来回切换,如下所示:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// creating mock data
val df = spark.range(100).withColumn("test", 'id % 10)

// zipping the data
val partitionSize = 5 // I use 5 but you can use 100000 in your case
val zipped_rdd = df.rdd
    .zipWithIndex.map{ case (row, id) => 
        Row.fromSeq(row.toSeq :+ id / partitionSize ) 
    }

//back to df
val newField = StructField("partition", LongType, false)
val zipped_df = spark
    .createDataFrame(zipped_rdd, df.schema.add(newField))

让我们看一下数据,我们有一个名为 partition 的新列,它对应于您想要拆分数据的方式。

zipped_df.show(15) // 5 rows by partition
+---+----+---------+
| id|test|partition|
+---+----+---------+
|  0|   0|        0|
|  1|   1|        0|
|  2|   2|        0|
|  3|   3|        0|
|  4|   4|        0|
|  5|   5|        1|
|  6|   6|        1|
|  7|   7|        1|
|  8|   8|        1|
|  9|   9|        1|
| 10|   0|        2|
| 11|   1|        2|
| 12|   2|        2|
| 13|   3|        2|
| 14|   4|        2|
+---+----+---------+

// using partitionBy to write the data
zipped_df.write
    .partitionBy("partition")
    .csv(".../testPart.csv")

【讨论】:

  • 我尝试在 Databricks 中对其进行测试,但此时我必须导入哪个确切的库? val zipped_df = spark .createDataFrame(rdd, df.schema.add(newField)) 为 rdd?
  • 我对数据块不是很熟悉。 SparkSession 是为您创建的吗?如果是,我认为您只需要导入org.apache.spark.sql.types._ 就可以创建StructFieldorg.apache.spark.sql.Row。让我更新我的答案:)
  • 这些导入没有解决 createDataFrame(rdd...) 部分的问题:error: not found: value rdd
  • 呵呵,只是变量名的问题。我改变了一个的名字,但没有改变另一个。对不起。我马上就解决了。
  • 是的,SparkSession 已创建!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-10-26
  • 1970-01-01
  • 2021-03-12
  • 1970-01-01
  • 2022-10-02
  • 1970-01-01
相关资源
最近更新 更多