【发布时间】:2022-01-03 20:42:03
【问题描述】:
上下文
我正在尝试使用 Spark/Scala 来有效地“编辑”多个 parquet 文件(可能超过 50k)。唯一需要进行的编辑是根据给定的一组行 ID 进行删除(即删除记录/行)。
parquet 文件作为分区 DataFrame 存储在 s3 中,其中示例分区如下所示:
s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet
每个分区可以有超过 100 个 parquet 文件,每个文件的大小在 50mb 到 500mb 之间。
输入
我们得到了一个名为 filesToModify 的火花 Dataset[MyClass],它有 2 列:
-
s3path: String= s3 中需要编辑的 parquet 文件的完整 s3 路径 -
ids: Set[String]= 在位于s3path的 parquet 文件中需要删除的一组 ID(行)
示例输入数据集filesToModify:
| s3path | ids |
|---|---|
| s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet | Set("a", "b") |
| s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet | Set("b") |
预期行为
鉴于 filesToModify 我想利用 Spark 中的并行性,对每个 row 执行以下操作:
- 加载位于
row.s3path的 parquet 文件 - 过滤,以便我们排除
id在集合row.ids中的任何行 - 计算
row.ids中每个 id 的已删除/排除行数(可选) - 将过滤后的数据保存回同一
row.s3path以覆盖文件 - 返回删除的行数(可选)
我尝试过的
我尝试过使用filesToModify.map(row => deleteIDs(row.s3path, row.ids)),其中deleteIDs 看起来像这样:
def deleteIDs(s3path: String, ids: Set[String]): Int = {
import spark.implicits._
val data = spark
.read
.parquet(s3path)
.as[DataModel]
val clean = data
.filter(not(col("id").isInCollection(ids)))
// write to a temp directory and then upload to s3 with same
// prefix as original file to overwrite it
writeToSingleFile(clean, s3path)
1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
}
但是,当在 map 操作中执行时,这会导致 NullPointerException。如果我在 map 块之外单独执行它,那么它可以工作,但我不明白为什么它不在其中(与惰性评估有关?)。
【问题讨论】:
标签: scala apache-spark amazon-s3 parquet