【问题标题】:Spark transformation (map) is not called even after calling the action (count)即使在调用动作(计数)之后也不会调用 Spark 转换(映射)
【发布时间】:2021-08-07 04:21:11
【问题描述】:

我在 DataFrame 上定义了一个 map 函数,当我调用操作(在本例中为 count())时,我没有看到 map 函数内部调用的函数调用被每一行调用。

这是我的代码

def copyFilesToArchive(recordDF:DataFrame,s3Util:S3Client):Unit ={
    if(s3Util !=null) {
      // Copy all the Object to new Path
      logger.info(".copyFilesToArchive() : Before Copying the Files to Archive and no.of RDD Partitions ={}",recordDF.rdd.partitions.length);
      recordDF.rdd.map(row => {
        var key = row.getAs("object_key")
        var bucketName = row.getAs("bucket_name")
        var targetBucketName = row.getAs("target_bucket_name")
        var targetKey = "archive/" + "/" + key
        var copyObjectRequest = new CopyObjectRequest(bucketName, key, targetBucketName,targetKey )
        logger.info(".copyFilesToArchive() : Copying the File from ["+key+"] to ["+targetKey+"]");
        s3Util.getS3Client.copyObject(copyObjectRequest)
      })
      logger.info(".copyFilesToArchive() : Copying the Files to Archive Folder. No.of Files to Copy ={}",recordDF.count());
    }
    else{
      logger.info(".copyFilesToArchive() : Skipping Moving the Files as S3 Util is null");
    }
  }

当我运行单元测试时,我没有看到复制文件的日志记录。

INFO ArchiveProcessor - .copyFilesToArchive() :在将文件复制到存档之前并且 RDD 分区数 =200 INFO ArchiveProcessor - .copyFilesToArchive() :将文件复制到存档文件夹。要复制的文件数 =3000000

当我使用 collect() 时,我怎么会得到 OOM 错误。

if I use collect() then i can see the logging output.

recordDF.collect().map(row => {
 ...
})

谢谢 萨提什

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    Spark 数据帧是不可变的,如果您进行任何转换,它不会改变原始数据帧变量。

    您在 recordDF 上调用操作方法 count(),但不在 recordDF 的转换版本上,即recordDF.rdd.map(//operations)。由于您没有调用任何特定代码块未执行的操作方法。

    因为 collect() 是一种操作方法,recordDF.collect().map(..)--> 这对你有用。 collect 方法会将所有记录带到驱动程序,如果内存不足(默认为 1 GB),则会出现 OOM 错误。

    您可以在数据帧上使用 foreach 或 foreachPartition 函数 -->recordDF.foreach(row ==> // transformation logic goes here) 或在 recordDF.map.rdd(row=> //...) 上调用操作

    val outRDD = recordDF.map.rdd(row=> //...)
    logger.info("--<your message>--", outRDD.count)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-06-04
      • 2013-06-18
      • 2020-06-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多