【问题标题】:Partition a Spark Dataframe based on a specific column and dump the content of each partition on a csv根据特定列对 Spark Dataframe 进行分区,并将每个分区的内容转储到 csv 中
【发布时间】:2017-04-03 04:02:09
【问题描述】:

我正在使用 spark 1.6.2 Java API 在 Dataframe DF1 中加载一些数据,如下所示:

Key  Value
A    v1
A    v2
B    v3
A    v4

现在我需要根据“Key”列中的值子集对 DF1 进行分区,并将每个分区转储到一个 csv 文件(使用 spark-csv)。

期望的输出:

A.csv

Key Value
A   v1
A   v2
A   v4

B.csv

Key Value
B   v3

目前我正在做的是构建一个 HashMap (myList),其中包含我需要过滤的值的子集,然后在每次迭代时遍历该过滤不同的 Key。使用以下代码,我得到了我想要的,但我想知道是否有更有效的方法来做到这一点:

DF1 = <some operations>.cache();

for (Object filterKey: myList.keySet()) {
  DF2 = DF1.filter((String)myList.get(filterKey));

  DF2.write().format.format("com.databricks.spark.csv")
            .option("header", "true")
      .save("/" + filterKey + ".csv");
}

【问题讨论】:

    标签: java apache-spark spark-dataframe


    【解决方案1】:

    你就快到了,你只需要添加partitionBy,它将按照你想要的方式对文件进行分区。

    DF1
      .filter{case(key, value) => myList.contains(key))
      .write
      .partitionBy("key")
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("/my/basepath/")
    

    文件现在将存储在“/my/basepath/key=A/”、“/my/basepath/key=B/”等下。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-09-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-02-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多