【发布时间】:2022-06-11 01:23:55
【问题描述】:
我有一个包含数百万条记录的数据框,需要使用胶水作业将数据分区到小于 200MB 或 200,000 行的 s3 存储桶文件夹中。使用 partitionBy 将不起作用,因为没有列值以将分区保持在某些下游进程所需的大小以下的方式拆分数据。我尝试添加单调递增的 id 并根据预定义的 id 范围进行写入,但这不起作用,因为 monotonically_increasing_id 不是连续的。如何获得胶水作业将分区数据写入小于 200mb 的 s3 文件夹,或者有没有办法通过重新分区的数据帧进行分区
val newdf = diffDF.withColumn("id", monotonically_increasing_id())
var batchSize = 100000
var totalRecordCount = diffDF.count()
var currentRow = 0
while(currentRow < totalRecordCount){
var segmentDF = newdf.where(col("id") >= currentRow and col("id") < (currentRow + batchSize ))
.drop("id")
segmentDF.write.option("header","true").mode(SaveMode.Overwrite).csv(tpath + "/" + currentRow)
currentRow = currentRow + batchSize
}
【问题讨论】: