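【Posted】:2021-08-07 04:21:11
【Question】:
I defined a map function on a DataFrame, but when I call an action (in this case count()), I don't see the function calls inside the map function being invoked for each row.
Here is my code: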
```scala
import com.amazonaws.services.s3.model.CopyObjectRequest
import org.apache.spark.sql.DataFrame

def copyFilesToArchive(recordDF: DataFrame, s3Util: S3Client): Unit = {
  if (s3Util != null) {
    // Copy all the objects to the new path
    logger.info(".copyFilesToArchive() : Before Copying the Files to Archive and no.of RDD Partitions ={}", recordDF.rdd.partitions.length)
    recordDF.rdd.map(row => {
      // getAs needs an explicit type parameter; without one, Scala infers Nothing
      val key = row.getAs[String]("object_key")
      val bucketName = row.getAs[String]("bucket_name")
      val targetBucketName = row.getAs[String]("target_bucket_name")
      val targetKey = "archive/" + "/" + key
      val copyObjectRequest = new CopyObjectRequest(bucketName, key, targetBucketName, targetKey)
      logger.info(".copyFilesToArchive() : Copying the File from [" + key + "] to [" + targetKey + "]")
      s3Util.getS3Client.copyObject(copyObjectRequest)
    })
    logger.info(".copyFilesToArchive() : Copying the Files to Archive Folder. No.of Files to Copy ={}", recordDF.count())
  }
  else {
    logger.info(".copyFilesToArchive() : Skipping Moving the Files as S3 Util is null")
  }
}
```
When I run the unit test, I don't see any logging for the file copies; this is all I get:
```
INFO ArchiveProcessor - .copyFilesToArchive() : Before Copying the Files to Archive and no.of RDD Partitions =200
INFO ArchiveProcessor - .copyFilesToArchive() : Copying the Files to Archive Folder. No.of Files to Copy =3000000
```
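The log output above is consistent with how lazy transformations work: `recordDF.rdd.map(...)` only builds a new RDD describing the work, and the following `count()` is called on `recordDF` rather than on that mapped RDD, so no action ever consumes it and the copy function never runs. As a Spark-free analogy, the sketch below uses a plain Scala `Iterator`, whose `map` is also lazy. `copyObject` is a hypothetical stand-in for `s3Util.getS3Client.copyObject` that only records the keys it receives, and the example illustrates the idea rather than Spark's actual scheduler.

```scala
import scala.collection.mutable.ArrayBuffer

// Spark-free analogy: Scala's Iterator.map is lazy, much like an RDD transformation.
object LazyMapDemo {
  // Returns (source count, copies before any consumer, copies after consuming the mapped iterator).
  def run(): (Int, Int, Int) = {
    // Hypothetical stand-in for s3Util.getS3Client.copyObject: just records each key.
    val copied = ArrayBuffer.empty[String]
    def copyObject(key: String): Unit = { copied += key; () }

    val keys = Seq("a.csv", "b.csv", "c.csv")

    // Analogous to recordDF.rdd.map(...): describes the work, executes none of it yet.
    val mapped = keys.iterator.map { key => copyObject(key); key }

    // Analogous to recordDF.count(): counts the source and never touches `mapped`.
    val sourceCount = keys.size
    val copiesBefore = copied.size

    // Analogous to an action on the mapped data: now the side effect runs for every element.
    mapped.foreach(_ => ())
    (sourceCount, copiesBefore, copied.size)
  }

  def main(args: Array[String]): Unit = {
    val (n, before, after) = run()
    println(s"source count = $n, copies before = $before, copies after = $after")
  }
}
```

`LazyMapDemo.run()` returns `(3, 0, 3)`: nothing is "copied" until the mapped iterator itself is consumed. In Spark terms, the analogous change would be to invoke an action such as `foreach` on the data carrying the side effect, instead of calling `count()` on the original DataFrame.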
If I use collect() instead, I can see the logging output, but then I get an OOM error:
```scala
recordDF.collect().map(row => {
  ...
})
```
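`collect()` copies every row (3,000,000 here, according to the log) into the driver's memory before the local `map` runs, which is consistent with the OOM. A common Spark alternative is to run the side effect on the executors, for example with `recordDF.rdd.foreachPartition`, creating whatever client is needed once per partition so the rows never have to be gathered on the driver. The sketch below is a hedged, Spark-free illustration of that per-partition pattern using plain Scala iterators. `FakeCopyClient` and `PerPartitionDemo` are hypothetical names, and each `Iterator` merely stands in for one partition's rows.

```scala
// Spark-free sketch of the per-partition pattern (the idea behind RDD.foreachPartition):
// each "partition" is an Iterator that is streamed, never materialized as one big collection.
object PerPartitionDemo {
  // Hypothetical client; in Spark it would be created where the rows are processed.
  final class FakeCopyClient {
    private var count = 0
    def copyObject(key: String): Unit = count += 1
    def copies: Int = count
  }

  // Returns (number of clients created, total copies performed).
  def run(partitions: Seq[Iterator[String]]): (Int, Int) = {
    var clientsCreated = 0
    var totalCopies = 0
    partitions.foreach { rows =>
      // One client per partition rather than one captured from the driver.
      val client = new FakeCopyClient
      clientsCreated += 1
      // Rows are consumed one at a time; nothing forces the whole partition into memory.
      rows.foreach(key => client.copyObject(key))
      totalCopies += client.copies
    }
    (clientsCreated, totalCopies)
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Iterator("a.csv", "b.csv"), Iterator("c.csv"), Iterator.empty)
    println(run(parts))
  }
}
```

With three "partitions" holding three keys in total, `run` returns `(3, 3)`: one client per partition, one copy per row. Creating the client inside the partition function also avoids shipping a driver-side client object to the executors.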
Thanks, Satish
Tags: apache-spark