【问题标题】:What is the best possible way to call Hadoop FileSystem operations from Serializable Scala object从 Serializable Scala 对象调用 Hadoop FileSystem 操作的最佳方法是什么
【发布时间】:2019-10-28 06:12:04
【问题描述】:

/我正在尝试什么/

我想对几个包含 BZ2 文件的 HDFS 存储桶进行一些 Spark UDF 转换。我已经定义了一个 extends Serializable 的 MyMain Scala 对象,因为它涉及在每个 HDFS 存储桶上调用 UDF 转换。

但是,在进行 UDF 转换之前,我需要过滤实际上包含一些 BZ2 文件的 HDFS 存储桶。这需要我在 MyMain.main 方法中保留的 Hadoop FileSystem 操作,以便将这些计算限制在驱动程序内存中,而不是分发到工作节点,因为据我了解 FileSystem 不可序列化。

但是,即使我制作了一个单独的可序列化 HadoopUtils 类并制作了一个单例伴生对象并在 MyMain.main 中调用了所有 FileSystem 操作,我仍然得到 “任务不可序列化”异常(下)

/问题/

从可序列化对象(如 MyMain)调用不可序列化文件系统操作的方法是什么?此外,class HadoopUtils extends Serializable 似乎不是可序列化的,尽管如此定义?

/我的代码/

val prependtoList = (x1: String, x2: List[String]) => x2.map(x1+_)

class HadoopUtils extends Serializable {

  def existsDir(fs: FileSystem, path: String) : Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }
  def ifBZFileExists(fs: FileSystem, bucketBZDir: String) : Boolean = {
    val path = new Path(bucketBZDir)
    val fileStatus = fs.listStatus(path).filter(
      p => { p.isFile && p.getPath.getName.endsWith(".bz2")}
    )
    !fileStatus.isEmpty
  }

  def getBZ2Buckets(fs: FileSystem, lookupPath: String) : List[String] = {
    //Filter the list of buckets having at least one BZ2 file in it
    val range = (1 to 16).toList.map(x => x.toString)
    val buckets = prependtoList("Bucket",range)
    val allBuckets = prependtoList(lookupPath + "/", buckets)
    //From Bucket1 to Bucket16, filter the buckets that are existing e.g. Bucket5 may not exist
    val existingBuckets = allBuckets.filter(p => { existsDir(fs,p) })
    val BZ2BucketPaths = existingBuckets.filter(path => { ifBZFileExists(fs,path) }).map(
        path => { path + "/*.bz2" })
    BZ2BucketPaths
  }
}

object HadoopUtils {
  val getHadoopUtils = new HadoopUtils
}

object MyMain extends Serializable {
  val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
  val basePath = "/path/to/buckets"
  def main(args: Array[String]): Unit = {
    //NOTE: spark, hadoopfs defined in main so as to be processed in Driver
    val spark = SparkSession
      .builder()
      .appName("My_App")
      .enableHiveSupport()
      .getOrCreate()

    val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    val BZ2Buckets = 
      HadoopUtils.getHadoopUtils.getBZ2BucketPaths(hadoopfs,clusterNameNodeURL + basePath)

    BZ2Buckets.foreach(path => {
      //Doing Spark UDF transformations on each bucket, which needs to be serialized
    })


  }
}

/异常的堆栈跟踪/

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:197)
  ... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
    - object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
    - field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
    - object (class $iw, $iw@3f4a0d43)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@74d06d1e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@f9764ea)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6821099e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4f509444)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11462802)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11d2d501)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@284fd700)
    - field (class: $line14.$read, name: $iw, type: class $iw)
    - object (class $line14.$read, $line14.$read@46b4206a)
    - field (class: $iw, name: $line14$read, type: class $line14.$read)
    - object (class $iw, $iw@33486894)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@25980fc9)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1fb0d28d)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42ea11d5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42d28cc1)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@22131a73)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@631878e1)
    - field (class: $line18.$read, name: $iw, type: class $iw)
    - object (class $line18.$read, $line18.$read@561c52c0)
    - field (class: $iw, name: $line18$read, type: class $line18.$read)
    - object (class $iw, $iw@1d5b8be2)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@4de4c672)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 85 more

【问题讨论】:

    标签: scala apache-spark hadoop serialization


    【解决方案1】:

    似乎Task not serializable 问题与HadoopUtils 类或对象无关。鉴于在驱动程序中,HadoopUtils类的实例通过singleton HadoopUtils objectHadoopUtils.getHadoopUtil访问,HadoopUtils类需要与MyMain对象一起序列化。

    问题的解决方法可以参考here

    【讨论】:

      猜你喜欢
      • 2015-01-30
      • 2010-09-15
      • 2012-04-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多